Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db authored Jan 24, 2024
2 parents c656b0f + 17f6d5d commit e4b803d
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@
- Fix: Python models authentication could be overridden by a `.netrc` file in the user's home directory ([338](https://github.com/databricks/dbt-databricks/pull/338))
- Fix: MV/ST REST api authentication could be overriden by a `.netrc` file in the user's home directory ([555](https://github.com/databricks/dbt-databricks/pull/555))
- Show details in connection errors ([562](https://github.com/databricks/dbt-databricks/pull/562))
- Updated connection debugging logging and setting connection last used time on session open.([565](https://github.com/databricks/dbt-databricks/pull/565))

### Under the Hood

- Adding retries around API calls in python model submission ([549](https://github.com/databricks/dbt-databricks/pull/549))
- Upgrade to databricks-sql-connector 3.0.0 ([554](https://github.com/databricks/dbt-databricks/pull/554))
- Pinning pandas to < 2.2.0 to keep from breaking multiple tests ([564](https://github.com/databricks/dbt-databricks/pull/554))

## dbt-databricks 1.7.3 (Dec 12, 2023)

Expand Down
55 changes: 39 additions & 16 deletions dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -771,9 +771,10 @@ class DatabricksDBTConnection(Connection):
# the next time it is used.
language: Optional[str] = None

session_id: Optional[str] = None

def _acquire(self, node: Optional[ResultNode]) -> None:
"""Indicate that this connection is in use."""
logger.debug(f"DatabricksDBTConnection._acquire: {self._get_conn_info_str()}")
self._log_usage(node)
self.acquire_release_count += 1
if self.last_used_time is None:
Expand All @@ -783,9 +784,10 @@ def _acquire(self, node: Optional[ResultNode]) -> None:
else:
self.language = None

self._log_info("_acquire")

def _release(self) -> None:
"""Indicate that this connection is not in use."""
logger.debug(f"DatabricksDBTConnection._release: {self._get_conn_info_str()}")
# Need to check for > 0 because in some situations the dbt code will make an extra
# release call on a connection.
if self.acquire_release_count > 0:
Expand All @@ -796,6 +798,8 @@ def _release(self) -> None:
if self.acquire_release_count == 0 and self.language != "python":
self.last_used_time = time.time()

self._log_info("_release")

def _get_idle_time(self) -> float:
return 0 if self.last_used_time is None else time.time() - self.last_used_time

Expand All @@ -805,11 +809,15 @@ def _idle_too_long(self) -> bool:
def _get_conn_info_str(self) -> str:
"""Generate a string describing this connection."""
return (
f"name: {self.name}, thread: {self.thread_identifier}, "
f"compute: `{self.compute_name}`, acquire_release_count: {self.acquire_release_count},"
f" idle time: {self._get_idle_time()}s, language: {self.language}"
f"sess: {self.session_id}, name: {self.name}, "
f"idle: {self._get_idle_time()}s, acqrelcnt: {self.acquire_release_count}, "
f"lang: {self.language}, thrd: {self.thread_identifier}, "
f"cmpt: `{self.compute_name}`, lut: {self.last_used_time}"
)

def _log_info(self, caller: Optional[str]) -> None:
logger.debug(f"conn: {id(self)}: {caller} {self._get_conn_info_str()}")

def _log_usage(self, node: Optional[ResultNode]) -> None:
if node:
if not self.compute_name:
Expand All @@ -826,8 +834,9 @@ def _log_usage(self, node: Optional[ResultNode]) -> None:
logger.debug(f"Thread {self.thread_identifier} using default compute resource.")

def _reset_handle(self, open: Callable[[Connection], Connection]) -> None:
logger.debug(f"DatabricksDBTConnection._reset_handle: {self._get_conn_info_str()}")
self._log_info("_reset_handle")
self.handle = LazyHandle(open)
self.session_id = None
# Reset last_used_time to None because by refreshing this connection becomes associated
# with a new session that hasn't been used yet.
self.last_used_time = None
Expand Down Expand Up @@ -1016,10 +1025,11 @@ def _update_compute_connection(
# Found a connection and nothing to do, so just return it
return conn

orig_conn_name: str = conn.name or ""

if conn.state != ConnectionState.OPEN:
conn.handle = LazyHandle(self._open2)
if conn.name != new_name:
orig_conn_name: str = conn.name or ""
conn.name = new_name
fire_event(ConnectionReused(orig_conn_name=orig_conn_name, conn_name=new_name))

Expand All @@ -1028,7 +1038,7 @@ def _update_compute_connection(
self.clear_thread_connection()
self.set_thread_connection(conn)

logger.debug(f"Reusing DatabricksDBTConnection. {conn._get_conn_info_str()}")
conn._log_info(f"reusing connection {orig_conn_name}")

return conn

Expand All @@ -1044,10 +1054,7 @@ def _create_compute_connection(

# Create a new connection
compute_name = _get_compute_name(node=node) or ""
logger.debug(
f"Creating DatabricksDBTConnection. name: {conn_name}, "
f"thread: {self.get_thread_identifier()}, compute: `{compute_name}`"
)

conn = DatabricksDBTConnection(
type=Identifier(self.TYPE),
name=conn_name,
Expand All @@ -1063,6 +1070,9 @@ def _create_compute_connection(
conn.max_idle_time = _get_max_idle_time(node=node, creds=creds)

conn.handle = LazyHandle(self._open2)

conn._log_info("Creating DatabricksDBTConnection")

# Add this connection to the thread/compute connection pool.
self._add_compute_connection(conn)
# Remove the connection currently in use by this thread from the thread connection pool.
Expand Down Expand Up @@ -1129,6 +1139,8 @@ def _cleanup_idle_connections(self) -> None:
# if different models use different compute resources
thread_conns = self._get_compute_connections()
for conn in thread_conns.values():
conn._log_info("idle check connection:")

# Generally speaking we only want to close/refresh the connection if the
# acquire_release_count is zero. i.e. the connection is not currently in use.
# However python models acquire a connection then run the pyton model, which
Expand All @@ -1142,15 +1154,18 @@ def _cleanup_idle_connections(self) -> None:
if (
conn.acquire_release_count == 0 or conn.language == "python"
) and conn._idle_too_long():
logger.debug(f"closing idle connection: {conn._get_conn_info_str()}")
conn._log_info("closing idle connection")
self.close(conn)
conn._reset_handle(self._open2)

def get_thread_connection(self) -> Connection:
conn = super().get_thread_connection()
dbr_conn = cast(DatabricksDBTConnection, conn)
dbr_conn._log_info("get_thread_connection:")
if USE_LONG_SESSIONS:
self._cleanup_idle_connections()

return super().get_thread_connection()
return conn

def add_query(
self,
Expand Down Expand Up @@ -1380,8 +1395,10 @@ def _open2(cls, connection: Connection) -> Connection:
USE_LONG_SESSIONS
), "This path, '_open2', should only be reachable with USE_LONG_SESSIONS"

databricks_connection = cast(DatabricksDBTConnection, connection)

if connection.state == ConnectionState.OPEN:
logger.debug("Connection is already open, skipping open.")
databricks_connection._log_info("Connection is already open, skipping open.")
return connection

creds: DatabricksCredentials = connection.credentials
Expand All @@ -1404,7 +1421,7 @@ def _open2(cls, connection: Connection) -> Connection:

# If a model specifies a compute resource the http path
# may be different than the http_path property of creds.
http_path = cast(DatabricksDBTConnection, connection).http_path
http_path = databricks_connection.http_path

def connect() -> DatabricksSQLConnectionWrapper:
try:
Expand All @@ -1421,6 +1438,12 @@ def connect() -> DatabricksSQLConnectionWrapper:
_user_agent_entry=user_agent_entry,
**connection_parameters,
)

if conn:
databricks_connection.session_id = conn.get_session_id_hex()
databricks_connection.last_used_time = time.time()
databricks_connection._log_info("session opened")

return DatabricksSQLConnectionWrapper(
conn,
is_cluster=creds.cluster_id is not None,
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ databricks-sql-connector>=3.0.0, <3.1.0
dbt-spark~=1.7.1
databricks-sdk>=0.9.0, <0.16.0
keyring>=23.13.0
pandas<2.2.0
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def _get_plugin_version() -> str:
"databricks-sql-connector>=3.0.0, <3.1.0",
"databricks-sdk>=0.9.0, <0.16.0",
"keyring>=23.13.0",
"pandas<2.2.0",
],
zip_safe=False,
classifiers=[
Expand Down

0 comments on commit e4b803d

Please sign in to comment.