Skip to content

Commit

Permalink
Retry failed CloudFetch downloads
Browse files Browse the repository at this point in the history
Signed-off-by: Levko Kravets <[email protected]>
  • Loading branch information
kravets-levko committed Jul 2, 2024
1 parent 9536e8d commit cdebaf4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/databricks/sql/cloudfetch/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ def __init__(
if link.rowCount <= 0:
continue
logger.debug(
"ResultFileDownloadManager.add_file_links: start offset {}, row count: {}".format(
"ResultFileDownloadManager: adding file link, start offset {}, row count: {}".format(
link.startRowOffset, link.rowCount
)
)
self._pending_links.append(link)

self._download_tasks: List[Future[DownloadedFile]] = []
self._max_download_threads: int = max_download_threads + 1
self._max_download_threads: int = max_download_threads
self._thread_pool = ThreadPoolExecutor(max_workers=self._max_download_threads)

self._downloadable_result_settings = DownloadableResultSettings(lz4_compressed)
Expand Down
34 changes: 25 additions & 9 deletions src/databricks/sql/cloudfetch/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dataclasses import dataclass

import requests
from requests.adapters import HTTPAdapter, Retry
import lz4.frame
import time

Expand All @@ -11,6 +12,17 @@

logger = logging.getLogger(__name__)

# TODO: Ideally, we should use a common retry policy (DatabricksRetryPolicy) for all the requests across the library.
# But DatabricksRetryPolicy should be updated first - currently it can work only with Thrift requests
retryPolicy = Retry(
total=5, # max retry attempts
backoff_factor=1, # min delay, 1 second
backoff_max=60, # max delay, 60 seconds
    # retry status codes 0-100 (inclusive), 429 (Too Many Requests), 500,
    # and 502-999 — i.e. every 5xx except 501 Not Implemented
status_forcelist=[*range(0, 101), 429, 500, *range(502, 1000)],
)


@dataclass
class DownloadedFile:
Expand Down Expand Up @@ -63,23 +75,26 @@ def run(self) -> DownloadedFile:
file, and signals to waiting threads that the download is finished and whether it was successful.
"""

logger.debug(
"ResultSetDownloadHandler: starting file download, offset {}, row count {}".format(
self.link.startRowOffset, self.link.rowCount
)
)

# Check if link is already expired or is expiring
ResultSetDownloadHandler._validate_link(
self.link, self.settings.link_expiry_buffer_secs
)

session = requests.Session()
session.timeout = self.settings.download_timeout
# TODO: Retry:
# from requests.adapters import HTTPAdapter, Retry
# retries = Retry(total=5,
# backoff_factor=0.1,
# status_forcelist=[ 500, 502, 503, 504 ])
# session.mount('http://', HTTPAdapter(max_retries=retries))
session.mount("http://", HTTPAdapter(max_retries=retryPolicy))
session.mount("https://", HTTPAdapter(max_retries=retryPolicy))

try:
# Get the file via HTTP request
response = session.get(self.link.fileLink)
response = session.get(
self.link.fileLink, timeout=self.settings.download_timeout
)
response.raise_for_status()

# Save (and decompress if needed) the downloaded file
Expand Down Expand Up @@ -110,7 +125,8 @@ def run(self) -> DownloadedFile:
self.link.rowCount,
)
finally:
session and session.close()
if session:
session.close()

@staticmethod
def _validate_link(link: TSparkArrowResultLink, expiry_buffer_secs: int):
Expand Down

0 comments on commit cdebaf4

Please sign in to comment.