Formatting changes
skeshive committed Jun 26, 2024
1 parent cbf2cd7 commit fae9cfa
Showing 7 changed files with 226 additions and 202 deletions.
@@ -1,24 +1,24 @@
# This workflow will install Python dependencies, run tests and lint with a single version of Python
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python

-name: trigger_climateiq_export_pipeline_cf
+name: climateiq_trigger_export_pipeline_cf

on:
push:
branches: [ "main" ]
paths:
- "cloud_functions/trigger_climateiq_export_pipeline_cf/**"
- "cloud_functions/climateiq_trigger_export_pipeline_cf/**"
pull_request:
branches: [ "main" ]
paths:
- "cloud_functions/trigger_climateiq_export_pipeline_cf/**"
- "cloud_functions/climateiq_trigger_export_pipeline_cf/**"

permissions:
contents: read

jobs:
-  trigger_climateiq_export_pipeline_cf:
-    name: trigger_climateiq_export_pipeline CI
+  climateiq_trigger_export_pipeline_cf:
+    name: climateiq_trigger_export_pipeline CI
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
@@ -29,17 +29,17 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
-          cd cloud_functions/trigger_climateiq_export_pipeline_cf
+          cd cloud_functions/climateiq_trigger_export_pipeline_cf
pip install -r requirements.txt
- name: Lint with flake8
run: |
-          flake8 cloud_functions/trigger_climateiq_export_pipeline_cf --show-source --statistics
+          flake8 cloud_functions/climateiq_trigger_export_pipeline_cf --show-source --statistics
- name: Ensure black auto-formatter has run
run: |
-          black cloud_functions/trigger_climateiq_export_pipeline_cf --check
+          black cloud_functions/climateiq_trigger_export_pipeline_cf --check
- name: Test with pytest
run: |
-          pytest cloud_functions/trigger_climateiq_export_pipeline_cf
+          pytest cloud_functions/climateiq_trigger_export_pipeline_cf
- name: MyPy Type Checking
run: |
-          mypy cloud_functions/trigger_climateiq_export_pipeline_cf
+          mypy cloud_functions/climateiq_trigger_export_pipeline_cf
27 changes: 15 additions & 12 deletions cloud_functions/climateiq_spatialize_chunk_predictions_cf/main.py
@@ -13,7 +13,7 @@
import pathlib
import numpy as np

-CLIMATEIQ_PREDICTIONS_BUCKET = "climateiq-predictions"
+CLIMATEIQ_CHUNK_PREDICTIONS_BUCKET = "climateiq-chunk-predictions"
GLOBAL_CRS = "EPSG:4326"
# CAUTION: Changing the H3 cell size may require updates to how many/which neighboring
# chunks we process.
@@ -30,9 +30,7 @@ def subscribe(cloud_event: http.CloudEvent) -> None:
CSV file containing H3 indexes along with associated predictions.
Args:
-        cloud_event: The CloudEvent representing the Pub/Sub message. The name
-            of the object should conform to the following pattern:
-            "<prediction_type>/<model_id>/<study_area_name>/<scenario_id>/<chunk_id>"
+        cloud_event: The CloudEvent representing the Pub/Sub message.
Raises:
ValueError: If the object name format, study area metadata, chunk / neighbor
@@ -44,10 +44,13 @@ def subscribe(cloud_event: http.CloudEvent) -> None:

# Extract components from the object name.
path = pathlib.PurePosixPath(object_name)
-    if len(path.parts) != 5:
-        raise ValueError("Invalid object name format. Expected 5 components.")
+    if len(path.parts) != 6:
+        raise ValueError(
+            "Invalid object name format. Expected format: '<id>/<prediction_type>/"
+            "<model_id>/<study_area_name>/<scenario_id>/<chunk_id>'"
+        )

-    prediction_type, model_id, study_area_name, scenario_id, chunk_id = path.parts
+    id, prediction_type, model_id, study_area_name, scenario_id, chunk_id = path.parts

predictions = _read_chunk_predictions(object_name)
study_area_metadata, chunks_ref = _get_study_area_metadata(study_area_name)
@@ -86,14 +87,13 @@ def _read_chunk_predictions(object_name: str) -> np.ndarray:
ValueError: If the predictions file format is invalid.
"""
storage_client = storage.Client()
-    bucket = storage_client.bucket(CLIMATEIQ_PREDICTIONS_BUCKET)
+    bucket = storage_client.bucket(CLIMATEIQ_CHUNK_PREDICTIONS_BUCKET)
blob = bucket.blob(object_name)

with blob.open() as fd:
fd_iter = iter(fd)
line = next(fd_iter, None)
-        # Vertex AI will output one predictions file per chunk so the file is
-        # expected to contain only one prediction.
+        # The file is expected to contain only one prediction.
if line is None:
raise ValueError(f"Predictions file: {object_name} is missing.")

@@ -123,8 +123,11 @@ def _read_neighbor_chunk_predictions(
ValueError: If the predictions file format is invalid.
"""
path = pathlib.PurePosixPath(object_name)
-    if len(path.parts) != 5:
-        raise ValueError("Invalid object name format. Expected 5 components.")
+    if len(path.parts) != 6:
+        raise ValueError(
+            "Invalid object name format. Expected format: '<id>/<prediction_type>/"
+            "<model_id>/<study_area_name>/<scenario_id>/<chunk_id>'"
+        )
*prefix, current_chunk_id = path.parts
neighbor_object_name = pathlib.PurePosixPath(*prefix, neighbor_chunk_id)
return _read_chunk_predictions(str(neighbor_object_name))
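To make the new six-component naming scheme concrete, here is a minimal, self-contained sketch of the parsing logic above. The sample object name and the neighbor chunk id are hypothetical values, not anything from this commit.

import pathlib

# Hypothetical object name following the new
# '<id>/<prediction_type>/<model_id>/<study_area_name>/<scenario_id>/<chunk_id>'
# scheme introduced in this commit.
object_name = "batch-123/flood/model-v2/manhattan/scenario-0/chunk-42"

path = pathlib.PurePosixPath(object_name)
if len(path.parts) != 6:
    raise ValueError(
        "Invalid object name format. Expected format: '<id>/<prediction_type>/"
        "<model_id>/<study_area_name>/<scenario_id>/<chunk_id>'"
    )
id, prediction_type, model_id, study_area_name, scenario_id, chunk_id = path.parts

# A neighbor's object name reuses every component except the trailing chunk id.
*prefix, current_chunk_id = path.parts
neighbor_object_name = pathlib.PurePosixPath(*prefix, "chunk-43")
print(neighbor_object_name)  # batch-123/flood/model-v2/manhattan/scenario-0/chunk-43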
@@ -28,7 +28,11 @@ def test_spatialize_chunk_predictions_invalid_object_name() -> None:
with pytest.raises(ValueError) as exc_info:
main.subscribe(event)

assert "Invalid object name format. Expected 5 components." in str(exc_info.value)
assert (
"Invalid object name format. Expected format: '<id>/<prediction_type>/"
"<model_id>/<study_area_name>/<scenario_id>/<chunk_id>'"
in str(exc_info.value)
)
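For context, the events these tests hand to main.subscribe follow the Pub/Sub-over-CloudEvents shape. A minimal sketch of constructing one, assuming the tests use the cloudevents library's http.CloudEvent; the attribute values below are illustrative, not taken from this repository:

import base64
from cloudevents import http

# Illustrative event attributes; real values come from the Pub/Sub trigger.
attributes = {
    "type": "google.cloud.pubsub.topic.v1.messagePublished",
    "source": "//pubsub.googleapis.com/projects/demo/topics/demo-topic",
}
# The object name travels base64-encoded in the message payload, now with
# the leading <id> component.
data = {
    "message": {
        "data": base64.b64encode(
            b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
        ),
    }
}
event = http.CloudEvent(attributes, data)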


@mock.patch.object(storage, "Client", autospec=True)
@@ -44,7 +48,7 @@ def test_spatialize_chunk_predictions_missing_study_area(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -79,7 +83,7 @@ def test_spatialize_chunk_predictions_invalid_study_area(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -133,7 +137,7 @@ def test_spatialize_chunk_predictions_missing_chunk(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -185,7 +189,7 @@ def test_spatialize_chunk_predictions_invalid_chunk(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -239,7 +243,7 @@ def test_spatialize_chunk_predictions_missing_predictions(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -276,7 +280,7 @@ def test_spatialize_chunk_predictions_missing_predictions(
main.subscribe(event)

assert (
"Predictions file: prediction-type/model-id/study-area-name/scenario-id/"
"Predictions file: id/prediction-type/model-id/study-area-name/scenario-id/"
"chunk-id is missing." in str(exc_info.value)
)

@@ -294,7 +298,7 @@ def test_spatialize_chunk_predictions_too_many_predictions(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -349,7 +353,7 @@ def test_spatialize_chunk_predictions_missing_expected_neighbor_chunk(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -408,7 +412,7 @@ def test_spatialize_chunk_predictions_invalid_neighbor_chunk(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -474,7 +478,7 @@ def test_spatialize_chunk_predictions_neighbor_chunk_missing_predictions(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -522,7 +526,7 @@ def test_spatialize_chunk_predictions_neighbor_chunk_missing_predictions(
main.subscribe(event)

assert (
"Predictions file: prediction-type/model-id/study-area-name/scenario-id/"
"Predictions file: id/prediction-type/model-id/study-area-name/scenario-id/"
"neighbor-chunk-id is missing."
) in str(exc_info.value)

@@ -540,7 +544,7 @@ def test_spatialize_chunk_predictions_h3_centroids_within_chunk(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -618,7 +622,7 @@ def test_spatialize_chunk_predictions_h3_centroids_outside_chunk(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -710,7 +714,7 @@ def test_spatialize_chunk_predictions_overlapping_neighbors(
{
"message": {
"data": base64.b64encode(
b"prediction-type/model-id/study-area-name/scenario-id/chunk-id"
b"id/prediction-type/model-id/study-area-name/scenario-id/chunk-id"
),
}
},
@@ -11,7 +11,7 @@


@functions_framework.cloud_event
-def trigger_climateiq_export_pipeline(cloud_event: http.CloudEvent) -> None:
+def trigger_export_pipeline(cloud_event: http.CloudEvent) -> None:
"""Triggered by writes to the "climateiq-predictions" bucket.
Splits predictions into one file per chunk and kicks off
@@ -21,7 +21,7 @@ def trigger_climateiq_export_pipeline(cloud_event: http.CloudEvent) -> None:
Additionally, the climateiq_spatialize_chunk_predictions cloud function is
only triggered once all prediction files per chunk are written since data
from neighboring chunks is required for spatialization.
Args:
cloud_event: The CloudEvent representing the storage event.
@@ -36,7 +36,9 @@ def trigger_climateiq_export_pipeline(cloud_event: http.CloudEvent) -> None:
path = pathlib.PurePosixPath(object_name)
if len(path.parts) != 6:
raise ValueError(
"Invalid object name format. Expected format: '<id>/<prediction_type>/<model_id>/<study_area_name>/<scenario_id>/prediction.results-<file_number>-of-{number_of_files_generated}'"
"Invalid object name format. Expected format: '<id>/<prediction_type>/"
"<model_id>/<study_area_name>/<scenario_id>/prediction.results-"
"<file_number>-of-{number_of_files_generated}'"
)
id, prediction_type, model_id, study_area_name, scenario_id, filename = path.parts
_, _, _, file_count = filename.split("-")
@@ -60,7 +62,8 @@ def trigger_climateiq_export_pipeline(cloud_event: http.CloudEvent) -> None:
with blob.open() as fd:
for line in fd:
chunk_id = json.loads(line)["instance"]["key"]
output_filename = f"{id}/{prediction_type}/{model_id}/{study_area_name}/{scenario_id}/{chunk_id}"
output_filename = f"{id}/{prediction_type}/{model_id}/"
f"{study_area_name}/{scenario_id}/{chunk_id}"
output_files.append(output_filename)
output_blob = storage_client.bucket(
CLIMATEIQ_CHUNK_PREDICTIONS_BUCKET
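The loop above fans one merged predictions file out into one blob per chunk. Below is a minimal sketch of that pattern; the function name split_into_chunk_files is hypothetical, and the upload_from_string write stands in for whatever the truncated code does next. Note the parentheses around the two f-strings: without them the second f-string is a separate, discarded expression statement rather than part of the filename.

import json
from google.cloud import storage

CLIMATEIQ_CHUNK_PREDICTIONS_BUCKET = "climateiq-chunk-predictions"

def split_into_chunk_files(object_name: str, lines: list[str]) -> list[str]:
    """Writes one blob per chunk and returns the output object names."""
    storage_client = storage.Client()
    id, prediction_type, model_id, study_area_name, scenario_id, _ = (
        object_name.split("/")
    )
    output_files = []
    for line in lines:
        # Each line is a JSON record whose instance key names the chunk.
        chunk_id = json.loads(line)["instance"]["key"]
        # Parentheses make the two f-strings concatenate into one value.
        output_filename = (
            f"{id}/{prediction_type}/{model_id}/"
            f"{study_area_name}/{scenario_id}/{chunk_id}"
        )
        output_files.append(output_filename)
        storage_client.bucket(CLIMATEIQ_CHUNK_PREDICTIONS_BUCKET).blob(
            output_filename
        ).upload_from_string(line)
    return output_files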