Skip to content

Commit

Permalink
Added DAG to replicate BCTS LRM data
Browse files Browse the repository at this point in the history
  • Loading branch information
smunthik committed Oct 17, 2024
1 parent 236827a commit 723eec9
Showing 1 changed file with 72 additions and 0 deletions.
72 changes: 72 additions & 0 deletions dags/bcts_lrm_forest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from airflow import DAG
from pendulum import datetime
from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from datetime import timedelta
import os

LOB = 'lrm'
# For local development environment.
ENV = os.getenv("AIRFLOW_ENV")

ods_secrets = Secret("env", None, f"{LOB}-ods-database")
lob_secrets = Secret("env", None, f"{LOB}-database")

if ENV == 'LOCAL':
default_args = {
'owner': 'PMT',
"email": ["[email protected]"],
'retries': 1,
'retry_delay': timedelta(minutes=5),
"email_on_failure": False, # No alerts in local environment
"email_on_retry": False,
}
else:
default_args = {
'owner': 'PMT',
"email": ["[email protected]"],
'retries': 1,
'retry_delay': timedelta(minutes=5),
"email_on_failure": True,
"email_on_retry": False,
}

with DAG(
start_date=datetime(2023, 11, 23),
catchup=False,
schedule='0 12 * * *',
dag_id=f"replication-pipeline-{LOB}",
default_args=default_args,
description='DAG to replicate LRM data to ODS for BCTS Annual Developed Volume Dashboard',
) as dag:

if ENV == 'LOCAL':

run_replication = KubernetesPodOperator(
task_id="run_replication",
image="nrids-bcts-data-ora2pg:main",
name=f"run_{LOB}_replication",
labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
is_delete_operator_pod=True,
secrets=[lob_secrets, ods_secrets],
container_resources= client.V1ResourceRequirements(
requests={"cpu": "50m", "memory": "512Mi"},
limits={"cpu": "100m", "memory": "1024Mi"})
)
else:
# In Dev, Test, and Prod Environments
run_replication = KubernetesPodOperator(
task_id="run_replication",
image="ghcr.io/bcgov/nr-dap-ods-ora2pg:main",
image_pull_policy="Always",
in_cluster=True,
service_account_name="airflow-admin",
name=f"run_{LOB}_replication",
labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
is_delete_operator_pod=True,
secrets=[lob_secrets, ods_secrets],
container_resources= client.V1ResourceRequirements(
requests={"cpu": "50m", "memory": "512Mi"},
limits={"cpu": "100m", "memory": "1024Mi"})
)

0 comments on commit 723eec9

Please sign in to comment.