Added DAGs for grant management of BCTS data in ODS
smunthik committed Oct 31, 2024
1 parent 495b228 commit 24d5d78
Showing 4 changed files with 240 additions and 2 deletions.
77 changes: 77 additions & 0 deletions dags/bcts_apply_grants.py
@@ -0,0 +1,77 @@
from airflow import DAG
from pendulum import datetime
from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from datetime import timedelta
import os

LOB = 'lrm'
# For local development environment only.
ENV = os.getenv("AIRFLOW_ENV")

ods_secrets = Secret("env", None, f"{LOB}-ods-database")
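# Secret("env", None, <name>) with a None key mounts every key/value pair in
# the named Kubernetes secret into the task pod as environment variables.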

if ENV == 'LOCAL':
    default_args = {
        'owner': 'BCTS',
        "email": ["[email protected]"],
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        "email_on_failure": False,  # No alerts in local environment
        "email_on_retry": False,
    }
else:
    default_args = {
        'owner': 'BCTS',
        "email": ["[email protected]"],
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        "email_on_failure": True,
        "email_on_retry": False,
    }

with DAG(
    start_date=datetime(2024, 11, 23),
    catchup=False,
    schedule='0 5 * * MON-FRI',
    dag_id=f"apply-grants-{LOB}",
    default_args=default_args,
    description='DAG to apply grants to BCTS data in ODS',
) as dag:

    if ENV == 'LOCAL':

        run_replication = KubernetesPodOperator(
            task_id=f"apply_{LOB}_grants",
            image="nrids-bcts-data-pg-access:main",
            cmds=["python3", "./bcts_acces_apply_grants.py"],
            # Following configs are different in the local development environment
            # image_pull_policy="Always",
            # in_cluster=True,
            # service_account_name="airflow-admin",
            name=f"apply_{LOB}_access_grants",
            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
            is_delete_operator_pod=True,
            secrets=[ods_secrets],
            container_resources=client.V1ResourceRequirements(
                requests={"cpu": "50m", "memory": "512Mi"},
                limits={"cpu": "100m", "memory": "1024Mi"}),
        )
    else:
        # In Dev, Test, and Prod environments
        run_replication = KubernetesPodOperator(
            task_id=f"apply_{LOB}_grants",
            image="ghcr.io/bcgov/nr-dap-ods-bcts-pg-access:main",
            cmds=["python3", "./bcts_acces_apply_grants.py"],
            image_pull_policy="Always",
            in_cluster=True,
            service_account_name="airflow-admin",
            name=f"apply_{LOB}_access_grants",
            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
            is_delete_operator_pod=True,
            secrets=[ods_secrets],
            container_resources=client.V1ResourceRequirements(
                requests={"cpu": "50m", "memory": "512Mi"},
                limits={"cpu": "100m", "memory": "1024Mi"}),
        )
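
The grant-apply script itself is not part of this commit, so the following is only a minimal sketch of what ./bcts_acces_apply_grants.py might do, assuming the pod's secret injects standard PostgreSQL connection variables (the ODS_* names, the psycopg2 dependency, and the GRANT statements are all illustrative assumptions):

import os

import psycopg2  # assumed to be installed in the pg-access image

# Hypothetical grants; the real script presumably derives these from the
# exported grants master file.
GRANT_STATEMENTS = [
    "GRANT USAGE ON SCHEMA lrm_replication TO reporting_role;",
    "GRANT SELECT ON ALL TABLES IN SCHEMA lrm_replication TO reporting_role;",
]

def apply_grants() -> None:
    # Connection details come from the env vars injected by the
    # lrm-ods-database secret (variable names assumed).
    conn = psycopg2.connect(
        host=os.environ["ODS_HOST"],
        port=os.environ.get("ODS_PORT", "5432"),
        dbname=os.environ["ODS_DATABASE"],
        user=os.environ["ODS_USERNAME"],
        password=os.environ["ODS_PASSWORD"],
    )
    try:
        # Apply all grants in a single transaction.
        with conn, conn.cursor() as cur:
            for stmt in GRANT_STATEMENTS:
                cur.execute(stmt)
    finally:
        conn.close()

if __name__ == "__main__":
    apply_grants()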
77 changes: 77 additions & 0 deletions dags/bcts_export_grants_master_file.py
@@ -0,0 +1,77 @@
from airflow import DAG
from pendulum import datetime
from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from datetime import timedelta
import os

LOB = 'lrm'
# For local development environment only.
ENV = os.getenv("AIRFLOW_ENV")

ods_secrets = Secret("env", None, f"{LOB}-ods-database")

if ENV == 'LOCAL':
    default_args = {
        'owner': 'BCTS',
        "email": ["[email protected]"],
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        "email_on_failure": False,  # No alerts in local environment
        "email_on_retry": False,
    }
else:
    default_args = {
        'owner': 'BCTS',
        "email": ["[email protected]"],
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        "email_on_failure": True,
        "email_on_retry": False,
    }

with DAG(
    start_date=datetime(2024, 11, 23),
    catchup=False,
    schedule=None,
    dag_id=f"export-grants-{LOB}",
    default_args=default_args,
    description='DAG to export the grants master file to ODS',
) as dag:

    if ENV == 'LOCAL':

        run_replication = KubernetesPodOperator(
            task_id=f"export_{LOB}_grants",
            image="nrids-bcts-data-pg-access:main",
            cmds=["python3", "./bcts_access_export_master_file.py"],
            # Following configs are different in the local development environment
            # image_pull_policy="Always",
            # in_cluster=True,
            # service_account_name="airflow-admin",
            name=f"export_{LOB}_access_grants",
            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
            is_delete_operator_pod=True,
            secrets=[ods_secrets],
            container_resources=client.V1ResourceRequirements(
                requests={"cpu": "50m", "memory": "512Mi"},
                limits={"cpu": "100m", "memory": "1024Mi"}),
        )
    else:
        # In Dev, Test, and Prod environments
        run_replication = KubernetesPodOperator(
            task_id=f"export_{LOB}_grants",
            image="ghcr.io/bcgov/nr-dap-ods-bcts-pg-access:main",
            cmds=["python3", "./bcts_access_export_master_file.py"],
            image_pull_policy="Always",
            in_cluster=True,
            service_account_name="airflow-admin",
            name=f"export_{LOB}_access_grants",
            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
            is_delete_operator_pod=True,
            secrets=[ods_secrets],
            container_resources=client.V1ResourceRequirements(
                requests={"cpu": "50m", "memory": "512Mi"},
                limits={"cpu": "100m", "memory": "1024Mi"}),
        )
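
Because schedule=None, the export DAG never runs on a timer; it only runs when triggered on demand, e.g. from the Airflow UI or the standard Airflow CLI:

airflow dags trigger export-grants-lrm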
80 changes: 80 additions & 0 deletions dags/bcts_lrm_annual_developed_volume_transformations.py
@@ -0,0 +1,80 @@
from airflow import DAG
from pendulum import datetime
from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from datetime import timedelta
import os

LOB = 'lrm'
sql_file_path = './Annual_Developed_Volume_Query.sql'
# For local development environment only.
ENV = os.getenv("AIRFLOW_ENV")

ods_secrets = Secret("env", None, f"{LOB}-ods-database")

if ENV == 'LOCAL':
    default_args = {
        'owner': 'BCTS',
        "email": ["[email protected]"],
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        "email_on_failure": False,  # No alerts in local environment
        "email_on_retry": False,
    }
else:
    default_args = {
        'owner': 'BCTS',
        "email": ["[email protected]"],
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        "email_on_failure": True,
        "email_on_retry": False,
    }

with DAG(
    start_date=datetime(2024, 11, 23),
    catchup=False,
    schedule='0 5 * * MON-FRI',
    dag_id=f"transformations-{LOB}",
    default_args=default_args,
    description='DAG to run the transformations in ODS for BCTS Annual Developed Volume Dashboard',
) as dag:

    if ENV == 'LOCAL':

        run_replication = KubernetesPodOperator(
            task_id="run_transformation",
            image="nrids-bcts-data-pg-transformations:main",
            cmds=["python3", "./bcts_etl.py"],
            arguments=[sql_file_path],
            # Following configs are different in the local development environment
            # image_pull_policy="Always",
            # in_cluster=True,
            # service_account_name="airflow-admin",
            name=f"run_{LOB}_transformation_annual_developed_volume",
            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
            is_delete_operator_pod=True,
            secrets=[ods_secrets],
            container_resources=client.V1ResourceRequirements(
                requests={"cpu": "50m", "memory": "512Mi"},
                limits={"cpu": "100m", "memory": "1024Mi"}),
        )
    else:
        # In Dev, Test, and Prod environments
        run_replication = KubernetesPodOperator(
            task_id="run_transformation",
            image="ghcr.io/bcgov/nr-dap-ods-pg-transformations:main",
            cmds=["python3", "./bcts_etl.py"],
            arguments=[sql_file_path],
            image_pull_policy="Always",
            in_cluster=True,
            service_account_name="airflow-admin",
            name=f"run_{LOB}_transformation_annual_developed_volume",
            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
            is_delete_operator_pod=True,
            secrets=[ods_secrets],
            container_resources=client.V1ResourceRequirements(
                requests={"cpu": "50m", "memory": "512Mi"},
                limits={"cpu": "100m", "memory": "1024Mi"}),
        )
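
bcts_etl.py is likewise outside this commit; arguments=[sql_file_path] simply becomes argv for the container process. A plausible sketch, assuming the same psycopg2 dependency and hypothetical ODS_* variable names as above:

import os
import sys

import psycopg2  # assumed to be available in the transformations image

def main() -> None:
    # The DAG passes the SQL file path as the first container argument.
    with open(sys.argv[1], "r", encoding="utf-8") as f:
        sql = f.read()

    conn = psycopg2.connect(
        host=os.environ["ODS_HOST"],
        dbname=os.environ["ODS_DATABASE"],
        user=os.environ["ODS_USERNAME"],
        password=os.environ["ODS_PASSWORD"],
    )
    try:
        # Run the transformation SQL as a single transaction.
        with conn, conn.cursor() as cur:
            cur.execute(sql)
    finally:
        conn.close()

if __name__ == "__main__":
    main()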
@@ -7,7 +7,7 @@
import os

LOB = 'lrm'
-# For local development environment.
+# For local development environment only.
ENV = os.getenv("AIRFLOW_ENV")

ods_secrets = Secret("env", None, f"{LOB}-ods-database")
@@ -33,7 +33,7 @@
}

with DAG(
-    start_date=datetime(2024, 11, 22),
+    start_date=datetime(2024, 11, 23),
     catchup=False,
     schedule='0 4 * * MON-FRI',
     dag_id=f"replication-pipeline-{LOB}",
@@ -46,6 +46,10 @@
         run_replication = KubernetesPodOperator(
             task_id="run_replication",
             image="nrids-bcts-data-ora2pg:main",
+            # Following configs are different in the local development environment
+            # image_pull_policy="Always",
+            # in_cluster=True,
+            # service_account_name="airflow-admin",
             name=f"run_{LOB}_replication",
             labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
             is_delete_operator_pod=True,
