From 24d5d781d758a08ce007777f030b25714497640b Mon Sep 17 00:00:00 2001
From: Munthikodu
Date: Thu, 31 Oct 2024 14:13:01 -0700
Subject: [PATCH] Added DAGs for grant management of BCTS data in ODS

---
Reviewer notes (ignored by git when the patch is applied):
- bcts_apply_grants.py: the non-LOCAL task_id was f"export_{LOB}_grants",
  copied from the export DAG; aligned with the LOCAL branch
  (f"apply_{LOB}_grants").
- bcts_lrm_annual_developed_volume_transformations.py: the non-LOCAL task_id
  was "run_replication"; aligned with the LOCAL branch ("run_transformation").
- NOTE(review): the script path "./bcts_acces_apply_grants.py" (single "s")
  is left unchanged; confirm it matches the file shipped in the image.
- Added-line counts per hunk are unchanged; only the two task_id values and
  whitespace differ from the submitted patch.

 dags/bcts_apply_grants.py                     | 77 ++++++++++++++++++
 dags/bcts_export_grants_master_file.py        | 77 ++++++++++++++++++
 ...annual_developed_volume_transformations.py | 80 +++++++++++++++++++
 ...rest.py => bcts_lrm_forest_replication.py} |  8 +-
 4 files changed, 240 insertions(+), 2 deletions(-)
 create mode 100644 dags/bcts_apply_grants.py
 create mode 100644 dags/bcts_export_grants_master_file.py
 create mode 100644 dags/bcts_lrm_annual_developed_volume_transformations.py
 rename dags/{bcts_lrm_forest.py => bcts_lrm_forest_replication.py} (87%)

diff --git a/dags/bcts_apply_grants.py b/dags/bcts_apply_grants.py
new file mode 100644
index 0000000..698f2a5
--- /dev/null
+++ b/dags/bcts_apply_grants.py
@@ -0,0 +1,77 @@
+from airflow import DAG
+from pendulum import datetime
+from kubernetes import client
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.secret import Secret
+from datetime import timedelta
+import os
+
+LOB = 'lrm'
+# For local development environment only.
+ENV = os.getenv("AIRFLOW_ENV")
+
+ods_secrets = Secret("env", None, f"{LOB}-ods-database")
+
+if ENV == 'LOCAL':
+    default_args = {
+        'owner': 'BCTS',
+        "email": ["sreejith.munthikodu@gov.bc.ca"],
+        'retries': 2,
+        'retry_delay': timedelta(minutes=5),
+        "email_on_failure": False,  # No alerts in local environment
+        "email_on_retry": False,
+    }
+else:
+    default_args = {
+        'owner': 'BCTS',
+        "email": ["sreejith.munthikodu@gov.bc.ca"],
+        'retries': 2,
+        'retry_delay': timedelta(minutes=5),
+        "email_on_failure": True,
+        "email_on_retry": False,
+    }
+
+with DAG(
+    start_date=datetime(2024, 11, 23),
+    catchup=False,
+    schedule='0 5 * * MON-FRI',
+    dag_id=f"apply-grants-{LOB}",
+    default_args=default_args,
+    description='DAG to apply grants to BCTS data in ODS',
+) as dag:
+
+    if ENV == 'LOCAL':
+
+        run_replication = KubernetesPodOperator(
+            task_id=f"apply_{LOB}_grants",
+            image="nrids-bcts-data-pg-access:main",
+            cmds=["python3", "./bcts_acces_apply_grants.py"],
+            # Following configs are different in the local development environment
+            # image_pull_policy="Always",
+            # in_cluster=True,
+            # service_account_name="airflow-admin",
+            name=f"apply_{LOB}_access_grants",
+            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
+            is_delete_operator_pod=True,
+            secrets=[ods_secrets],
+            container_resources=client.V1ResourceRequirements(
+                requests={"cpu": "50m", "memory": "512Mi"},
+                limits={"cpu": "100m", "memory": "1024Mi"})
+        )
+    else:
+        # In Dev, Test, and Prod Environments
+        run_replication = KubernetesPodOperator(
+            task_id=f"apply_{LOB}_grants",
+            image="ghcr.io/bcgov/nr-dap-ods-bcts-pg-access:main",
+            cmds=["python3", "./bcts_acces_apply_grants.py"],
+            image_pull_policy="Always",
+            in_cluster=True,
+            service_account_name="airflow-admin",
+            name=f"apply_{LOB}_access_grants",
+            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
+            is_delete_operator_pod=True,
+            secrets=[ods_secrets],
+            container_resources=client.V1ResourceRequirements(
+                requests={"cpu": "50m", "memory": "512Mi"},
+                limits={"cpu": "100m", "memory": "1024Mi"})
+        )
\ No newline at end of file
diff --git a/dags/bcts_export_grants_master_file.py b/dags/bcts_export_grants_master_file.py
new file mode 100644
index 0000000..6b58482
--- /dev/null
+++ b/dags/bcts_export_grants_master_file.py
@@ -0,0 +1,77 @@
+from airflow import DAG
+from pendulum import datetime
+from kubernetes import client
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.secret import Secret
+from datetime import timedelta
+import os
+
+LOB = 'lrm'
+# For local development environment only.
+ENV = os.getenv("AIRFLOW_ENV")
+
+ods_secrets = Secret("env", None, f"{LOB}-ods-database")
+
+if ENV == 'LOCAL':
+    default_args = {
+        'owner': 'BCTS',
+        "email": ["sreejith.munthikodu@gov.bc.ca"],
+        'retries': 2,
+        'retry_delay': timedelta(minutes=5),
+        "email_on_failure": False,  # No alerts in local environment
+        "email_on_retry": False,
+    }
+else:
+    default_args = {
+        'owner': 'BCTS',
+        "email": ["sreejith.munthikodu@gov.bc.ca"],
+        'retries': 2,
+        'retry_delay': timedelta(minutes=5),
+        "email_on_failure": True,
+        "email_on_retry": False,
+    }
+
+with DAG(
+    start_date=datetime(2024, 11, 23),
+    catchup=False,
+    schedule=None,
+    dag_id=f"export-grants-{LOB}",
+    default_args=default_args,
+    description='DAG to export the grants master file to ODS',
+) as dag:
+
+    if ENV == 'LOCAL':
+
+        run_replication = KubernetesPodOperator(
+            task_id=f"export_{LOB}_grants",
+            image="nrids-bcts-data-pg-access:main",
+            cmds=["python3", "./bcts_access_export_master_file.py"],
+            # Following configs are different in the local development environment
+            # image_pull_policy="Always",
+            # in_cluster=True,
+            # service_account_name="airflow-admin",
+            name=f"export_{LOB}_access_grants",
+            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
+            is_delete_operator_pod=True,
+            secrets=[ods_secrets],
+            container_resources=client.V1ResourceRequirements(
+                requests={"cpu": "50m", "memory": "512Mi"},
+                limits={"cpu": "100m", "memory": "1024Mi"})
+        )
+    else:
+        # In Dev, Test, and Prod Environments
+        run_replication = KubernetesPodOperator(
+            task_id=f"export_{LOB}_grants",
+            image="ghcr.io/bcgov/nr-dap-ods-bcts-pg-access:main",
+            cmds=["python3", "./bcts_access_export_master_file.py"],
+            image_pull_policy="Always",
+            in_cluster=True,
+            service_account_name="airflow-admin",
+            name=f"export_{LOB}_access_grants",
+            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
+            is_delete_operator_pod=True,
+            secrets=[ods_secrets],
+            container_resources=client.V1ResourceRequirements(
+                requests={"cpu": "50m", "memory": "512Mi"},
+                limits={"cpu": "100m", "memory": "1024Mi"})
+        )
\ No newline at end of file
diff --git a/dags/bcts_lrm_annual_developed_volume_transformations.py b/dags/bcts_lrm_annual_developed_volume_transformations.py
new file mode 100644
index 0000000..70687f1
--- /dev/null
+++ b/dags/bcts_lrm_annual_developed_volume_transformations.py
@@ -0,0 +1,80 @@
+from airflow import DAG
+from pendulum import datetime
+from kubernetes import client
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+from airflow.providers.cncf.kubernetes.secret import Secret
+from datetime import timedelta
+import os
+
+LOB = 'lrm'
+sql_file_path = './Annual_Developed_Volume_Query.sql'
+# For local development environment only.
+ENV = os.getenv("AIRFLOW_ENV")
+
+ods_secrets = Secret("env", None, f"{LOB}-ods-database")
+
+if ENV == 'LOCAL':
+    default_args = {
+        'owner': 'BCTS',
+        "email": ["sreejith.munthikodu@gov.bc.ca"],
+        'retries': 2,
+        'retry_delay': timedelta(minutes=5),
+        "email_on_failure": False,  # No alerts in local environment
+        "email_on_retry": False,
+    }
+else:
+    default_args = {
+        'owner': 'BCTS',
+        "email": ["sreejith.munthikodu@gov.bc.ca"],
+        'retries': 2,
+        'retry_delay': timedelta(minutes=5),
+        "email_on_failure": True,
+        "email_on_retry": False,
+    }
+
+with DAG(
+    start_date=datetime(2024, 11, 23),
+    catchup=False,
+    schedule='0 5 * * MON-FRI',
+    dag_id=f"transformations-{LOB}",
+    default_args=default_args,
+    description='DAG to run the transformations in ODS for BCTS Annual Developed Volume Dashboard',
+) as dag:
+
+    if ENV == 'LOCAL':
+
+        run_replication = KubernetesPodOperator(
+            task_id="run_transformation",
+            image="nrids-bcts-data-pg-transformations:main",
+            cmds=["python3", "./bcts_etl.py"],
+            arguments=[sql_file_path],
+            # Following configs are different in the local development environment
+            # image_pull_policy="Always",
+            # in_cluster=True,
+            # service_account_name="airflow-admin",
+            name=f"run_{LOB}_transformation_annual_developed_volume",
+            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
+            is_delete_operator_pod=True,
+            secrets=[ods_secrets],
+            container_resources=client.V1ResourceRequirements(
+                requests={"cpu": "50m", "memory": "512Mi"},
+                limits={"cpu": "100m", "memory": "1024Mi"})
+        )
+    else:
+        # In Dev, Test, and Prod Environments
+        run_replication = KubernetesPodOperator(
+            task_id="run_transformation",
+            image="ghcr.io/bcgov/nr-dap-ods-pg-transformations:main",
+            cmds=["python3", "./bcts_etl.py"],
+            arguments=[sql_file_path],
+            image_pull_policy="Always",
+            in_cluster=True,
+            service_account_name="airflow-admin",
+            name=f"run_{LOB}_transformation_annual_developed_volume",
+            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
+            is_delete_operator_pod=True,
+            secrets=[ods_secrets],
+            container_resources=client.V1ResourceRequirements(
+                requests={"cpu": "50m", "memory": "512Mi"},
+                limits={"cpu": "100m", "memory": "1024Mi"})
+        )
\ No newline at end of file
diff --git a/dags/bcts_lrm_forest.py b/dags/bcts_lrm_forest_replication.py
similarity index 87%
rename from dags/bcts_lrm_forest.py
rename to dags/bcts_lrm_forest_replication.py
index 250b09f..bd175e5 100644
--- a/dags/bcts_lrm_forest.py
+++ b/dags/bcts_lrm_forest_replication.py
@@ -7,7 +7,7 @@
 import os
 
 LOB = 'lrm'
-# For local development environment.
+# For local development environment only.
 ENV = os.getenv("AIRFLOW_ENV")
 
 ods_secrets = Secret("env", None, f"{LOB}-ods-database")
@@ -33,7 +33,7 @@
     }
 
 with DAG(
-    start_date=datetime(2024, 11, 22),
+    start_date=datetime(2024, 11, 23),
     catchup=False,
     schedule='0 4 * * MON-FRI',
     dag_id=f"replication-pipeline-{LOB}",
@@ -46,6 +46,10 @@
         run_replication = KubernetesPodOperator(
             task_id="run_replication",
             image="nrids-bcts-data-ora2pg:main",
+            # Following configs are different in the local development environment
+            # image_pull_policy="Always",
+            # in_cluster=True,
+            # service_account_name="airflow-admin",
             name=f"run_{LOB}_replication",
             labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
             is_delete_operator_pod=True,