From fae9cfa7601a0dda4da949304493e7fc94e65577 Mon Sep 17 00:00:00 2001 From: Shreya Keshive Date: Wed, 26 Jun 2024 14:35:46 +0000 Subject: [PATCH] Formatting changes --- ... => climateiq_trigger_export_pipeline.yml} | 20 +- .../main.py | 27 +-- .../main_test.py | 34 ++-- .../main.py | 11 +- .../main_test.py | 175 ++++++++++++++++++ .../requirements.txt | 0 .../main_test.py | 161 ---------------- 7 files changed, 226 insertions(+), 202 deletions(-) rename .github/workflows/{trigger_climateiq_export_pipeline_cf.yml => climateiq_trigger_export_pipeline.yml} (61%) rename cloud_functions/{trigger_climateiq_export_pipeline_cf => climateiq_trigger_export_pipeline_cf}/main.py (91%) create mode 100644 cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py rename cloud_functions/{trigger_climateiq_export_pipeline_cf => climateiq_trigger_export_pipeline_cf}/requirements.txt (100%) delete mode 100644 cloud_functions/trigger_climateiq_export_pipeline_cf/main_test.py diff --git a/.github/workflows/trigger_climateiq_export_pipeline_cf.yml b/.github/workflows/climateiq_trigger_export_pipeline.yml similarity index 61% rename from .github/workflows/trigger_climateiq_export_pipeline_cf.yml rename to .github/workflows/climateiq_trigger_export_pipeline.yml index 7177cd7..70cb3ff 100644 --- a/.github/workflows/trigger_climateiq_export_pipeline_cf.yml +++ b/.github/workflows/climateiq_trigger_export_pipeline.yml @@ -1,24 +1,24 @@ # This workflow will install Python dependencies, run tests and lint with a single version of Python # For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python -name: trigger_climateiq_export_pipeline_cf +name: climateiq_trigger_export_pipeline_cf on: push: branches: [ "main" ] paths: - - "cloud_functions/trigger_climateiq_export_pipeline_cf/**" + - "cloud_functions/climateiq_trigger_export_pipeline_cf/**" pull_request: branches: [ "main" ] paths: - - 
"cloud_functions/trigger_climateiq_export_pipeline_cf/**" + - "cloud_functions/climateiq_trigger_export_pipeline_cf/**" permissions: contents: read jobs: - trigger_climateiq_export_pipeline_cf: - name: trigger_climateiq_export_pipeline CI + climateiq_trigger_export_pipeline_cf: + name: climateiq_trigger_export_pipeline CI runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 @@ -29,17 +29,17 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - cd cloud_functions/trigger_climateiq_export_pipeline_cf + cd cloud_functions/climateiq_trigger_export_pipeline_cf pip install -r requirements.txt - name: Lint with flake8 run: | - flake8 cloud_functions/trigger_climateiq_export_pipeline_cf --show-source --statistics + flake8 cloud_functions/climateiq_trigger_export_pipeline_cf --show-source --statistics - name: Ensure black auto-formatter has run run: | - black cloud_functions/trigger_climateiq_export_pipeline_cf --check + black cloud_functions/climateiq_trigger_export_pipeline_cf --check - name: Test with pytest run: | - pytest cloud_functions/trigger_climateiq_export_pipeline_cf + pytest cloud_functions/climateiq_trigger_export_pipeline_cf - name: MyPy Type Checking run: | - mypy cloud_functions/trigger_climateiq_export_pipeline_cf \ No newline at end of file + mypy cloud_functions/climateiq_trigger_export_pipeline_cf \ No newline at end of file diff --git a/cloud_functions/climateiq_spatialize_chunk_predictions_cf/main.py b/cloud_functions/climateiq_spatialize_chunk_predictions_cf/main.py index 6cf3bef..717920d 100644 --- a/cloud_functions/climateiq_spatialize_chunk_predictions_cf/main.py +++ b/cloud_functions/climateiq_spatialize_chunk_predictions_cf/main.py @@ -13,7 +13,7 @@ import pathlib import numpy as np -CLIMATEIQ_PREDICTIONS_BUCKET = "climateiq-predictions" +CLIMATEIQ_CHUNK_PREDICTIONS_BUCKET = "climateiq-chunk-predictions" GLOBAL_CRS = "EPSG:4326" # CAUTION: Changing the H3 cell size may require updates to how many/which 
neighboring # chunks we process. @@ -30,9 +30,7 @@ def subscribe(cloud_event: http.CloudEvent) -> None: CSV file containing H3 indexes along with associated predictions. Args: - cloud_event: The CloudEvent representing the Pub/Sub message. The name - of the object should conform to the following pattern: - "////" + cloud_event: The CloudEvent representing the Pub/Sub message. Raises: ValueError: If the object name format, study area metadata, chunk / neighbor @@ -44,10 +42,13 @@ def subscribe(cloud_event: http.CloudEvent) -> None: # Extract components from the object name. path = pathlib.PurePosixPath(object_name) - if len(path.parts) != 5: - raise ValueError("Invalid object name format. Expected 5 components.") + if len(path.parts) != 6: + raise ValueError( + "Invalid object name format. Expected format: '//" + "///'" + ) - prediction_type, model_id, study_area_name, scenario_id, chunk_id = path.parts + id, prediction_type, model_id, study_area_name, scenario_id, chunk_id = path.parts predictions = _read_chunk_predictions(object_name) study_area_metadata, chunks_ref = _get_study_area_metadata(study_area_name) @@ -86,14 +87,13 @@ def _read_chunk_predictions(object_name: str) -> np.ndarray: ValueError: If the predictions file format is invalid. """ storage_client = storage.Client() - bucket = storage_client.bucket(CLIMATEIQ_PREDICTIONS_BUCKET) + bucket = storage_client.bucket(CLIMATEIQ_CHUNK_PREDICTIONS_BUCKET) blob = bucket.blob(object_name) with blob.open() as fd: fd_iter = iter(fd) line = next(fd_iter, None) - # Vertex AI will output one predictions file per chunk so the file is - # expected to contain only one prediction. + # The file is expected to contain only one prediction. if line is None: raise ValueError(f"Predictions file: {object_name} is missing.") @@ -123,8 +123,11 @@ def _read_neighbor_chunk_predictions( ValueError: If the predictions file format is invalid. 
""" path = pathlib.PurePosixPath(object_name) - if len(path.parts) != 5: - raise ValueError("Invalid object name format. Expected 5 components.") + if len(path.parts) != 6: + raise ValueError( + "Invalid object name format. Expected format: '//" + "///" + ) *prefix, current_chunk_id = path.parts neighbor_object_name = pathlib.PurePosixPath(*prefix, neighbor_chunk_id) return _read_chunk_predictions(str(neighbor_object_name)) diff --git a/cloud_functions/climateiq_spatialize_chunk_predictions_cf/main_test.py b/cloud_functions/climateiq_spatialize_chunk_predictions_cf/main_test.py index a0d8208..3b6eabb 100644 --- a/cloud_functions/climateiq_spatialize_chunk_predictions_cf/main_test.py +++ b/cloud_functions/climateiq_spatialize_chunk_predictions_cf/main_test.py @@ -28,7 +28,11 @@ def test_spatialize_chunk_predictions_invalid_object_name() -> None: with pytest.raises(ValueError) as exc_info: main.subscribe(event) - assert "Invalid object name format. Expected 5 components." in str(exc_info.value) + assert ( + "Invalid object name format. 
Expected format: '//" + "///'" + in str(exc_info.value) + ) @mock.patch.object(storage, "Client", autospec=True) @@ -44,7 +48,7 @@ def test_spatialize_chunk_predictions_missing_study_area( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -79,7 +83,7 @@ def test_spatialize_chunk_predictions_invalid_study_area( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -133,7 +137,7 @@ def test_spatialize_chunk_predictions_missing_chunk( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -185,7 +189,7 @@ def test_spatialize_chunk_predictions_invalid_chunk( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -239,7 +243,7 @@ def test_spatialize_chunk_predictions_missing_predictions( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -276,7 +280,7 @@ def test_spatialize_chunk_predictions_missing_predictions( main.subscribe(event) assert ( - "Predictions file: prediction-type/model-id/study-area-name/scenario-id/" + "Predictions file: id/prediction-type/model-id/study-area-name/scenario-id/" "chunk-id is missing." 
in str(exc_info.value) ) @@ -294,7 +298,7 @@ def test_spatialize_chunk_predictions_too_many_predictions( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -349,7 +353,7 @@ def test_spatialize_chunk_predictions_missing_expected_neighbor_chunk( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -408,7 +412,7 @@ def test_spatialize_chunk_predictions_invalid_neighbor_chunk( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -474,7 +478,7 @@ def test_spatialize_chunk_predictions_neighbor_chunk_missing_predictions( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -522,7 +526,7 @@ def test_spatialize_chunk_predictions_neighbor_chunk_missing_predictions( main.subscribe(event) assert ( - "Predictions file: prediction-type/model-id/study-area-name/scenario-id/" + "Predictions file: id/prediction-type/model-id/study-area-name/scenario-id/" "neighbor-chunk-id is missing." 
) in str(exc_info.value) @@ -540,7 +544,7 @@ def test_spatialize_chunk_predictions_h3_centroids_within_chunk( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -618,7 +622,7 @@ def test_spatialize_chunk_predictions_h3_centroids_outside_chunk( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, @@ -710,7 +714,7 @@ def test_spatialize_chunk_predictions_overlapping_neighbors( { "message": { "data": base64.b64encode( - b"prediction-type/model-id/study-area-name/scenario-id/chunk-id" + b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id" ), } }, diff --git a/cloud_functions/trigger_climateiq_export_pipeline_cf/main.py b/cloud_functions/climateiq_trigger_export_pipeline_cf/main.py similarity index 91% rename from cloud_functions/trigger_climateiq_export_pipeline_cf/main.py rename to cloud_functions/climateiq_trigger_export_pipeline_cf/main.py index 39a7ae5..6d82df0 100644 --- a/cloud_functions/trigger_climateiq_export_pipeline_cf/main.py +++ b/cloud_functions/climateiq_trigger_export_pipeline_cf/main.py @@ -11,7 +11,7 @@ @functions_framework.cloud_event -def trigger_climateiq_export_pipeline(cloud_event: http.CloudEvent) -> None: +def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None: """Triggered by writes to the "climateiq-predictions" bucket. Splits predictions into one file per chunk and kicks off @@ -21,7 +21,7 @@ def trigger_climateiq_export_pipeline(cloud_event: http.CloudEvent) -> None: Additionally, the climateiq_spatialize_chunk_predictions cloud function is only triggered once all prediction files per chunk are written since data from neighboring chunks is required for spatializiation. - + Args: cloud_event: The CloudEvent representing the storage event. 
@@ -36,7 +36,9 @@ def trigger_climateiq_export_pipeline(cloud_event: http.CloudEvent) -> None:
     path = pathlib.PurePosixPath(object_name)
     if len(path.parts) != 6:
         raise ValueError(
-            "Invalid object name format. Expected format: '/////prediction.results--of-{number_of_files_generated}'"
+            "Invalid object name format. Expected format: '//"
+            "///prediction.results-"
+            "-of-{number_of_files_generated}'"
         )
     id, prediction_type, model_id, study_area_name, scenario_id, filename = path.parts
     _, _, _, file_count = filename.split("-")
@@ -60,7 +62,8 @@ def trigger_climateiq_export_pipeline(cloud_event: http.CloudEvent) -> None:
         with blob.open() as fd:
             for line in fd:
                 chunk_id = json.loads(line)["instance"]["key"]
-                output_filename = f"{id}/{prediction_type}/{model_id}/{study_area_name}/{scenario_id}/{chunk_id}"
+                output_filename = (f"{id}/{prediction_type}/{model_id}/"
+                                   f"{study_area_name}/{scenario_id}/{chunk_id}")
                 output_files.append(output_filename)
                 output_blob = storage_client.bucket(
                     CLIMATEIQ_CHUNK_PREDICTIONS_BUCKET
diff --git a/cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py b/cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py
new file mode 100644
index 0000000..d52b359
--- /dev/null
+++ b/cloud_functions/climateiq_trigger_export_pipeline_cf/main_test.py
@@ -0,0 +1,175 @@
+import main
+import pytest
+from google.cloud import storage, pubsub_v1
+from unittest.mock import patch, MagicMock, call
+from cloudevents import http
+
+
+def test_trigger_export_pipeline_invalid_object_name():
+    attributes = {
+        "type": "google.cloud.storage.object.v1.finalized",
+        "source": "source",
+    }
+    data = {
+        "bucket": "climateiq-predictions",
+        "name": "invalid_name",  # Invalid object name
+    }
+    event = http.CloudEvent(attributes, data)
+
+    with pytest.raises(ValueError) as exc_info:
+        main.trigger_export_pipeline(event)
+
+    assert (
+        "Invalid object name format. 
Expected format: '//" + "///prediction.results-" + "-of-{number_of_files_generated}" in str(exc_info.value) + ) + + +@patch.object(pubsub_v1, "PublisherClient", autospec=True) +@patch.object(storage, "Client", autospec=True) +def test_trigger_export_pipeline_missing_prediction_files( + mock_storage_client, mock_publisher +): + attributes = { + "type": "google.cloud.storage.object.v1.finalized", + "source": "source", + } + data = { + "bucket": "climateiq-predictions", + "name": "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5", + } + event = http.CloudEvent(attributes, data) + + input_blobs = [ + storage.Blob( + name="id1/flood/v1.0/manhattan/extreme/prediction.results-1-of-5", + bucket=storage.Bucket(mock_storage_client, "climateiq-predcitions"), + ), + storage.Blob( + name="id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5", + bucket=storage.Bucket(mock_storage_client, "climateiq-predcitions"), + ), + storage.Blob( + name="id1/flood/v1.0/manhattan/extreme/prediction.results-5-of-5", + bucket=storage.Bucket(mock_storage_client, "climateiq-predcitions"), + ), + ] + mock_storage_client().list_blobs.return_value = input_blobs + + main.trigger_export_pipeline(event) + + mock_publisher().topic_path.assert_not_called() + + +@patch.object(pubsub_v1, "PublisherClient", autospec=True) +@patch.object(storage, "Client", autospec=True) +def test_trigger_export_pipeline(mock_storage_client, mock_publisher): + attributes = { + "type": "google.cloud.storage.object.v1.finalized", + "source": "source", + } + data = { + "bucket": "climateiq-predictions", + "name": "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5", + } + event = http.CloudEvent(attributes, data) + + # Create 5 mock blobs with predictions for 2 chunks each + def create_mock_blob(name, num): + chunk_id = (num - 1) * 2 + 1 + predictions = "\n".join( + [ + f'{{"instance": {{"values": [{i}], "key": {chunk_id + i}}},' + f'"prediction": [[1, 2, 3], [4, 5, 6]]}}' + for i in range(2) + ] + ) + 
mock_blob = MagicMock(spec=storage.Blob) + mock_blob.name = name + mock_file = MagicMock() + mock_file.__enter__.return_value = predictions.splitlines() + mock_blob.open.return_value = mock_file + return mock_blob + + input_blobs = [ + create_mock_blob( + f"id1/flood/v1.0/manhattan/extreme/prediction.results-{i}-of-5", i + ) + for i in range(1, 6) + ] + mock_storage_client.return_value.list_blobs.return_value = input_blobs + + mock_publisher().topic_path.return_value = ( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions" + ) + mock_future = MagicMock() + mock_future.result.return_value = "message_id" + mock_publisher().publish.return_value = mock_future + + main.trigger_export_pipeline(event) + + mock_publisher().publish.assert_has_calls( + [ + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/1", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/2", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/3", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/4", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/5", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/6", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + 
"projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/7", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/8", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/9", + origin="trigger_export_pipeline_cf", + ), + call().result(), + call( + "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", + data=b"id1/flood/v1.0/manhattan/extreme/10", + origin="trigger_export_pipeline_cf", + ), + call().result(), + ] + ) diff --git a/cloud_functions/trigger_climateiq_export_pipeline_cf/requirements.txt b/cloud_functions/climateiq_trigger_export_pipeline_cf/requirements.txt similarity index 100% rename from cloud_functions/trigger_climateiq_export_pipeline_cf/requirements.txt rename to cloud_functions/climateiq_trigger_export_pipeline_cf/requirements.txt diff --git a/cloud_functions/trigger_climateiq_export_pipeline_cf/main_test.py b/cloud_functions/trigger_climateiq_export_pipeline_cf/main_test.py deleted file mode 100644 index f145e3f..0000000 --- a/cloud_functions/trigger_climateiq_export_pipeline_cf/main_test.py +++ /dev/null @@ -1,161 +0,0 @@ -import main -import pytest -from google.cloud import storage, pubsub_v1 -from unittest.mock import patch, MagicMock, call -from cloudevents import http - -def test_trigger_climateiq_export_pipeline_invalid_object_name(): - attributes = { - "type": "google.cloud.storage.object.v1.finalized", - "source": "source", - } - data = { - "bucket": "climateiq-predictions", - "name": "invalid_name", # Invalid object name - } - event = http.CloudEvent(attributes, data) - - with pytest.raises(ValueError) as exc_info: - main.trigger_climateiq_export_pipeline(event) - - assert "Invalid object name 
format. Expected format: '/////prediction.results--of-{number_of_files_generated}" in str(exc_info.value) - -@patch.object(pubsub_v1, "PublisherClient", autospec=True) -@patch.object(storage, "Client", autospec=True) -def test_trigger_climateiq_export_pipeline_missing_prediction_files(mock_storage_client, mock_publisher): - attributes = { - "type": "google.cloud.storage.object.v1.finalized", - "source": "source", - } - data = { - "bucket": "climateiq-predictions", - "name": "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5", - } - event = http.CloudEvent(attributes, data) - - input_blobs = [ - storage.Blob( - name="id1/flood/v1.0/manhattan/extreme/prediction.results-1-of-5", - bucket=storage.Bucket(mock_storage_client, "climateiq-predcitions"), - ), - storage.Blob( - name="id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5", - bucket=storage.Bucket(mock_storage_client, "climateiq-predcitions"), - ), - storage.Blob( - name="id1/flood/v1.0/manhattan/extreme/prediction.results-5-of-5", - bucket=storage.Bucket(mock_storage_client, "climateiq-predcitions"), - ), - ] - mock_storage_client().list_blobs.return_value = input_blobs - - main.trigger_climateiq_export_pipeline(event) - - mock_publisher().topic_path.assert_not_called() - - -@patch.object(pubsub_v1, "PublisherClient", autospec=True) -@patch.object(storage, "Client", autospec=True) -def test_trigger_climateiq_export_pipeline(mock_storage_client, mock_publisher): - attributes = { - "type": "google.cloud.storage.object.v1.finalized", - "source": "source", - } - data = { - "bucket": "climateiq-predictions", - "name": "id1/flood/v1.0/manhattan/extreme/prediction.results-3-of-5", - } - event = http.CloudEvent(attributes, data) - - # Create 5 mock blobs with predictions for 2 chunks each - def create_mock_blob(name, num): - chunk_id = (num - 1) * 2 + 1 - predictions = "\n".join([ - f'{{"instance": {{"values": [{i}], "key": {chunk_id + i}}}, "prediction": [[1, 2, 3], [4, 5, 6]]}}' - for i in range(2) - 
]) - mock_blob = MagicMock(spec=storage.Blob) - mock_blob.name = name - mock_file = MagicMock() - mock_file.__enter__.return_value = predictions.splitlines() - mock_blob.open.return_value = mock_file - return mock_blob - input_blobs = [ - create_mock_blob(f"id1/flood/v1.0/manhattan/extreme/prediction.results-{i}-of-5", i) - for i in range(1, 6) - ] - mock_storage_client.return_value.list_blobs.return_value = input_blobs - - - mock_publisher().topic_path.return_value = ( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions" - ) - mock_future = MagicMock() - mock_future.result.return_value = "message_id" - mock_publisher().publish.return_value = mock_future - - main.trigger_climateiq_export_pipeline(event) - - mock_publisher().publish.assert_has_calls( - [ - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/1", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/2", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/3", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/4", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/5", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/6", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - 
call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/7", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/8", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/9", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - call( - "projects/climateiq/topics/climateiq-spatialize-and-export-predictions", - data=b"id1/flood/v1.0/manhattan/extreme/10", - origin="trigger_climateiq_export_pipeline_cf", - ), - call().result(), - ]) \ No newline at end of file