Skip to content

Commit

Permalink
Enable v3 retries by default (#282)
Browse files Browse the repository at this point in the history
Signed-off-by: Jesse Whitehouse <[email protected]>
  • Loading branch information
Jesse authored Nov 17, 2023
1 parent 64924a6 commit 5ed45f5
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 13 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
- Caching metadata calls
- Enable cloud fetch by default. To disable, set `use_cloud_fetch=False` when building `databricks.sql.client`.
- Add integration tests for Databricks UC Volumes ingestion queries
- Add `_retry_max_redirects` config
- Retries:
- Add `_retry_max_redirects` config
- Set `_enable_v3_retries=True` and warn if users override it

## 2.9.3 (2023-08-24)

Expand Down
14 changes: 11 additions & 3 deletions src/databricks/sql/auth/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class CommandType(Enum):
EXECUTE_STATEMENT = "ExecuteStatement"
CLOSE_SESSION = "CloseSession"
CLOSE_OPERATION = "CloseOperation"
GET_OPERATION_STATUS = "GetOperationStatus"
OTHER = "Other"

@classmethod
Expand Down Expand Up @@ -314,9 +315,9 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
2. The request received a 501 (Not Implemented) status code
Because this request can never succeed.
3. The request received a 404 (Not Found) code and the request CommandType
was CloseSession or CloseOperation. This code indicates that the session
or cursor was already closed. Further retries will always return the same
code.
was GetOperationStatus, CloseSession or CloseOperation. This code indicates
that the command, session or cursor was already closed. Further retries will
always return the same code.
4. The request CommandType was ExecuteStatement and the HTTP code does not
appear in the default status_forcelist or force_dangerous_codes list. By
default, this means ExecuteStatement is only retried for codes 429 and 503.
Expand All @@ -343,6 +344,13 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
if not self._is_method_retryable(method): # type: ignore
return False, "Only POST requests are retried"

# Request failed with 404 and was a GetOperationStatus. This is not recoverable. Don't retry.
if status_code == 404 and self.command_type == CommandType.GET_OPERATION_STATUS:
return (
False,
"GetOperationStatus received 404 code from Databricks. Operation was canceled.",
)

# Request failed with 404 because CloseSession returns 404 if you repeat the request.
if (
status_code == 404
Expand Down
17 changes: 12 additions & 5 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def __init__(
# (defaults to 900)
# _enable_v3_retries
# Whether to use the DatabricksRetryPolicy implemented in urllib3
# (defaults to False)
# (defaults to True)
# _retry_max_redirects
# An integer representing the maximum number of redirects to follow for a request.
# This number must be <= _retry_stop_after_attempts_count.
Expand Down Expand Up @@ -185,7 +185,13 @@ def __init__(
self._auth_provider = auth_provider

# Connector version 3 retry approach
self.enable_v3_retries = kwargs.get("_enable_v3_retries", False)
self.enable_v3_retries = kwargs.get("_enable_v3_retries", True)

if not self.enable_v3_retries:
logger.warning(
"Legacy retry behavior is enabled for this connection."
" This behaviour is deprecated and will be removed in a future release."
)
self.force_dangerous_codes = kwargs.get("_retry_dangerous_codes", [])

additional_transport_args = {}
Expand Down Expand Up @@ -396,9 +402,6 @@ def attempt_request(attempt):

response = method(request)

# Calling `close()` here releases the active HTTP connection back to the pool
self._transport.close()

# We need to call type(response) here because thrift doesn't implement __name__ attributes for thrift responses
logger.debug(
"Received response: {}(<REDACTED>)".format(type(response).__name__)
Expand Down Expand Up @@ -460,6 +463,10 @@ def attempt_request(attempt):
error_message = ThriftBackend._extract_error_message_from_headers(
getattr(self._transport, "headers", {})
)
finally:
# Calling `close()` here releases the active HTTP connection back to the pool
self._transport.close()

return RequestErrorInfo(
error=error,
error_message=error_message,
Expand Down
17 changes: 16 additions & 1 deletion tests/e2e/common/retry_test_mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ class PySQLRetryTestsMixin:

# For testing purposes
_retry_policy = {
"_enable_v3_retries": True,
"_retry_delay_min": 0.1,
"_retry_delay_max": 5,
"_retry_stop_after_attempts_count": 5,
Expand Down Expand Up @@ -424,3 +423,19 @@ def test_retry_max_redirects_exceeds_max_attempts_count_warns_user(self):
expected_message_was_found = target in log

assert expected_message_was_found, "Did not find expected log messages"

def test_retry_legacy_behavior_warns_user(self):
with self.assertLogs(
"databricks.sql",
level="WARN",
) as cm:
with self.connection(
extra_params={**self._retry_policy, "_enable_v3_retries": False}
):
expected_message_was_found = False
for log in cm.output:
if expected_message_was_found:
break
target = "Legacy retry behavior is enabled for this connection."
expected_message_was_found = target in log
assert expected_message_was_found, "Did not find expected log messages"
7 changes: 4 additions & 3 deletions tests/e2e/test_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@

from tests.e2e.common.uc_volume_tests import PySQLUCVolumeTestSuiteMixin

from databricks.sql.exc import SessionAlreadyClosedError

log = logging.getLogger(__name__)

unsafe_logger = logging.getLogger("databricks.sql.unsafe")
Expand Down Expand Up @@ -699,10 +701,9 @@ def test_close_connection_closes_cursors(self):
conn.close()

# When connection closes, any cursor operations should no longer exist at the server
with self.assertRaises(thrift.Thrift.TApplicationException) as cm:
with self.assertRaises(SessionAlreadyClosedError) as cm:
op_status_at_server = ars.thrift_backend._client.GetOperationStatus(status_request)
if hasattr(cm, "exception"):
assert "RESOURCE_DOES_NOT_EXIST" in cm.exception.message



def test_closing_a_closed_connection_doesnt_fail(self):
Expand Down

0 comments on commit 5ed45f5

Please sign in to comment.