diff --git a/cloud_functions/climateiq_export_to_aws_cf/main.py b/cloud_functions/climateiq_export_to_aws_cf/main.py index 3129d62..0f1112f 100644 --- a/cloud_functions/climateiq_export_to_aws_cf/main.py +++ b/cloud_functions/climateiq_export_to_aws_cf/main.py @@ -1,8 +1,10 @@ +from concurrent import futures from datetime import datetime import logging import os import boto3 +from botocore import config as boto_config import flask import functions_framework from google.cloud import storage @@ -52,14 +54,19 @@ def export_to_aws(request: flask.Request) -> tuple[str, int]: "s3", aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key, + # Avoid "Connection pool is full" errors. max_workers of ThreadPoolExecutor + # will be 32 (the default value). + config=boto_config.Config(max_pool_connections=32), ) - for i, blob in enumerate(blobs_to_export): - logging.info(f"Exporting {blob.name} ({i + 1}/{total_blobs})...") - with blob.open("rb") as fd: - s3_client.upload_fileobj(fd, S3_BUCKET_NAME, f"{curr_time_str}/{blob.name}") - blob.metadata = {"export_time": curr_time_str} - blob.patch() - logging.info(f"Successfully exported {blob.name} ({i + 1}/{total_blobs}).") + upload_futures = [] + with futures.ThreadPoolExecutor() as executor: + for blob in blobs_to_export: + upload_futures.append( + executor.submit(_export_blob, blob, s3_client, curr_time_str) + ) + futures.wait(upload_futures, return_when=futures.FIRST_EXCEPTION) + for future in upload_futures: + future.result() return ( f"Successfully exported {total_blobs} CSV files to ClimaSens " f"({S3_BUCKET_NAME}/{curr_time_str}).\n", @@ -67,6 +74,13 @@ def export_to_aws(request: flask.Request) -> tuple[str, int]: ) +def _export_blob(blob: storage.Blob, s3_client: boto3.client, curr_time_str: str): + with blob.open("rb") as fd: + s3_client.upload_fileobj(fd, S3_BUCKET_NAME, f"{curr_time_str}/{blob.name}") + blob.metadata = {"export_time": curr_time_str} + blob.patch() + + def _get_prefix_id(request: flask.Request) -> str: req_json = request.get_json(silent=True) if req_json is None or "prefix" not in req_json: diff --git a/cloud_functions/climateiq_export_to_aws_cf/requirements.txt b/cloud_functions/climateiq_export_to_aws_cf/requirements.txt index 366eed2..8b7d112 100644 --- a/cloud_functions/climateiq_export_to_aws_cf/requirements.txt +++ b/cloud_functions/climateiq_export_to_aws_cf/requirements.txt @@ -1,6 +1,7 @@ -boto3==1.34.145 black==24.4.2 blinker==1.8.2 +boto3==1.34.153 +botocore==1.34.153 cachetools==5.4.0 certifi==2024.7.4 charset-normalizer==3.3.2 @@ -24,6 +25,7 @@ gunicorn==22.0.0 idna==3.7 itsdangerous==2.2.0 Jinja2==3.1.4 +jmespath==1.0.1 MarkupSafe==2.1.5 mccabe==0.7.0 mypy==1.10.1 @@ -37,8 +39,11 @@ pyasn1==0.6.0 pyasn1_modules==0.4.0 pycodestyle==2.12.0 pyflakes==3.2.0 +python-dateutil==2.9.0.post0 requests==2.32.3 rsa==4.9 +s3transfer==0.10.2 +six==1.16.0 typing_extensions==4.12.2 urllib3==2.2.2 watchdog==4.0.1