From 2304c2fd104a7da106eeb746318132873ca6944c Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Mon, 20 Jan 2025 22:29:51 +0000 Subject: [PATCH] fix(repo): Use just one code location --- Containerfile | 5 +- infrastructure/docker-compose.yaml | 12 ++--- src/cloud_archives/__init__.py | 17 ------- src/dagster_dags/__init__.py | 4 ++ .../air/__init__.py | 0 .../air/cams_eu.py | 0 src/dagster_dags/definitions.py | 48 +++++++++++++++++++ .../nwp/__init__.py | 0 .../nwp/ceda_mo_um_global.py | 0 .../nwp/ecmwf_ens_stat_india.py | 0 .../nwp/ecmwf_hres_ifs_india.py | 0 .../nwp/ecmwf_hres_ifs_west_europe.py | 0 .../nwp/noaa-gfs-global.py | 0 .../pv/__init__.py | 0 .../pv/passiv/__init__.py | 0 .../pv/passiv/filenames.py | 0 .../pv/passiv/passiv_monthly.py | 0 .../pv/passiv/passiv_year.py | 0 .../pv/passiv/ss_rawdata_api.py | 0 .../sat/__init__.py | 0 .../sat/eumetsat_iodc_lrv.py | 0 src/local_archives/__init__.py | 28 ----------- tests/__init__.py | 1 + tests/cloud_archives/__init__.py | 0 tests/compile_test.py | 11 +++-- tests/{cloud_archives/pv => }/test_passiv.py | 2 +- 26 files changed, 66 insertions(+), 62 deletions(-) delete mode 100644 src/cloud_archives/__init__.py create mode 100644 src/dagster_dags/__init__.py rename src/{local_archives => dagster_dags}/air/__init__.py (100%) rename src/{local_archives => dagster_dags}/air/cams_eu.py (100%) create mode 100644 src/dagster_dags/definitions.py rename src/{local_archives => dagster_dags}/nwp/__init__.py (100%) rename src/{local_archives => dagster_dags}/nwp/ceda_mo_um_global.py (100%) rename src/{local_archives => dagster_dags}/nwp/ecmwf_ens_stat_india.py (100%) rename src/{local_archives => dagster_dags}/nwp/ecmwf_hres_ifs_india.py (100%) rename src/{local_archives => dagster_dags}/nwp/ecmwf_hres_ifs_west_europe.py (100%) rename src/{local_archives => dagster_dags}/nwp/noaa-gfs-global.py (100%) rename src/{cloud_archives => dagster_dags}/pv/__init__.py (100%) rename src/{cloud_archives => dagster_dags}/pv/passiv/__init__.py (100%) rename src/{cloud_archives => dagster_dags}/pv/passiv/filenames.py (100%) rename src/{cloud_archives => dagster_dags}/pv/passiv/passiv_monthly.py (100%) rename src/{cloud_archives => dagster_dags}/pv/passiv/passiv_year.py (100%) rename src/{cloud_archives => dagster_dags}/pv/passiv/ss_rawdata_api.py (100%) rename src/{local_archives => dagster_dags}/sat/__init__.py (100%) rename src/{local_archives => dagster_dags}/sat/eumetsat_iodc_lrv.py (100%) delete mode 100644 src/local_archives/__init__.py delete mode 100644 tests/cloud_archives/__init__.py rename tests/{cloud_archives/pv => }/test_passiv.py (84%) diff --git a/Containerfile b/Containerfile index b31263d..0c039ea 100644 --- a/Containerfile +++ b/Containerfile @@ -13,12 +13,11 @@ COPY pyproject.toml /opt/dagster/app # DagsterInstance. RUN uv sync -# Set the code location module to be loaded by the gRPC server -ENV MODULE_NAME=local_archives +EXPOSE 4000 # Using CMD rather than RUN allows the command to be overridden in # run launchers or executors to run other commands using this image. # This is important as runs are executed inside this container. ENTRYPOINT ["uv", "run"] -CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "local_archives"] +CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000", "-m", "dagster_dags"] diff --git a/infrastructure/docker-compose.yaml b/infrastructure/docker-compose.yaml index c4ad54e..f77a9fb 100644 --- a/infrastructure/docker-compose.yaml +++ b/infrastructure/docker-compose.yaml @@ -25,8 +25,6 @@ services: PGDATA: "/var/lib/postgresql/data" volumes: - dagster-pgdata-vol:/var/lib/postgresql/data - expose: - - "5432" # Publishes on network but not to host networks: ["dagster-network"] healthcheck: test: pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB @@ -37,16 +35,14 @@ services: # This service runs the gRPC server that loads user code, used by both dagster-webserver # and dagster-daemon. By setting DAGSTER_CURRENT_IMAGE to its own image, we tell the # run launcher to use this same image when launching runs in a new container as well. - dagster-codeserver_local-archives: - container_name: dagster-codeserver_local-archives + dagster-codeserver: + container_name: dagster-codeserver image: ghcr.io/openclimatefix/dagster-dags:devsjc-code-container restart: always environment: <<: *postgres-variables DAGSTER_CURRENT_IMAGE: "ghcr.io/openclimatefix/dagster-dags" DAGSTER_HOME: "/opt/dagster/home" - expose: - - "4000" configs: *dagster-configs networks: ["dagster-network"] @@ -111,9 +107,9 @@ configs: content: | load_from: - grpc_server: - host: "dagster-codeserver_local-archives" + host: "dagster-codeserver" port: 4000 - location_name: "local_archives" + location_name: "dagster_dags" dagster.yaml: content: | diff --git a/src/cloud_archives/__init__.py b/src/cloud_archives/__init__.py deleted file mode 100644 index 03d2700..0000000 --- a/src/cloud_archives/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -import os - -import dagster as dg - -from . import pv - -pv_assets = dg.load_assets_from_package_module( - package_module=pv, - group_name="pv", - key_prefix="pv", -) - -defs = dg.Definitions( - assets=[*pv_assets], - jobs=[], - schedules=[], -) diff --git a/src/dagster_dags/__init__.py b/src/dagster_dags/__init__.py new file mode 100644 index 0000000..3b9c2b5 --- /dev/null +++ b/src/dagster_dags/__init__.py @@ -0,0 +1,4 @@ +from .definitions import defs + +__all__ = ['defs'] + diff --git a/src/local_archives/air/__init__.py b/src/dagster_dags/air/__init__.py similarity index 100% rename from src/local_archives/air/__init__.py rename to src/dagster_dags/air/__init__.py diff --git a/src/local_archives/air/cams_eu.py b/src/dagster_dags/air/cams_eu.py similarity index 100% rename from src/local_archives/air/cams_eu.py rename to src/dagster_dags/air/cams_eu.py diff --git a/src/dagster_dags/definitions.py b/src/dagster_dags/definitions.py new file mode 100644 index 0000000..f146b6f --- /dev/null +++ b/src/dagster_dags/definitions.py @@ -0,0 +1,48 @@ +import dagster as dg +from dagster_docker import PipesDockerClient + +from . import nwp, pv, sat + + +class BaseStorageResource(dg.ConfigurableResource): + """A resource defining where to store external data. + + Consists of a location on disk accessible to the Dagster instance. + Where possible all data not handled by Dagster's IO managers (e.g. + data downloaded via dagster pipes docker instances) will use this + path as a base location. + """ + + location: str + +nwp_assets = dg.load_assets_from_package_module( + package_module=nwp, + group_name="nwp", + key_prefix="nwp", +) + +sat_assets = dg.load_assets_from_package_module( + package_module=sat, + group_name="sat", + key_prefix="sat", +) + +pv_assets = dg.load_assets_from_package_module( + package_module=pv, + group_name="pv", + key_prefix="pv", +) + +defs = dg.Definitions( + assets=[*nwp_assets, *sat_assets, *pv_assets], + resources={ + "pipes_subprocess_client": dg.PipesSubprocessClient(), + "pipes_docker_client": PipesDockerClient(), + "base_storage_resource": BaseStorageResource( + location=dg.EnvVar("BASE_STORAGE_PATH"), + ), + }, + jobs=[], + schedules=[], +) + diff --git a/src/local_archives/nwp/__init__.py b/src/dagster_dags/nwp/__init__.py similarity index 100% rename from src/local_archives/nwp/__init__.py rename to src/dagster_dags/nwp/__init__.py diff --git a/src/local_archives/nwp/ceda_mo_um_global.py b/src/dagster_dags/nwp/ceda_mo_um_global.py similarity index 100% rename from src/local_archives/nwp/ceda_mo_um_global.py rename to src/dagster_dags/nwp/ceda_mo_um_global.py diff --git a/src/local_archives/nwp/ecmwf_ens_stat_india.py b/src/dagster_dags/nwp/ecmwf_ens_stat_india.py similarity index 100% rename from src/local_archives/nwp/ecmwf_ens_stat_india.py rename to src/dagster_dags/nwp/ecmwf_ens_stat_india.py diff --git a/src/local_archives/nwp/ecmwf_hres_ifs_india.py b/src/dagster_dags/nwp/ecmwf_hres_ifs_india.py similarity index 100% rename from src/local_archives/nwp/ecmwf_hres_ifs_india.py rename to src/dagster_dags/nwp/ecmwf_hres_ifs_india.py diff --git a/src/local_archives/nwp/ecmwf_hres_ifs_west_europe.py b/src/dagster_dags/nwp/ecmwf_hres_ifs_west_europe.py similarity index 100% rename from src/local_archives/nwp/ecmwf_hres_ifs_west_europe.py rename to src/dagster_dags/nwp/ecmwf_hres_ifs_west_europe.py diff --git a/src/local_archives/nwp/noaa-gfs-global.py b/src/dagster_dags/nwp/noaa-gfs-global.py similarity index 100% rename from src/local_archives/nwp/noaa-gfs-global.py rename to src/dagster_dags/nwp/noaa-gfs-global.py diff --git a/src/cloud_archives/pv/__init__.py b/src/dagster_dags/pv/__init__.py similarity index 100% rename from src/cloud_archives/pv/__init__.py rename to src/dagster_dags/pv/__init__.py diff --git a/src/cloud_archives/pv/passiv/__init__.py b/src/dagster_dags/pv/passiv/__init__.py similarity index 100% rename from src/cloud_archives/pv/passiv/__init__.py rename to src/dagster_dags/pv/passiv/__init__.py diff --git a/src/cloud_archives/pv/passiv/filenames.py b/src/dagster_dags/pv/passiv/filenames.py similarity index 100% rename from src/cloud_archives/pv/passiv/filenames.py rename to src/dagster_dags/pv/passiv/filenames.py diff --git a/src/cloud_archives/pv/passiv/passiv_monthly.py b/src/dagster_dags/pv/passiv/passiv_monthly.py similarity index 100% rename from src/cloud_archives/pv/passiv/passiv_monthly.py rename to src/dagster_dags/pv/passiv/passiv_monthly.py diff --git a/src/cloud_archives/pv/passiv/passiv_year.py b/src/dagster_dags/pv/passiv/passiv_year.py similarity index 100% rename from src/cloud_archives/pv/passiv/passiv_year.py rename to src/dagster_dags/pv/passiv/passiv_year.py diff --git a/src/cloud_archives/pv/passiv/ss_rawdata_api.py b/src/dagster_dags/pv/passiv/ss_rawdata_api.py similarity index 100% rename from src/cloud_archives/pv/passiv/ss_rawdata_api.py rename to src/dagster_dags/pv/passiv/ss_rawdata_api.py diff --git a/src/local_archives/sat/__init__.py b/src/dagster_dags/sat/__init__.py similarity index 100% rename from src/local_archives/sat/__init__.py rename to src/dagster_dags/sat/__init__.py diff --git a/src/local_archives/sat/eumetsat_iodc_lrv.py b/src/dagster_dags/sat/eumetsat_iodc_lrv.py similarity index 100% rename from src/local_archives/sat/eumetsat_iodc_lrv.py rename to src/dagster_dags/sat/eumetsat_iodc_lrv.py diff --git a/src/local_archives/__init__.py b/src/local_archives/__init__.py deleted file mode 100644 index dd790fe..0000000 --- a/src/local_archives/__init__.py +++ /dev/null @@ -1,28 +0,0 @@ -import os - -import dagster as dg -from dagster_docker import PipesDockerClient - -from . import nwp, sat - -nwp_assets = dg.load_assets_from_package_module( - package_module=nwp, - group_name="nwp", - key_prefix="nwp", -) - -sat_assets = dg.load_assets_from_package_module( - package_module=sat, - group_name="sat", - key_prefix="sat", -) - -defs = dg.Definitions( - assets=[*nwp_assets, *sat_assets], - resources={ - "pipes_subprocess_client": dg.PipesSubprocessClient(), - "pipes_docker_client": PipesDockerClient(), - }, - jobs=[], - schedules=[], -) diff --git a/tests/__init__.py b/tests/__init__.py index e69de29..8b13789 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/tests/cloud_archives/__init__.py b/tests/cloud_archives/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/compile_test.py b/tests/compile_test.py index e0cee6c..2223517 100644 --- a/tests/compile_test.py +++ b/tests/compile_test.py @@ -2,14 +2,15 @@ import dagster as dg -from src.local_archives import nwp_assets, sat_assets +from src.dagster_dags import defs class TestAssetKeyPrefixes(unittest.TestCase): def test_nwp_asset_key_prefixes(self) -> None: """Test asset keys for all nwp assets have the correct key structure.""" - for asset in [*nwp_assets, *sat_assets]: - if isinstance(asset, dg.AssetsDefinition): - # Ensure that the prefix is one of the expected flavours - self.assertIn( asset.key.path[0], ["nwp", "sat", "air"]) + if defs.assets is not None: + for asset in defs.assets: + if isinstance(asset, dg.AssetsDefinition): + # Ensure that the prefix is one of the expected flavours + self.assertIn( asset.key.path[0], ["nwp", "sat", "air", "pv"]) diff --git a/tests/cloud_archives/pv/test_passiv.py b/tests/test_passiv.py similarity index 84% rename from tests/cloud_archives/pv/test_passiv.py rename to tests/test_passiv.py index 5cbe73f..f66b5cb 100644 --- a/tests/cloud_archives/pv/test_passiv.py +++ b/tests/test_passiv.py @@ -1,7 +1,7 @@ import unittest from datetime import UTC, datetime -from src.cloud_archives.pv.passiv.passiv_monthly import get_monthly_passiv_data +from src.dagster_dags.pv.passiv.passiv_monthly import get_monthly_passiv_data @unittest.skip("rawdata endpoint not on new URL")