From 723eec9a6edb346f0fc18248b6dbd75105069c80 Mon Sep 17 00:00:00 2001
From: Munthikodu
Date: Thu, 17 Oct 2024 12:48:21 -0700
Subject: [PATCH] Added DAG to replicate BCTS LRM data

---
 dags/bcts_lrm_forest.py | 68 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 68 insertions(+)
 create mode 100644 dags/bcts_lrm_forest.py

diff --git a/dags/bcts_lrm_forest.py b/dags/bcts_lrm_forest.py
new file mode 100644
index 0000000..dcbf7fb
--- /dev/null
+++ b/dags/bcts_lrm_forest.py
@@ -0,0 +1,68 @@
+"""Airflow DAG that replicates BCTS LRM data to the ODS.
+
+Runs the ora2pg replication image in a Kubernetes pod on the cron schedule
+"0 12 * * *".  When the AIRFLOW_ENV environment variable is "LOCAL" the pod
+uses the nrids-bcts-data-ora2pg:main image and failure e-mails are disabled;
+otherwise it pulls ghcr.io/bcgov/nr-dap-ods-ora2pg:main and runs in-cluster
+under the airflow-admin service account.
+"""
+from airflow import DAG
+from pendulum import datetime
+from kubernetes import client
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.secret import Secret
+from datetime import timedelta
+import os
+
+LOB = 'lrm'
+# AIRFLOW_ENV is "LOCAL" in the local development environment.
+ENV = os.getenv("AIRFLOW_ENV")
+IS_LOCAL = ENV == 'LOCAL'
+
+# A deploy target of None exposes every key of the Kubernetes secret as an
+# environment variable inside the pod.
+ods_secrets = Secret("env", None, f"{LOB}-ods-database")
+lob_secrets = Secret("env", None, f"{LOB}-database")
+
+default_args = {
+    'owner': 'PMT',
+    "email": ["NRM.DataFoundations@gov.bc.ca"],
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+    "email_on_failure": not IS_LOCAL,  # No alerts in local environment
+    "email_on_retry": False,
+}
+
+# Pod settings shared by every environment.
+pod_kwargs = {
+    "task_id": "run_replication",
+    "name": f"run_{LOB}_replication",
+    "labels": {"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
+    "is_delete_operator_pod": True,
+    "secrets": [lob_secrets, ods_secrets],
+    "container_resources": client.V1ResourceRequirements(
+        requests={"cpu": "50m", "memory": "512Mi"},
+        limits={"cpu": "100m", "memory": "1024Mi"},
+    ),
+}
+
+if IS_LOCAL:
+    pod_kwargs["image"] = "nrids-bcts-data-ora2pg:main"
+else:
+    # In Dev, Test, and Prod Environments
+    pod_kwargs.update(
+        image="ghcr.io/bcgov/nr-dap-ods-ora2pg:main",
+        image_pull_policy="Always",
+        in_cluster=True,
+        service_account_name="airflow-admin",
+    )
+
+with DAG(
+    start_date=datetime(2023, 11, 23),
+    catchup=False,
+    schedule='0 12 * * *',
+    dag_id=f"replication-pipeline-{LOB}",
+    default_args=default_args,
+    description='DAG to replicate LRM data to ODS for BCTS Annual Developed Volume Dashboard',
+) as dag:
+    run_replication = KubernetesPodOperator(**pod_kwargs)