Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incorrect rows in inline fetch result #479

Merged
merged 12 commits into from
Dec 22, 2024
7 changes: 7 additions & 0 deletions src/databricks/sql/auth/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class CommandType(Enum):
CLOSE_SESSION = "CloseSession"
CLOSE_OPERATION = "CloseOperation"
GET_OPERATION_STATUS = "GetOperationStatus"
FETCH_RESULTS_INLINE_FETCH_NEXT = "FetchResultsInline_FETCH_NEXT"
OTHER = "Other"

@classmethod
Expand Down Expand Up @@ -362,6 +363,12 @@ def should_retry(self, method: str, status_code: int) -> Tuple[bool, str]:
if status_code == 501:
raise NonRecoverableNetworkError("Received code 501 from server.")

if self.command_type == CommandType.FETCH_RESULTS_INLINE_FETCH_NEXT:
return (
False,
"FetchResults in INLINE mode with FETCH_NEXT orientation are not idempotent and is not retried",
)

jprakash-db marked this conversation as resolved.
Show resolved Hide resolved
# Request failed and this method is not retryable. We only retry POST requests.
if not self._is_method_retryable(method):
return False, "Only POST requests are retried"
Expand Down
18 changes: 16 additions & 2 deletions src/databricks/sql/thrift_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ def __init__(
)

# Cloud fetch
self._use_cloud_fetch = kwargs.get("use_cloud_fetch", True)
jprakash-db marked this conversation as resolved.
Show resolved Hide resolved

self.max_download_threads = kwargs.get("max_download_threads", 10)

self._ssl_options = ssl_options
Expand Down Expand Up @@ -374,6 +376,18 @@ def attempt_request(attempt):

# These three lines are no-ops if the v3 retry policy is not in use
if self.enable_v3_retries:
# Not to retry when FetchResults in INLINE mode when it has orientation as FETCH_NEXT as it is not idempotent
if (
this_method_name == "FetchResults"
and self._use_cloud_fetch == False
):
this_method_name += (
"Inline_"
+ ttypes.TFetchOrientation._VALUES_TO_NAMES[
request.orientation
]
)

this_command_type = CommandType.get(this_method_name)
self._transport.set_retry_command_type(this_command_type)
self._transport.startRetryTimer()
Expand Down Expand Up @@ -1046,8 +1060,8 @@ def fetch_results(

resp = self.make_request(self._client.FetchResults, req)
if resp.results.startRowOffset > expected_row_start_offset:
logger.warning(
"Expected results to start from {} but they instead start at {}".format(
raise DataError(
"fetch_results failed due to inconsistency in the state between the client and the server. Expected results to start from {} but they instead start at {}, some result batches must have been skipped".format(
expected_row_start_offset, resp.results.startRowOffset
)
)
Expand Down
6 changes: 6 additions & 0 deletions tests/unit/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,9 @@ def test_sleep__retry_after_present(self, t_mock, retry_policy, error_history):
retry_policy.history = [error_history, error_history, error_history]
retry_policy.sleep(HTTPResponse(status=503, headers={"Retry-After": "3"}))
t_mock.assert_called_with(3)

def test_not_retryable__fetch_results_orientation_fetch_next(self, retry_policy):
HTTP_STATUS_CODES = [200, 429, 503, 504]
retry_policy.command_type = CommandType.FETCH_RESULTS_INLINE_FETCH_NEXT
for status_code in HTTP_STATUS_CODES:
assert not retry_policy.is_retry("METHOD_NAME", status_code=status_code)
Loading