Skip to content

Commit

Permalink
fix(sqllab): reinstate "Force trino client async execution"
Browse files Browse the repository at this point in the history
Reinstate the previous fix and adjust the default sqlite connection
string to prevent the issue that caused us to revert it.

SQLite seems to be a bit pedantic about multi-threaded access here, and
seems to think we're sharing its objects between threads, though we
haven't actually accessed the sqlite database from the extra thread.

We can prevent it being so pedantic with the check_same_thread=false
option, so configure that by default with our basic development sqlite
backend.
  • Loading branch information
giftig committed Oct 18, 2023
1 parent e58a3ab commit 6d4efca
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 24 deletions.
6 changes: 3 additions & 3 deletions docs/docs/databases/installing-database-drivers.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,12 @@ Some of the recommended packages are shown below. Please refer to [setup.py](htt
| [Oracle](/docs/databases/oracle) | `pip install cx_Oracle` | `oracle://` |
| [PostgreSQL](/docs/databases/postgres) | `pip install psycopg2` | `postgresql://<UserName>:<DBPassword>@<Database Host>/<Database Name>` |
| [Presto](/docs/databases/presto) | `pip install pyhive` | `presto://` |
| [Rockset](/docs/databases/rockset) | `pip install rockset-sqlalchemy` | `rockset://<api_key>:@<api_server>`
| [Rockset](/docs/databases/rockset) | `pip install rockset-sqlalchemy` | `rockset://<api_key>:@<api_server>` |
| [SAP Hana](/docs/databases/hana) | `pip install hdbcli sqlalchemy-hana or pip install apache-superset[hana]` | `hana://{username}:{password}@{host}:{port}` |
| [StarRocks](/docs/databases/starrocks) | `pip install starrocks` | `starrocks://<User>:<Password>@<Host>:<Port>/<Catalog>.<Database>` |
| [Snowflake](/docs/databases/snowflake) | `pip install snowflake-sqlalchemy` | `snowflake://{user}:{password}@{account}.{region}/{database}?role={role}&warehouse={warehouse}` |
| SQLite | No additional library needed | `sqlite://` |
| [SQL Server](/docs/databases/sql-server) | `pip install pymssql` | `mssql+pymssql://` |
| SQLite | No additional library needed | `sqlite:///path/to/file.db?check_same_thread=false` |
| [SQL Server](/docs/databases/sql-server) | `pip install pymssql` | `mssql+pymssql://` |
| [Teradata](/docs/databases/teradata) | `pip install teradatasqlalchemy` | `teradatasql://{user}:{password}@{host}` |
| [TimescaleDB](/docs/databases/timescaledb) | `pip install psycopg2` | `postgresql://<UserName>:<DBPassword>@<Database Host>:<Port>/<Database Name>` |
| [Trino](/docs/databases/trino) | `pip install trino` | `trino://{username}:{password}@{hostname}:{port}/{catalog}` |
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/frequently-asked-questions.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ Another workaround is to change where superset stores the sqlite database by add
`superset_config.py`:

```
SQLALCHEMY_DATABASE_URI = 'sqlite:////new/location/superset.db'
SQLALCHEMY_DATABASE_URI = 'sqlite:////new/location/superset.db?check_same_thread=false'
```

You can read more about customizing Superset using the configuration file
Expand Down
4 changes: 3 additions & 1 deletion docs/docs/installation/configuring-superset.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ SECRET_KEY = 'YOUR_OWN_RANDOM_GENERATED_SECRET_KEY'
# superset metadata (slices, connections, tables, dashboards, ...).
# Note that the connection information to connect to the datasources
# you want to explore are managed directly in the web UI
SQLALCHEMY_DATABASE_URI = 'sqlite:////path/to/superset.db'
# The check_same_thread=false property ensures the sqlite client does not attempt
# to enforce single-threaded access, which may be problematic in some edge cases
SQLALCHEMY_DATABASE_URI = 'sqlite:////path/to/superset.db?check_same_thread=false'
# Flask-WTF flag for CSRF
WTF_CSRF_ENABLED = True
Expand Down
5 changes: 4 additions & 1 deletion superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@ def _try_json_readsha(filepath: str, length: int) -> str | None:
SECRET_KEY = os.environ.get("SUPERSET_SECRET_KEY") or CHANGE_ME_SECRET_KEY

# The SQLAlchemy connection string.
SQLALCHEMY_DATABASE_URI = "sqlite:///" + os.path.join(DATA_DIR, "superset.db")
SQLALCHEMY_DATABASE_URI = (
f"""sqlite:///{os.path.join(DATA_DIR, "superset.db")}?check_same_thread=false"""
)

# SQLALCHEMY_DATABASE_URI = 'mysql://myapp@localhost/myapp'
# SQLALCHEMY_DATABASE_URI = 'postgresql://root:password@localhost/myapp'

Expand Down
18 changes: 18 additions & 0 deletions superset/db_engine_specs/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,24 @@ def handle_cursor(cls, cursor: Any, query: Query, session: Session) -> None:
query object"""
# TODO: Fix circular import error caused by importing sql_lab.Query

@classmethod
def execute_with_cursor(
    cls, cursor: Any, sql: str, query: Query, session: Session
) -> None:
    """
    Execute a statement on the cursor and then process that cursor.

    The default behaviour is simply `execute` (requested with
    `async_=True`) followed by `handle_cursor`. Engine specs whose client
    has limitations, such as no async support (e.g. Trino), may override
    this with a more involved implementation so that information can be
    read from the cursor in a timely manner and operations such as
    stopping a query remain possible.
    """
    # Step 1: submit the statement to the database.
    logger.debug("Query %d: Running query: %s", query.id, sql)
    cls.execute(cursor, sql, async_=True)

    # Step 2: let the engine spec inspect the cursor for this query.
    logger.debug("Query %d: Handling cursor", query.id)
    cls.handle_cursor(cursor, query, session)

@classmethod
def extract_error_message(cls, ex: Exception) -> str:
return f"{cls.engine} error: {cls._extract_error_message(ex)}"
Expand Down
66 changes: 60 additions & 6 deletions superset/db_engine_specs/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import contextlib
import logging
import threading
import time
from typing import Any, TYPE_CHECKING

import simplejson as json
Expand Down Expand Up @@ -151,14 +153,21 @@ def get_tracking_url(cls, cursor: Cursor) -> str | None:

@classmethod
def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:
if tracking_url := cls.get_tracking_url(cursor):
query.tracking_url = tracking_url
"""
Handle a trino client cursor.
WARNING: if you execute a query, it will block until complete and you
will not be able to handle the cursor until complete. Use
`execute_with_cursor` instead, to handle this asynchronously.
"""

# Adds the executed query id to the extra payload so the query can be cancelled
query.set_extra_json_key(
key=QUERY_CANCEL_KEY,
value=(cancel_query_id := cursor.stats["queryId"]),
)
cancel_query_id = cursor.query_id
logger.debug("Query %d: queryId %s found in cursor", query.id, cancel_query_id)
query.set_extra_json_key(key=QUERY_CANCEL_KEY, value=cancel_query_id)

if tracking_url := cls.get_tracking_url(cursor):
query.tracking_url = tracking_url

session.commit()

Expand All @@ -173,6 +182,51 @@ def handle_cursor(cls, cursor: Cursor, query: Query, session: Session) -> None:

super().handle_cursor(cursor=cursor, query=query, session=session)

@classmethod
def execute_with_cursor(
    cls, cursor: Any, sql: str, query: Query, session: Session
) -> None:
    """
    Trigger execution of a query and handle the resulting cursor.

    Trino's client blocks until the query is complete, so the statement is
    run in a worker thread while this thread polls the cursor for the query
    ID and then invokes `handle_cursor`. The call returns only once the
    worker thread has finished, which matches the blocking behaviour of the
    client itself.

    Any exception raised by `execute` in the worker thread is re-raised
    here, in the calling thread, so database errors are mapped as normal.
    """
    # Read the ID in the calling thread rather than inside the worker.
    # NOTE(review): attribute access on `query` from the worker thread may
    # go through the ORM session from a second thread, which is presumably
    # what SQLite's same-thread check objects to -- confirm.
    query_id = query.id

    execute_result: dict[str, Any] = {}

    def _execute(results: dict[str, Any]) -> None:
        logger.debug("Query %d: Running query: %s", query_id, sql)

        # Pass exception information back to the parent thread; completion
        # itself is observed through the thread state, not a flag.
        try:
            cls.execute(cursor, sql)
        except Exception as ex:  # pylint: disable=broad-except
            results["error"] = ex

    execute_thread = threading.Thread(target=_execute, args=(execute_result,))
    execute_thread.start()

    # Wait for a query ID to be available before handling the cursor, as
    # it's required by that method; it may never become available on error,
    # in which case the worker thread ending stops the wait.
    while not cursor.query_id and execute_thread.is_alive():
        time.sleep(0.1)

    logger.debug("Query %d: Handling cursor", query_id)
    cls.handle_cursor(cursor, query, session)

    # Block until the query completes; same behaviour as the client itself.
    # Joining returns as soon as the worker ends, however it ends, instead
    # of polling a flag on a timer.
    logger.debug("Query %d: Waiting for query to complete", query_id)
    execute_thread.join()

    # Unfortunately we'll mangle the stack trace due to the thread, but
    # throwing the original exception allows mapping database errors as normal
    if err := execute_result.get("error"):
        raise err

@classmethod
def prepare_cancel_query(cls, query: Query, session: Session) -> None:
if QUERY_CANCEL_KEY not in query.extra:
Expand Down
7 changes: 2 additions & 5 deletions superset/sql_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def get_sql_results( # pylint: disable=too-many-arguments
return handle_query_error(ex, query, session)


def execute_sql_statement( # pylint: disable=too-many-arguments,too-many-statements
def execute_sql_statement( # pylint: disable=too-many-arguments
sql_statement: str,
query: Query,
session: Session,
Expand Down Expand Up @@ -271,10 +271,7 @@ def execute_sql_statement( # pylint: disable=too-many-arguments,too-many-statem
)
session.commit()
with stats_timing("sqllab.query.time_executing_query", stats_logger):
logger.debug("Query %d: Running query: %s", query.id, sql)
db_engine_spec.execute(cursor, sql, async_=True)
logger.debug("Query %d: Handling cursor", query.id)
db_engine_spec.handle_cursor(cursor, query, session)
db_engine_spec.execute_with_cursor(cursor, sql, query, session)

with stats_timing("sqllab.query.time_fetching_results", stats_logger):
logger.debug(
Expand Down
31 changes: 30 additions & 1 deletion tests/unit_tests/db_engine_specs/test_trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ def test_handle_cursor_early_cancel(
query_id = "myQueryId"

cursor_mock = engine_mock.return_value.__enter__.return_value
cursor_mock.stats = {"queryId": query_id}
cursor_mock.query_id = query_id
session_mock = mocker.MagicMock()

query = Query()
Expand All @@ -366,3 +366,32 @@ def test_handle_cursor_early_cancel(
assert cancel_query_mock.call_args[1]["cancel_query_id"] == query_id
else:
assert cancel_query_mock.call_args is None


def test_execute_with_cursor_in_parallel(mocker: MockerFixture):
    """
    `execute_with_cursor` should record, as the cancel key, the query ID
    that only appears on the cursor once `execute` has been invoked.
    """
    from superset.db_engine_specs.trino import TrinoEngineSpec

    expected_query_id = "myQueryId"

    # The cursor starts without a query ID ...
    cursor = mocker.MagicMock()
    cursor.query_id = None

    # ... and only gains one as a side effect of `execute` being called.
    def _assign_query_id(*args, **kwargs):
        cursor.query_id = expected_query_id

    cursor.execute.side_effect = _assign_query_id

    query = mocker.MagicMock()
    session = mocker.MagicMock()

    TrinoEngineSpec.execute_with_cursor(
        cursor=cursor,
        sql="SELECT 1 FROM foo",
        query=query,
        session=session,
    )

    # The ID set during execution must be the one stored for cancellation.
    query.set_extra_json_key.assert_called_once_with(
        key=QUERY_CANCEL_KEY, value=expected_query_id
    )
10 changes: 4 additions & 6 deletions tests/unit_tests/sql_lab_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def test_execute_sql_statement(mocker: MockerFixture, app: None) -> None:
)

database.apply_limit_to_sql.assert_called_with("SELECT 42 AS answer", 2, force=True)
db_engine_spec.execute.assert_called_with(
cursor, "SELECT 42 AS answer LIMIT 2", async_=True
db_engine_spec.execute_with_cursor.assert_called_with(
cursor, "SELECT 42 AS answer LIMIT 2", query, session
)
SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)

Expand Down Expand Up @@ -106,10 +106,8 @@ def test_execute_sql_statement_with_rls(
101,
force=True,
)
db_engine_spec.execute.assert_called_with(
cursor,
"SELECT * FROM sales WHERE organization_id=42 LIMIT 101",
async_=True,
db_engine_spec.execute_with_cursor.assert_called_with(
cursor, "SELECT * FROM sales WHERE organization_id=42 LIMIT 101", query, session
)
SupersetResultSet.assert_called_with([(42,)], cursor.description, db_engine_spec)

Expand Down

0 comments on commit 6d4efca

Please sign in to comment.