Skip to content

Commit

Permalink
pushing new dag
Browse files Browse the repository at this point in the history
  • Loading branch information
vishreddy01 committed Feb 28, 2024
1 parent aa2c3c5 commit f8d32ee
Showing 1 changed file with 46 additions and 0 deletions.
46 changes: 46 additions & 0 deletions pmt/af/DAG/pmt_dlh_load_dal_extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from airflow import DAG
from pendulum import datetime
from kubernetes import client
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.secret import Secret
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator

ods_password = Variable.get("ods_password")
dlh_password = Variable.get("dlh_password")

with DAG(
start_date=datetime(2024, 2, 20),
catchup=False,
schedule=None,
dag_id="pmt_dlh_load_dal_extract",
) as dag:
start_task = DummyOperator(task_id="start_task")

load_dim_org = KubernetesPodOperator(
task_id="task1_load_extract_permits",
image="ghcr.io/bcgov/nr-dap-dlh-pmt:main",
image_pull_policy="Always",
in_cluster=True,
namespace="a1b9b0-dev",
service_account_name="airflow-admin",
name="task1_load_extract_permits",
random_name_suffix=True,
labels={"DataClass": "Low", "env": "dev", "ConnectionType": "database"},
env_vars={"ods_password": ods_password, "dlh_password": dlh_password},
reattach_on_restart=True,
is_delete_operator_pod=False,
get_logs=True,
log_events_on_failure=True,
container_resources= client.V1ResourceRequirements(
requests={"cpu": "50m", "memory": "256Mi"},
limits={"cpu": "1", "memory": "1Gi"}),
cmds=["dbt"],
arguments=["snapshot","--select","extract_permits","--profiles-dir","/usr/app/dbt/.dbt"]
# arguments=["test","--profiles-dir","/usr/app/dbt/.dbt"]
)

# Set task dependencies
start_task >> task1_load_extract_permits


0 comments on commit f8d32ee

Please sign in to comment.