Change trigger CF to Cloud Run Job. (#48)
sylmak authored Aug 5, 2024
1 parent c438ddf commit 51a40c9
Showing 3 changed files with 77 additions and 62 deletions.
34 changes: 34 additions & 0 deletions cloud_functions/climateiq_trigger_export_pipeline_cf/Dockerfile
@@ -0,0 +1,34 @@
# Copyright 2020 Google, LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Use the official Python image.
# https://hub.docker.com/_/python
FROM python:3.12-slim

# Allow statements and log messages to immediately appear in the Cloud Run logs
ENV PYTHONUNBUFFERED True

# Copy application dependency manifests to the container image.
# Copying this separately prevents re-running pip install on every code change.
COPY requirements.txt ./

# Install production dependencies.
RUN pip install -r requirements.txt

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./
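
The new Dockerfile sets no ENTRYPOINT or CMD, so the container command (for example, "python main.py") is presumably supplied by the Cloud Run Job definition, which is not part of this commit. As a rough orientation only, a minimal sketch of creating such a job with the google-cloud-run client might look like the following; the project, region, image path, and job name are hypothetical placeholders, not taken from this repository.

# Minimal sketch (not part of this commit) of creating the Cloud Run Job for
# this image with the google-cloud-run client. Project, region, image path,
# and job name below are hypothetical placeholders.
from google.cloud import run_v2

client = run_v2.JobsClient()

job = run_v2.Job(
    template=run_v2.ExecutionTemplate(
        template=run_v2.TaskTemplate(
            containers=[
                run_v2.Container(
                    image=(
                        "us-central1-docker.pkg.dev/my-project/my-repo/"
                        "climateiq-trigger-export-pipeline:latest"
                    ),
                    # The image defines no ENTRYPOINT/CMD, so the job supplies
                    # the command; the blob name is passed per execution.
                    command=["python", "main.py"],
                )
            ]
        )
    )
)

operation = client.create_job(
    parent="projects/my-project/locations/us-central1",
    job=job,
    job_id="climateiq-trigger-export-pipeline",
)
operation.result()  # Blocks until the job resource has been created.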
68 changes: 36 additions & 32 deletions cloud_functions/climateiq_trigger_export_pipeline_cf/main.py
@@ -2,10 +2,9 @@
import pathlib
import json
import os
import sys
import time

from cloudevents import http
import functions_framework
from google.cloud import tasks_v2
from google.cloud.storage import client as gcs_client, retry

@@ -24,15 +23,18 @@
SPATIALIZE_CF_QUEUE = "spatialize-chunk-predictions-queue"


def _write_structured_log(message: str, severity: str = "INFO"):
print(json.dumps(dict(message=message, severity=severity)), flush=True)


def _write_file(line: str, output_filename: str, storage_client: gcs_client.Client):
output_blob = storage_client.bucket(OUTPUT_BUCKET_NAME).blob(output_filename)
# Specify retry here due to bug:
# https://github.com/googleapis/python-storage/issues/1242
output_blob.upload_from_string(line, retry=retry.DEFAULT_RETRY)


@functions_framework.cloud_event
def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None:
def trigger_export_pipeline(object_name: str) -> None:
"""Triggered by writes to the "climateiq-predictions" bucket.
Splits predictions into one file per chunk and kicks off
@@ -41,16 +43,13 @@ def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None:
Note: This function only runs once all output prediction files are written.
Additionally, the climateiq_spatialize_chunk_predictions cloud function is
only triggered once all prediction files per chunk are written since data
from neighboring chunks is required for spatializiation.
from neighboring chunks is required for spatialization.
Args:
cloud_event: The CloudEvent representing the storage event.
object_name: The name of the blob written to the bucket.
"""
start = time.time()

data = cloud_event.data
object_name = data["name"]

# Extract components from the object name and determine the total number of
# output prediction files.
expected_format = (
@@ -99,6 +98,8 @@ def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None:
# written yet.
return

_write_structured_log(f"[{object_name}] Starting process.", "DEBUG")

# Split predictions into one file per chunk and output to GCS.
output_filenames = []
write_futures = []
@@ -117,17 +118,17 @@ def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None:
)
write_futures.append(future)

futures.wait(write_futures)
print(
json.dumps(
dict(
severity="DEBUG",
message=(
f"[{object_name}] Created {len(output_filenames)} files in "
f"{time.time() - start} s."
),
)
)
futures.wait(write_futures, return_when=futures.FIRST_EXCEPTION)
# If any exceptions were raised in a thread, calling result() will raise it here.
for future in write_futures:
future.result()

_write_structured_log(
(
f"[{object_name}] Created {len(output_filenames)} files in "
f"{time.time() - start} s."
),
"DEBUG",
)

# Once all output files have been written, push tasks to Task Queue.
@@ -160,16 +161,19 @@ def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None:
),
)
)
futures.wait(queue_futures)

print(
json.dumps(
dict(
severity="DEBUG",
message=(
f"[{object_name}] Triggered export pipeline for "
f"{len(output_filenames)} chunks."
),
)
)
futures.wait(queue_futures, return_when=futures.FIRST_EXCEPTION)
# If any exceptions were raised in a thread, calling result() will raise it here.
for future in queue_futures:
future.result()

_write_structured_log(
(
f"[{object_name}] Triggered export pipeline for "
f"{len(output_filenames)} chunks."
),
"DEBUG",
)


if __name__ == "__main__":
trigger_export_pipeline(sys.argv[1])
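
With the CloudEvent entry point gone, the object name now has to reach the job from outside: main.py reads it from sys.argv[1], so whatever reacts to the storage event must start a job execution and pass the blob name as a container argument. That caller is not included in this commit. The sketch below is one hypothetical way to do it, assuming a google-cloud-run client recent enough to support per-execution container overrides and using a made-up job name.

# Hypothetical caller (not part of this commit): starts one execution of the
# job, passing the finalized blob's name so it arrives as sys.argv[1].
from google.cloud import run_v2


def start_export_job(object_name: str) -> None:
    client = run_v2.JobsClient()
    request = run_v2.RunJobRequest(
        name=(
            "projects/my-project/locations/us-central1/jobs/"
            "climateiq-trigger-export-pipeline"
        ),
        overrides=run_v2.RunJobRequest.Overrides(
            container_overrides=[
                # Appended to the container command ("python main.py"), so the
                # job process sees the object name as its first argument.
                run_v2.RunJobRequest.Overrides.ContainerOverride(
                    args=[object_name]
                )
            ]
        ),
    )
    # run_job returns a long-running operation that completes when the
    # execution finishes; a fire-and-forget caller does not have to wait.
    client.run_job(request=request)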
37 changes: 7 additions & 30 deletions cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py
@@ -1,52 +1,27 @@
import json
from unittest import mock

from cloudevents import http
from google.cloud import storage, tasks_v2
from google.cloud.storage import blob, client as gcs_client
import pytest
import main


def _create_pubsub_event() -> http.CloudEvent:
attributes = {
"type": "google.cloud.storage.object.v1.finalized",
"source": "source",
}
data = {
"bucket": "climateiq-predictions",
"name": "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5",
}
return http.CloudEvent(attributes, data)


def test_trigger_export_pipeline_invalid_object_name():
attributes = {
"type": "google.cloud.storage.object.v1.finalized",
"source": "source",
}
data = {
"bucket": "climateiq-predictions",
"name": "invalid_name", # Invalid object name
}
event = http.CloudEvent(attributes, data)

expected_error = (
"Invalid object name format. Expected format: '<id>/<prediction_type>/"
"<model_id>/<study_area_name>/<scenario_id>/prediction.results-"
"<file_number>-of-{number_of_files_generated}'\nActual name: 'invalid_name'"
)
with pytest.raises(ValueError, match=expected_error):
main.trigger_export_pipeline(event)
main.trigger_export_pipeline("invalid_name")


@mock.patch.object(tasks_v2, "CloudTasksClient", autospec=True)
@mock.patch.object(gcs_client, "Client", autospec=True)
def test_trigger_export_pipeline_missing_prediction_files(
mock_storage_client, mock_tasks_client
):
event = _create_pubsub_event()

# Missing predictions for chunks 2 and 4.
input_blobs = [
storage.Blob(
@@ -64,16 +39,16 @@ def test_trigger_export_pipeline_missing_prediction_files(
]
mock_storage_client().list_blobs.return_value = input_blobs

main.trigger_export_pipeline(event)
main.trigger_export_pipeline(
"id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5"
)

mock_tasks_client().create_task.assert_not_called()


@mock.patch.object(tasks_v2, "CloudTasksClient", autospec=True)
@mock.patch.object(gcs_client, "Client", autospec=True)
def test_trigger_export_pipeline(mock_storage_client, mock_tasks_client):
event = _create_pubsub_event()

# Input blobs setup
def create_mock_input_blob(name, start_chunk_id):
chunk_id = (start_chunk_id - 1) * 2 + 1
@@ -108,7 +83,9 @@ def create_mock_input_blob(name, start_chunk_id):
lambda name: mock_output_blobs.setdefault(name, mock.MagicMock())
)

main.trigger_export_pipeline(event)
main.trigger_export_pipeline(
"id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5"
)

# Confirm output blobs written
for i in range(1, 11):
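
For reference, the object-name format shared by the job and these tests is the one spelled out in the ValueError message above. The committed parsing code is collapsed in this diff, so the regex below is only an illustration of that documented format, not the repository's implementation.

import re

# Illustrative only: a pattern for the documented object-name format
# '<id>/<prediction_type>/<model_id>/<study_area_name>/<scenario_id>/
# prediction.results-<file_number>-of-<number_of_files_generated>'.
OBJECT_NAME_PATTERN = re.compile(
    r"(?P<id>[^/]+)/(?P<prediction_type>[^/]+)/(?P<model_id>[^/]+)/"
    r"(?P<study_area_name>[^/]+)/(?P<scenario_id>[^/]+)/"
    r"prediction\.results-(?P<file_number>\d+)-of-(?P<total_files>\d+)"
)

match = OBJECT_NAME_PATTERN.fullmatch(
    "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5"
)
assert match is not None
assert match["study_area_name"] == "manhattan"
assert match["total_files"] == "5"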
