diff --git a/cloud_functions/climateiq_trigger_export_pipeline_cf/Dockerfile b/cloud_functions/climateiq_trigger_export_pipeline_cf/Dockerfile new file mode 100644 index 0000000..26a76fa --- /dev/null +++ b/cloud_functions/climateiq_trigger_export_pipeline_cf/Dockerfile @@ -0,0 +1,34 @@ +# Copyright 2020 Google, LLC. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Use the official Python image. +# https://hub.docker.com/_/python +FROM python:3.12-slim + +# Allow statements and log messages to immediately appear in the Cloud Run logs +ENV PYTHONUNBUFFERED True + +# Copy application dependency manifests to the container image. +# Copying this separately prevents re-running pip install on every code change. +COPY requirements.txt ./ + +# Install production dependencies. +RUN pip install -r requirements.txt + +ENV PYTHONUNBUFFERED True + +# Copy local code to the container image. +ENV APP_HOME /app +WORKDIR $APP_HOME +COPY . ./ diff --git a/cloud_functions/climateiq_trigger_export_pipeline_cf/main.py b/cloud_functions/climateiq_trigger_export_pipeline_cf/main.py index e9bd81a..0a49de8 100644 --- a/cloud_functions/climateiq_trigger_export_pipeline_cf/main.py +++ b/cloud_functions/climateiq_trigger_export_pipeline_cf/main.py @@ -2,10 +2,9 @@ import pathlib import json import os +import sys import time -from cloudevents import http -import functions_framework from google.cloud import tasks_v2 from google.cloud.storage import client as gcs_client, retry @@ -24,6 +23,10 @@ SPATIALIZE_CF_QUEUE = "spatialize-chunk-predictions-queue" +def _write_structured_log(message: str, severity: str = "INFO"): + print(json.dumps(dict(message=message, severity=severity)), flush=True) + + def _write_file(line: str, output_filename: str, storage_client: gcs_client.Client): output_blob = storage_client.bucket(OUTPUT_BUCKET_NAME).blob(output_filename) # Specify retry here due to bug: @@ -31,8 +34,7 @@ def _write_file(line: str, output_filename: str, storage_client: gcs_client.Clie output_blob.upload_from_string(line, retry=retry.DEFAULT_RETRY) -@functions_framework.cloud_event -def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None: +def trigger_export_pipeline(object_name: str) -> None: """Triggered by writes to the "climateiq-predictions" bucket. Splits predictions into one file per chunk and kicks off @@ -41,16 +43,13 @@ def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None: Note: This function only runs once all output prediction files are written. Additionally, the climateiq_spatialize_chunk_predictions cloud function is only triggered once all prediction files per chunk are written since data - from neighboring chunks is required for spatializiation. + from neighboring chunks is required for spatialization. Args: - cloud_event: The CloudEvent representing the storage event. + object_name: The name of the blob written to the bucket. """ start = time.time() - data = cloud_event.data - object_name = data["name"] - # Extract components from the object name and determine the total number of # output prediction files. expected_format = ( @@ -99,6 +98,8 @@ def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None: # written yet. return + _write_structured_log(f"[{object_name}] Starting process.", "DEBUG") + # Split predictions into one file per chunk and output to GCS. output_filenames = [] write_futures = [] @@ -117,17 +118,17 @@ def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None: ) write_futures.append(future) - futures.wait(write_futures) - print( - json.dumps( - dict( - severity="DEBUG", - message=( - f"[{object_name}] Created {len(output_filenames)} files in " - f"{time.time() - start} s." - ), - ) - ) + futures.wait(write_futures, return_when=futures.FIRST_EXCEPTION) + # If any exceptions were raised in a thread, calling result() will raise it here. + for future in write_futures: + future.result() + + _write_structured_log( + ( + f"[{object_name}] Created {len(output_filenames)} files in " + f"{time.time() - start} s." + ), + "DEBUG", ) # Once all output files have been written, push tasks to Task Queue. @@ -160,16 +161,19 @@ def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None: ), ) ) - futures.wait(queue_futures) - - print( - json.dumps( - dict( - severity="DEBUG", - message=( - f"[{object_name}] Triggered export pipeline for " - f"{len(output_filenames)} chunks." - ), - ) - ) + futures.wait(queue_futures, return_when=futures.FIRST_EXCEPTION) + # If any exceptions were raised in a thread, calling result() will raise it here. + for future in write_futures: + future.result() + + _write_structured_log( + ( + f"[{object_name}] Triggered export pipeline for " + f"{len(output_filenames)} chunks." + ), + "DEBUG", ) + + +if __name__ == "__main__": + trigger_export_pipeline(sys.argv[1]) diff --git a/cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py b/cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py index 153dc3f..b9cca53 100644 --- a/cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py +++ b/cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py @@ -1,43 +1,20 @@ import json from unittest import mock -from cloudevents import http from google.cloud import storage, tasks_v2 from google.cloud.storage import blob, client as gcs_client import pytest import main -def _create_pubsub_event() -> http.CloudEvent: - attributes = { - "type": "google.cloud.storage.object.v1.finalized", - "source": "source", - } - data = { - "bucket": "climateiq-predictions", - "name": "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5", - } - return http.CloudEvent(attributes, data) - - def test_trigger_export_pipeline_invalid_object_name(): - attributes = { - "type": "google.cloud.storage.object.v1.finalized", - "source": "source", - } - data = { - "bucket": "climateiq-predictions", - "name": "invalid_name", # Invalid object name - } - event = http.CloudEvent(attributes, data) - expected_error = ( "Invalid object name format. Expected format: '//" "///prediction.results-" "-of-{number_of_files_generated}'\nActual name: 'invalid_name'" ) with pytest.raises(ValueError, match=expected_error): - main.trigger_export_pipeline(event) + main.trigger_export_pipeline("invalid_name") @mock.patch.object(tasks_v2, "CloudTasksClient", autospec=True) @@ -45,8 +22,6 @@ def test_trigger_export_pipeline_invalid_object_name(): def test_trigger_export_pipeline_missing_prediction_files( mock_storage_client, mock_tasks_client ): - event = _create_pubsub_event() - # Missing predictions for chunks 2 and 4. input_blobs = [ storage.Blob( @@ -64,7 +39,9 @@ def test_trigger_export_pipeline_missing_prediction_files( ] mock_storage_client().list_blobs.return_value = input_blobs - main.trigger_export_pipeline(event) + main.trigger_export_pipeline( + "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5" + ) mock_tasks_client().create_task.assert_not_called() @@ -72,8 +49,6 @@ def test_trigger_export_pipeline_missing_prediction_files( @mock.patch.object(tasks_v2, "CloudTasksClient", autospec=True) @mock.patch.object(gcs_client, "Client", autospec=True) def test_trigger_export_pipeline(mock_storage_client, mock_tasks_client): - event = _create_pubsub_event() - # Input blobs setup def create_mock_input_blob(name, start_chunk_id): chunk_id = (start_chunk_id - 1) * 2 + 1 @@ -108,7 +83,9 @@ def create_mock_input_blob(name, start_chunk_id): lambda name: mock_output_blobs.setdefault(name, mock.MagicMock()) ) - main.trigger_export_pipeline(event) + main.trigger_export_pipeline( + "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5" + ) # Confirm output blobs written for i in range(1, 11):