Skip to content

Commit

Permalink
Upload to AWS with multiple threads for speed. (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
sylmak authored Aug 5, 2024
1 parent 5616eb1 commit f211625
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 8 deletions.
28 changes: 21 additions & 7 deletions cloud_functions/climateiq_export_to_aws_cf/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from concurrent import futures
from datetime import datetime
import logging
import os

import boto3
from botocore import config as boto_config
import flask
import functions_framework
from google.cloud import storage
Expand Down Expand Up @@ -52,21 +54,33 @@ def export_to_aws(request: flask.Request) -> tuple[str, int]:
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
# Avoid "Connection pool is full" errors. ThreadPoolExecutor's default
# max_workers is min(32, cpu_count + 4), so at most 32 threads share this client.
config=boto_config.Config(max_pool_connections=32),
)
for i, blob in enumerate(blobs_to_export):
logging.info(f"Exporting {blob.name} ({i + 1}/{total_blobs})...")
with blob.open("rb") as fd:
s3_client.upload_fileobj(fd, S3_BUCKET_NAME, f"{curr_time_str}/{blob.name}")
blob.metadata = {"export_time": curr_time_str}
blob.patch()
logging.info(f"Successfully exported {blob.name} ({i + 1}/{total_blobs}).")
upload_futures = []
with futures.ThreadPoolExecutor() as executor:
for blob in blobs_to_export:
upload_futures.append(
executor.submit(_export_blob, blob, s3_client, curr_time_str)
)
futures.wait(upload_futures, return_when=futures.FIRST_EXCEPTION)
for future in upload_futures:
future.result()
return (
f"Successfully exported {total_blobs} CSV files to ClimaSens "
f"({S3_BUCKET_NAME}/{curr_time_str}).\n",
200,
)


def _export_blob(blob: storage.Blob, s3_client: boto3.client, curr_time_str: str):
    """Upload one Cloud Storage blob to S3 and stamp it with the export time.

    Args:
        blob: The blob to copy; it is opened for binary reading.
        s3_client: Client whose ``upload_fileobj`` performs the upload.
        curr_time_str: Export timestamp string. Used as the S3 key prefix and
            stored as the blob's "export_time" metadata value.
    """
    destination_key = f"{curr_time_str}/{blob.name}"
    with blob.open("rb") as source_stream:
        s3_client.upload_fileobj(source_stream, S3_BUCKET_NAME, destination_key)

    # Mark the source blob as exported only once the upload call has returned.
    export_metadata = {"export_time": curr_time_str}
    blob.metadata = export_metadata
    blob.patch()


def _get_prefix_id(request: flask.Request) -> str:
req_json = request.get_json(silent=True)
if req_json is None or "prefix" not in req_json:
Expand Down
7 changes: 6 additions & 1 deletion cloud_functions/climateiq_export_to_aws_cf/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
boto3==1.34.145
black==24.4.2
blinker==1.8.2
boto3==1.34.153
botocore==1.34.153
cachetools==5.4.0
certifi==2024.7.4
charset-normalizer==3.3.2
Expand All @@ -24,6 +25,7 @@ gunicorn==22.0.0
idna==3.7
itsdangerous==2.2.0
Jinja2==3.1.4
jmespath==1.0.1
MarkupSafe==2.1.5
mccabe==0.7.0
mypy==1.10.1
Expand All @@ -37,8 +39,11 @@ pyasn1==0.6.0
pyasn1_modules==0.4.0
pycodestyle==2.12.0
pyflakes==3.2.0
python-dateutil==2.9.0.post0
requests==2.32.3
rsa==4.9
s3transfer==0.10.2
six==1.16.0
typing_extensions==4.12.2
urllib3==2.2.2
watchdog==4.0.1
Expand Down

0 comments on commit f211625

Please sign in to comment.