From 58a73e4c3350ca426a1a518825e5a0bf20eab573 Mon Sep 17 00:00:00 2001 From: abimichel <136123575+abimichel@users.noreply.github.com> Date: Wed, 7 Feb 2024 11:16:08 -0800 Subject: [PATCH 01/11] add vault_example DAG --- dags/vault_example.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 dags/vault_example.py diff --git a/dags/vault_example.py b/dags/vault_example.py new file mode 100644 index 0000000..f52db0d --- /dev/null +++ b/dags/vault_example.py @@ -0,0 +1,31 @@ +from airflow import DAG +from pendulum import datetime +from kubernetes import client +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret + +vault_jwt = Secret("env", None, "nr-vault-jwt") + +with DAG( + start_date=datetime(2023, 11, 23), + catchup=False, + schedule=None, + dag_id="vault_example", +) as dag: + run_ats_replication = KubernetesPodOperator( + task_id="get_ods_host", + image="ghcr.io/bcgov/nr-vault-patterns:main", + in_cluster=True, + namespace="a1b9b0-dev", + name="get_ods_host", + random_name_suffix=True, + labels={"DataClass": "High", "Release": "test-release-af"}, # network policies + reattach_on_restart=True, + is_delete_operator_pod=True, + get_logs=True, + log_events_on_failure=True, + secrets=[vault_jwt], + container_resources= client.V1ResourceRequirements( + requests={"cpu": "10m", "memory": "256Mi"}, + limits={"cpu": "50m", "memory": "500Mi"}) + ) From 81270fd41e076af84817afcc938036ac5aeca2d4 Mon Sep 17 00:00:00 2001 From: abimichel <136123575+abimichel@users.noreply.github.com> Date: Wed, 7 Feb 2024 11:35:29 -0800 Subject: [PATCH 02/11] Update vault_example.py --- dags/vault_example.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dags/vault_example.py b/dags/vault_example.py index f52db0d..1edc454 100644 --- a/dags/vault_example.py +++ b/dags/vault_example.py @@ -12,7 +12,7 @@ schedule=None, dag_id="vault_example", ) as dag: - run_ats_replication = KubernetesPodOperator( + vault_action = KubernetesPodOperator( task_id="get_ods_host", image="ghcr.io/bcgov/nr-vault-patterns:main", in_cluster=True, From 61bc1500e9ea8187cdcf44658ed3c566abeae119 Mon Sep 17 00:00:00 2001 From: abimichel <136123575+abimichel@users.noreply.github.com> Date: Fri, 16 Feb 2024 12:33:51 -0800 Subject: [PATCH 03/11] DAG to trigger Oracle -> S3 Airbyte job --- dags/airbyte_trigger_job.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dags/airbyte_trigger_job.py b/dags/airbyte_trigger_job.py index 4208636..b936648 100644 --- a/dags/airbyte_trigger_job.py +++ b/dags/airbyte_trigger_job.py @@ -3,13 +3,14 @@ import pendulum import json -# API is located at http://dev-release-ab-airbyte-api-server-svc -# Job is called "Airbyte Tests as JSON" -# job trigger sometimes does not working due to competing resources when running airflow/airbyte at the same time +# For Oracle -> S3 replication of ORG_UNIT table +# API service: http://dev-release-ab-airbyte-api-server-svc +# API route: https://nr-airbyte-api.apps.emerald.devops.gov.bc.ca +# Beware of competing resources when running airflow/airbyte at the same time airbyte_job_type = "sync" -airbyte_connection_id = "92c71303-a1f8-4ab6-a6be-28a47f000319" -airbyte_workspace_id = "ea2db1ec-0357-4868-b216-4ca7333f5df4" +airbyte_connection_id = "46f2508b-0759-41c6-8f8b-33dd2910dd37" +airbyte_workspace_id = "c7528958-f674-4c2c-b91b-95030f0c4513" with DAG(dag_id='airbyte_trigger_job', schedule=None, From 
394a1ffa839f3143deaae29a6bf4930cc9936b56 Mon Sep 17 00:00:00 2001 From: abimichel <136123575+abimichel@users.noreply.github.com> Date: Fri, 16 Feb 2024 14:28:31 -0800 Subject: [PATCH 04/11] set sync interval to 30 seconds --- airflow/values.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/values.yaml b/airflow/values.yaml index 42758c6..12b457b 100644 --- a/airflow/values.yaml +++ b/airflow/values.yaml @@ -1123,7 +1123,7 @@ git: ## @param git.sync.resources Sync sidecar container resource requests and limits ## sync: - interval: 90 # Abi: can increase in prod + interval: 30 # Abi: increased command: [] args: [] extraVolumeMounts: [] From 8592c27aaba9a239612149e3c52fb7eff0666093 Mon Sep 17 00:00:00 2001 From: abimichel <136123575+abimichel@users.noreply.github.com> Date: Fri, 16 Feb 2024 14:46:49 -0800 Subject: [PATCH 05/11] this is an example --- dags/dag_demo.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 dags/dag_demo.py diff --git a/dags/dag_demo.py b/dags/dag_demo.py new file mode 100644 index 0000000..94fcb2d --- /dev/null +++ b/dags/dag_demo.py @@ -0,0 +1,31 @@ +from airflow import DAG +from pendulum import datetime +from kubernetes import client +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator + +with DAG( + start_date=datetime(2023, 12, 28), + catchup=False, + schedule=None, + dag_id="demo_dag_example" +) as dag: + python_container = KubernetesPodOperator( + task_id="run_container", + image="artifacts.developer.gov.bc.ca/docker-remote/python", + image_pull_policy="IfNotPresent", + image_pull_secrets="artifactory-pull", + in_cluster=True, + namespace="a1b9b0-dev", + service_account_name="airflow-admin", + name="run_container", + random_name_suffix=True, + labels={"DataClass": "Low", "env": "dev"}, + reattach_on_restart=True, + is_delete_operator_pod=False, + get_logs=True, + log_events_on_failure=True, + container_resources= client.V1ResourceRequirements( + requests={"cpu": "50m", "memory": "256Mi"}, + limits={"cpu": "1", "memory": "1Gi"}), + cmds=["python3"] + ) From 75f778a3344a99966e0472b6926bb331b309bf46 Mon Sep 17 00:00:00 2001 From: abimichel <136123575+abimichel@users.noreply.github.com> Date: Tue, 20 Feb 2024 16:37:15 -0800 Subject: [PATCH 06/11] add sync from nr-dap-dlh --- airflow/values.yaml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/airflow/values.yaml b/airflow/values.yaml index 12b457b..761df59 100644 --- a/airflow/values.yaml +++ b/airflow/values.yaml @@ -1056,6 +1056,11 @@ git: branch: a1b9b0-dev name: nr-airflow-dags path: /dags + + - repository: https://github.com/bcgov/nr-dap-dlh + branch: main + name: nr-dap-dlh-dags + path: /pmt/af/DAG/ # - repository: "" ## Branch from repository to checkout From eff98d2f9cc2887aa35cb62a5cde2287afb1e1c1 Mon Sep 17 00:00:00 2001 From: abimichel <136123575+abimichel@users.noreply.github.com> Date: Wed, 20 Mar 2024 14:09:07 -0700 Subject: [PATCH 07/11] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1e71747..a6685f7 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Airflow Set Up +# Airflow Set Up for NRM Data Analytics Platform ## Helm chart sourced from ```sh From f29a3f82f869e207c1247368ca10b54f78f13dcd Mon Sep 17 00:00:00 2001 From: abimichel Date: Fri, 22 Mar 2024 11:24:25 -0700 Subject: [PATCH 08/11] Merge branch 'vault' of https://github.com/bcgov/nr-airflow into vault --- README.md | 23 ++-- 
airflow/charts/postgresql/values.yaml | 8 +- airflow/values.yaml | 16 +-- dags/airbyte_list_connections.py | 21 --- dags/airbyte_trigger_job.py | 30 ----- dags/connection_secrets_example.py | 29 ---- dags/dag_demo.py | 31 ----- dags/data_pipeline_dag_fta.py | 18 --- dags/data_pipeline_dag_rrs.py | 18 --- dags/dbt_dlh_example.py | 38 ------ dags/dbt_example.py | 37 ----- dags/dbt_rrs_snapshot.py | 33 ----- dags/example_bash_operator.py | 76 ----------- dags/example_sensors.py | 126 ------------------ dags/kubernetes_example.py | 33 ----- dags/oracle_example_print.py | 34 ----- dags/permitting_ats.py | 32 +++++ dags/permitting_fta.py | 32 +++++ dags/permitting_fta_replication.py | 32 +++++ dags/permitting_lexis.py | 31 +++++ dags/permitting_pipeline_ats.py | 34 ----- dags/permitting_pipeline_controller.py | 52 -------- dags/permitting_pipeline_controller_seq.py | 48 ------- ...rmitting_pipeline_etl_batch_id_creation.py | 94 ------------- ...permitting_pipeline_etl_batch_id_update.py | 43 ------ dags/permitting_pipeline_fta.py | 34 ----- dags/permitting_pipeline_rrs.py | 34 ----- dags/permitting_rrs_rp.py | 32 +++++ dags/permitting_rrs_rup.py | 32 +++++ dags/postgres_example_insert.py | 20 --- dags/postgres_example_print.py | 33 ----- dags/vault_example.py | 31 ----- oc/add-label-airflow.yaml | 23 ++++ oc/airflow-admin-role.yaml | 1 - oc/airflow-role-binding.yaml | 3 +- oc/allow-egress-airflow.yaml | 14 -- ...irflow.yaml => allow-traffic-airflow.yaml} | 8 +- oc/scheduler-hpa.yaml | 20 --- oc/webapp-hpa.yaml | 6 +- oc/webapp-route.yaml | 9 +- 40 files changed, 249 insertions(+), 1020 deletions(-) delete mode 100644 dags/airbyte_list_connections.py delete mode 100644 dags/airbyte_trigger_job.py delete mode 100644 dags/connection_secrets_example.py delete mode 100644 dags/dag_demo.py delete mode 100644 dags/data_pipeline_dag_fta.py delete mode 100644 dags/data_pipeline_dag_rrs.py delete mode 100644 dags/dbt_dlh_example.py delete mode 100644 dags/dbt_example.py delete mode 100644 dags/dbt_rrs_snapshot.py delete mode 100644 dags/example_bash_operator.py delete mode 100644 dags/example_sensors.py delete mode 100644 dags/kubernetes_example.py delete mode 100644 dags/oracle_example_print.py create mode 100644 dags/permitting_ats.py create mode 100644 dags/permitting_fta.py create mode 100644 dags/permitting_fta_replication.py create mode 100644 dags/permitting_lexis.py delete mode 100644 dags/permitting_pipeline_ats.py delete mode 100644 dags/permitting_pipeline_controller.py delete mode 100644 dags/permitting_pipeline_controller_seq.py delete mode 100644 dags/permitting_pipeline_etl_batch_id_creation.py delete mode 100644 dags/permitting_pipeline_etl_batch_id_update.py delete mode 100644 dags/permitting_pipeline_fta.py delete mode 100644 dags/permitting_pipeline_rrs.py create mode 100644 dags/permitting_rrs_rp.py create mode 100644 dags/permitting_rrs_rup.py delete mode 100644 dags/postgres_example_insert.py delete mode 100644 dags/postgres_example_print.py delete mode 100644 dags/vault_example.py create mode 100644 oc/add-label-airflow.yaml delete mode 100644 oc/allow-egress-airflow.yaml rename oc/{allow-ingress-airflow.yaml => allow-traffic-airflow.yaml} (65%) delete mode 100644 oc/scheduler-hpa.yaml diff --git a/README.md b/README.md index a6685f7..7d2923f 100644 --- a/README.md +++ b/README.md @@ -6,24 +6,25 @@ helm pull oci://registry-1.docker.io/bitnamicharts/airflow ``` ## Deploying to OpenShift +Create OpenShift ConfigMap for requirements.txt: ```sh -helm install test-release-af . 
+oc create configmap airflow-requirements --from-file=requirements.txt +``` +Navigate to the 'oc' folder then: +```sh +oc apply -f . +``` +Navigate to the 'airflow' folder then: +```sh +helm install airflow . ``` ## Visit the application here: -http://nr-airflow-dev.apps.emerald.devops.gov.bc.ca/ +http://nr-airflow.apps.emerald.devops.gov.bc.ca/ ## Upgrade OpenShift Deployment ```sh -helm upgrade -f values.yaml test-release-af . --version 16.1.2 +helm upgrade -f values.yaml airflow . ``` -## Create OpenShift ConfigMap for requirements.txt: -```sh -oc create configmap airflow-requirements --from-file=requirements.txt -``` -Delete if already exists - - More info: https://apps.nrs.gov.bc.ca/int/confluence/x/zQ09Cg - diff --git a/airflow/charts/postgresql/values.yaml b/airflow/charts/postgresql/values.yaml index 7489a1e..586589a 100644 --- a/airflow/charts/postgresql/values.yaml +++ b/airflow/charts/postgresql/values.yaml @@ -65,7 +65,7 @@ extraDeploy: [] ## commonLabels: DataClass: Medium # Abi: 'Medium' for now, need to look deeper into the metadata being stored - Release: test-release-af # Abi: Release label is attached to nwp + Release: airflow # Abi: Release label is attached to nwp ## @param commonAnnotations Add annotations to all the deployed resources ## commonAnnotations: {} @@ -683,7 +683,7 @@ primary: - ReadWriteOnce ## @param primary.persistence.size PVC Storage Request for PostgreSQL volume ## - size: 4Gi # Abi: lowered for dev and test environments + size: 2Gi # Abi: lowered ## @param primary.persistence.annotations Annotations for the PVC ## annotations: {} @@ -1047,7 +1047,7 @@ readReplicas: - ReadWriteOnce ## @param readReplicas.persistence.size PVC Storage Request for PostgreSQL volume ## - size: 8Gi + size: 2Gi # Abi: previously 8Gi ## @param readReplicas.persistence.annotations Annotations for the PVC ## annotations: {} @@ -1164,7 +1164,7 @@ backup: - ReadWriteOnce ## @param backup.cronjob.storage.size PVC Storage Request for the backup data volume ## - size: 8Gi + size: 2Gi # Abi: previously 8Gi ## @param backup.cronjob.storage.annotations PVC annotations ## annotations: {} diff --git a/airflow/values.yaml b/airflow/values.yaml index 761df59..404dce8 100644 --- a/airflow/values.yaml +++ b/airflow/values.yaml @@ -41,8 +41,8 @@ extraDeploy: [] ## @param commonLabels Add labels to all the deployed resources ## commonLabels: - DataClass: Low # Abi: Not the same as the DataClass for the jobs. 'Low' for app pods, usually 'Medium' or 'High' for jobs. - Release: test-release-af # Abi: Release label is attached to nwp + DataClass: Low # Abi: Not the same as the DataClass for the jobs. 'Low' for app pods, usually 'Medium' for jobs. 
+ Release: airflow # Abi: Release label is attached to nwp # Internet-Ingress: ALLOW ## @param commonAnnotations Add annotations to all the deployed resources @@ -328,10 +328,10 @@ web: resources: #Abi: keep requests low but be careful with limits as it crashes when memory limit is hit requests: memory: 600Mi - cpu: 10m + cpu: 50m limits: memory: 2Gi - cpu: '2' + cpu: 1000m ## Configure Airflow web pods Security Context ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod ## @param web.podSecurityContext.enabled Enabled Airflow web pods' Security Context @@ -556,7 +556,7 @@ scheduler: cpu: 10m limits: memory: 1Gi - cpu: '1' + cpu: 1000m ## Configure Airflow scheduler pods Security Context ## ref: https://kubernetes.io/docs/tasks/configure-pod-container/security-context/#set-the-security-context-for-a-pod ## @param scheduler.podSecurityContext.enabled Enabled Airflow scheduler pods' Security Context @@ -1053,10 +1053,10 @@ git: # git checkout repositories: - repository: https://github.com/bcgov/nr-airflow - branch: a1b9b0-dev + branch: a1b9b0-prod name: nr-airflow-dags path: /dags - + - repository: https://github.com/bcgov/nr-dap-dlh branch: main name: nr-dap-dlh-dags @@ -1096,7 +1096,6 @@ git: command: [] args: [] extraVolumeMounts: [] - # extraEnvVars: [] # Abi: values found in Emerald documentation here: https://digital.gov.bc.ca/cloud/services/private/internal-resources/emerald/ extraEnvVars: - name: http_proxy @@ -1132,7 +1131,6 @@ git: command: [] args: [] extraVolumeMounts: [] - # extraEnvVars: [] # Abi: values found in Emerald documentation here: https://digital.gov.bc.ca/cloud/services/private/internal-resources/emerald/ extraEnvVars: - name: http_proxy diff --git a/dags/airbyte_list_connections.py b/dags/airbyte_list_connections.py deleted file mode 100644 index c21ce10..0000000 --- a/dags/airbyte_list_connections.py +++ /dev/null @@ -1,21 +0,0 @@ -from airflow import DAG -from airflow.providers.http.operators.http import SimpleHttpOperator -import pendulum - -# API is located at http://dev-release-ab-airbyte-api-server-svc - -airbyte_job_type = "sync" -airbyte_connection_id = "2b9f5022-51d3-4fdf-ba83-82d1b5d80ce8" -airbyte_workspace_id = "ea2db1ec-0357-4868-b216-4ca7333f5df4" - -with DAG(dag_id='airbyte_list_connections', - schedule=None, - start_date=pendulum.today('UTC') - ) as dag: - airbyte_connections = SimpleHttpOperator( - method="GET", - task_id='get_airbyte_connections', - http_conn_id='airbyte-api', - headers={"Accept": "application/json"}, - endpoint= f'/v1/connections?workspaceIds={airbyte_workspace_id}&includeDeleted=false', - log_response=True) \ No newline at end of file diff --git a/dags/airbyte_trigger_job.py b/dags/airbyte_trigger_job.py deleted file mode 100644 index b936648..0000000 --- a/dags/airbyte_trigger_job.py +++ /dev/null @@ -1,30 +0,0 @@ -from airflow import DAG -from airflow.providers.http.operators.http import SimpleHttpOperator -import pendulum -import json - -# For Oracle -> S3 replication of ORG_UNIT table -# API service: http://dev-release-ab-airbyte-api-server-svc -# API route: https://nr-airbyte-api.apps.emerald.devops.gov.bc.ca -# Beware of competing resources when running airflow/airbyte at the same time - -airbyte_job_type = "sync" -airbyte_connection_id = "46f2508b-0759-41c6-8f8b-33dd2910dd37" -airbyte_workspace_id = "c7528958-f674-4c2c-b91b-95030f0c4513" - -with DAG(dag_id='airbyte_trigger_job', - schedule=None, - start_date=pendulum.today('UTC') - ) as dag: - 
trigger_airbyte_sync = SimpleHttpOperator( - method="POST", - task_id='airbyte_sync', - http_conn_id='airbyte-api', - headers={ - "Content-Type":"application/json", - "Accept": "application/json", - "Content-Length": "72"}, - endpoint='/v1/jobs', - data=json.dumps({"connectionId": airbyte_connection_id, "jobType": airbyte_job_type}), - log_response=True) - diff --git a/dags/connection_secrets_example.py b/dags/connection_secrets_example.py deleted file mode 100644 index be189b0..0000000 --- a/dags/connection_secrets_example.py +++ /dev/null @@ -1,29 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from airflow.operators.python import PythonOperator -from airflow.hooks.base import BaseHook - -def print_connection_details(**kwargs): - fta_connection = BaseHook.get_connection('oracle_fta_dev_conn') - - conn_type = fta_connection.conn_type - host = fta_connection.host - database = fta_connection.schema - username = fta_connection.login - password = fta_connection.password - port = fta_connection.port - - print(conn_type, host, database, username, password, port) - -with DAG( - start_date=datetime(2023, 11, 30), - catchup=False, - schedule=None, - dag_id="connection_secrets_example", -) as dag: - my_python_task = PythonOperator( - task_id='print_connection_details', - python_callable=print_connection_details, - provide_context=True, - dag=dag, - ) diff --git a/dags/dag_demo.py b/dags/dag_demo.py deleted file mode 100644 index 94fcb2d..0000000 --- a/dags/dag_demo.py +++ /dev/null @@ -1,31 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator - -with DAG( - start_date=datetime(2023, 12, 28), - catchup=False, - schedule=None, - dag_id="demo_dag_example" -) as dag: - python_container = KubernetesPodOperator( - task_id="run_container", - image="artifacts.developer.gov.bc.ca/docker-remote/python", - image_pull_policy="IfNotPresent", - image_pull_secrets="artifactory-pull", - in_cluster=True, - namespace="a1b9b0-dev", - service_account_name="airflow-admin", - name="run_container", - random_name_suffix=True, - labels={"DataClass": "Low", "env": "dev"}, - reattach_on_restart=True, - is_delete_operator_pod=False, - get_logs=True, - log_events_on_failure=True, - container_resources= client.V1ResourceRequirements( - requests={"cpu": "50m", "memory": "256Mi"}, - limits={"cpu": "1", "memory": "1Gi"}), - cmds=["python3"] - ) diff --git a/dags/data_pipeline_dag_fta.py b/dags/data_pipeline_dag_fta.py deleted file mode 100644 index 1de4e42..0000000 --- a/dags/data_pipeline_dag_fta.py +++ /dev/null @@ -1,18 +0,0 @@ -from datetime import datetime -from airflow import DAG -from airflow.operators.dummy_operator import DummyOperator -from airflow.operators.python_operator import PythonOperator -from data_replication_cls import data_replication - -dag = DAG('data_pipeline_dag_fta', description='Data Pipeline to execute replication and ETL for FTA tables', schedule_interval='0 12 * * *', start_date=datetime(2017, 3, 20), catchup=False) - -with dag: - dummy_task = DummyOperator(task_id='ETL_Start', retries = 3), - - start_replication_fta = data_replication(mstr_schema = 'app_rrs1', - app_name = 'fta', - env = 'dev', - task_id='start_replication_fta' - ) - -dummy_task >> start_replication_fta \ No newline at end of file diff --git a/dags/data_pipeline_dag_rrs.py b/dags/data_pipeline_dag_rrs.py deleted file mode 100644 index 521a130..0000000 --- a/dags/data_pipeline_dag_rrs.py +++ 
/dev/null @@ -1,18 +0,0 @@ -from datetime import datetime -from airflow import DAG -from airflow.operators.dummy_operator import DummyOperator -from airflow.operators.python_operator import PythonOperator -from data_replication_cls import data_replication - -dag = DAG('data_pipeline_dag_rrs', description='Data Pipeline to execute replication and ETL for RRS tables', schedule_interval='0 12 * * *', start_date=datetime(2017, 3, 20), catchup=False) - -with dag: - dummy_task = DummyOperator(task_id='ETL_Start', retries = 3), - - start_replication_rrs = data_replication(mstr_schema = 'app_rrs1', - app_name = 'rrs', - env = 'dev', - task_id='start_replication_rrs' - ) - -dummy_task >> start_replication_rrs \ No newline at end of file diff --git a/dags/dbt_dlh_example.py b/dags/dbt_dlh_example.py deleted file mode 100644 index 12c963e..0000000 --- a/dags/dbt_dlh_example.py +++ /dev/null @@ -1,38 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.secret import Secret - -ods_secrets = Secret("env", None, "ods-database") -lh_secrets = Secret("env", None, "foriegn-data-wrapper-ods-dlh-dev") - -with DAG( - start_date=datetime(2023, 11, 23), - catchup=False, - schedule=None, - dag_id="dbt_dlh_example", -) as dag: - run_ats_replication = KubernetesPodOperator( - task_id="init_dbt_container", - secrets=[ods_secrets, lh_secrets], - image="ghcr.io/bcgov/nr-dbt-project:main", - image_pull_policy="Always", - in_cluster=True, - namespace="a1b9b0-dev", - service_account_name="airflow-admin", - name="run_dbt_container", - random_name_suffix=True, - labels={"DataClass": "Low", "env": "dev", "ConnectionType": "database"}, - reattach_on_restart=True, - is_delete_operator_pod=False, - get_logs=True, - log_events_on_failure=True, - container_resources= client.V1ResourceRequirements( - requests={"cpu": "50m", "memory": "256Mi"}, - limits={"cpu": "1", "memory": "1Gi"}), - cmds=["dbt"], - arguments=["run","--select","road_tenure_type_code_agg","--profiles-dir","/usr/app/dbt/.dbt"] - # arguments=["test","--profiles-dir","/usr/app/dbt/.dbt"] - ) - diff --git a/dags/dbt_example.py b/dags/dbt_example.py deleted file mode 100644 index c3928db..0000000 --- a/dags/dbt_example.py +++ /dev/null @@ -1,37 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.secret import Secret - -ods_secrets = Secret("env", None, "ods-database") -lh_secrets = Secret("env", None, "lh-database") - -with DAG( - start_date=datetime(2023, 11, 23), - catchup=False, - schedule=None, - dag_id="dbt_example", -) as dag: - run_ats_replication = KubernetesPodOperator( - task_id="init_dbt_container", - # Abi: the GHCR container below is from the main Dockerfile - image="ghcr.io/bcgov/nr-dbt-project:main", - secrets=[ods_secrets, lh_secrets], - in_cluster=True, - namespace="a1b9b0-dev", - service_account_name="airflow-admin", - name="run_dbt_container", - random_name_suffix=True, - labels={"DataClass": "Low", "env": "dev", "ConnectionType": "database"}, - reattach_on_restart=True, - is_delete_operator_pod=False, - get_logs=True, - log_events_on_failure=True, - container_resources= client.V1ResourceRequirements( - requests={"cpu": "50m", "memory": "256Mi"}, - limits={"cpu": "1", "memory": "1Gi"}), - cmds=["dbt"], - 
arguments=["snapshot", "--profiles-dir", "/usr/app/dbt/.dbt"] - ) - diff --git a/dags/dbt_rrs_snapshot.py b/dags/dbt_rrs_snapshot.py deleted file mode 100644 index 4272c25..0000000 --- a/dags/dbt_rrs_snapshot.py +++ /dev/null @@ -1,33 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.secret import Secret - -with DAG( - start_date=datetime(2023, 11, 23), - catchup=False, - schedule=None, - dag_id="dbt_rrs_snapshot", -) as dag: - run_rrs_snapshot = KubernetesPodOperator( - task_id="run_rrs_snapshot", - # Abi: the GHCR container below is a WIP - need to set up containers for each folder - # image="ghcr.io/bcgov/nr-dbt-project:main", - image="image-registry.openshift-image-registry.svc:5000/a1b9b0-dev/dbt-container-snapshot-test@sha256:235ba0140c551ff912d353e51891724db4308316d08736d8c219a49a622ef85a", - in_cluster=True, - namespace="a1b9b0-test", - service_account_name="airflow-admin", - name="run_rrs_snapshot", - random_name_suffix=True, - labels={"DataClass": "Medium", "ConnectionType": "database"}, # network policies - reattach_on_restart=True, - is_delete_operator_pod=False, - get_logs=True, - log_events_on_failure=True, - container_resources= client.V1ResourceRequirements( - requests={"cpu": "10m", "memory": "256Mi"}, - limits={"cpu": "50m", "memory": "500Mi"}), - cmds=["dbt"], - arguments=["snapshot", "--profiles-dir", "/usr/app/dbt/.dbt"] - ) \ No newline at end of file diff --git a/dags/example_bash_operator.py b/dags/example_bash_operator.py deleted file mode 100644 index 2988479..0000000 --- a/dags/example_bash_operator.py +++ /dev/null @@ -1,76 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-"""Example DAG demonstrating the usage of the BashOperator.""" -from __future__ import annotations - -import datetime - -import pendulum - -from airflow.models.dag import DAG -from airflow.operators.bash import BashOperator -from airflow.operators.empty import EmptyOperator - -with DAG( - dag_id="example_bash_operator", - schedule="0 0 * * *", - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - dagrun_timeout=datetime.timedelta(minutes=60), - tags=["example", "example2"], - params={"example_key": "example_value"}, -) as dag: - run_this_last = EmptyOperator( - task_id="run_this_last", - ) - - # [START howto_operator_bash] - run_this = BashOperator( - task_id="run_after_loop", - bash_command="echo 1", - ) - # [END howto_operator_bash] - - run_this >> run_this_last - - for i in range(3): - task = BashOperator( - task_id=f"runme_{i}", - bash_command='echo "{{ task_instance_key_str }}" && sleep 1', - ) - task >> run_this - - # [START howto_operator_bash_template] - also_run_this = BashOperator( - task_id="also_run_this", - bash_command='echo "ti_key={{ task_instance_key_str }}"', - ) - # [END howto_operator_bash_template] - also_run_this >> run_this_last - -# [START howto_operator_bash_skip] -this_will_skip = BashOperator( - task_id="this_will_skip", - bash_command='echo "hello world"; exit 99;', - dag=dag, -) -# [END howto_operator_bash_skip] -this_will_skip >> run_this_last - -if __name__ == "__main__": - dag.test() diff --git a/dags/example_sensors.py b/dags/example_sensors.py deleted file mode 100644 index 9e3fc02..0000000 --- a/dags/example_sensors.py +++ /dev/null @@ -1,126 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -from __future__ import annotations - -import datetime - -import pendulum - -from airflow.models.dag import DAG -from airflow.operators.bash import BashOperator -from airflow.sensors.bash import BashSensor -from airflow.sensors.filesystem import FileSensor -from airflow.sensors.python import PythonSensor -from airflow.sensors.time_delta import TimeDeltaSensor, TimeDeltaSensorAsync -from airflow.sensors.time_sensor import TimeSensor, TimeSensorAsync -from airflow.sensors.weekday import DayOfWeekSensor -from airflow.utils.trigger_rule import TriggerRule -from airflow.utils.weekday import WeekDay - - -# [START example_callables] -def success_callable(): - return True - - -def failure_callable(): - return False - - -# [END example_callables] - - -with DAG( - dag_id="example_sensors", - schedule=None, - start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), - catchup=False, - tags=["example"], -) as dag: - # [START example_time_delta_sensor] - t0 = TimeDeltaSensor(task_id="wait_some_seconds", delta=datetime.timedelta(seconds=2)) - # [END example_time_delta_sensor] - - # [START example_time_delta_sensor_async] - t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async", delta=datetime.timedelta(seconds=2)) - # [END example_time_delta_sensor_async] - - # [START example_time_sensors] - t1 = TimeSensor( - task_id="fire_immediately", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time() - ) - - t2 = TimeSensor( - task_id="timeout_after_second_date_in_the_future", - timeout=1, - soft_fail=True, - target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(), - ) - # [END example_time_sensors] - - # [START example_time_sensors_async] - t1a = TimeSensorAsync( - task_id="fire_immediately_async", target_time=datetime.datetime.now(tz=datetime.timezone.utc).time() - ) - - t2a = TimeSensorAsync( - task_id="timeout_after_second_date_in_the_future_async", - timeout=1, - soft_fail=True, - target_time=(datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(hours=1)).time(), - ) - # [END example_time_sensors_async] - - # [START example_bash_sensors] - t3 = BashSensor(task_id="Sensor_succeeds", bash_command="exit 0") - - t4 = BashSensor(task_id="Sensor_fails_after_3_seconds", timeout=3, soft_fail=True, bash_command="exit 1") - # [END example_bash_sensors] - - t5 = BashOperator(task_id="remove_file", bash_command="rm -rf /tmp/temporary_file_for_testing") - - # [START example_file_sensor] - t6 = FileSensor(task_id="wait_for_file", filepath="/tmp/temporary_file_for_testing") - # [END example_file_sensor] - - t7 = BashOperator( - task_id="create_file_after_3_seconds", bash_command="sleep 3; touch /tmp/temporary_file_for_testing" - ) - - # [START example_python_sensors] - t8 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable) - - t9 = PythonSensor( - task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable - ) - # [END example_python_sensors] - - # [START example_day_of_week_sensor] - t10 = DayOfWeekSensor( - task_id="week_day_sensor_failing_on_timeout", timeout=3, soft_fail=True, week_day=WeekDay.MONDAY - ) - # [END example_day_of_week_sensor] - - tx = BashOperator(task_id="print_date_in_bash", bash_command="date") - - tx.trigger_rule = TriggerRule.NONE_FAILED - [t0, t0a, t1, t1a, t2, t2a, t3, t4] >> tx - t5 >> t6 >> tx - t7 >> tx - [t8, t9] >> tx - t10 >> tx diff --git a/dags/kubernetes_example.py b/dags/kubernetes_example.py deleted file mode 100644 index 1d2b90b..0000000 --- 
a/dags/kubernetes_example.py +++ /dev/null @@ -1,33 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator - -# Work in progress - -with DAG( - start_date=datetime(2023, 12, 28), - catchup=False, - schedule=None, - dag_id="kubernetes_example" -) as dag: - run_ats_replication = KubernetesPodOperator( - task_id="run_container", - image="artifacts.developer.gov.bc.ca/docker-remote/python", - image_pull_policy="IfNotPresent", - image_pull_secrets="artifactory-pull", - in_cluster=True, - namespace="a1b9b0-dev", - service_account_name="airflow-admin", - name="run_container", - random_name_suffix=True, - labels={"DataClass": "Low", "env": "dev"}, - reattach_on_restart=True, - is_delete_operator_pod=False, - get_logs=True, - log_events_on_failure=True, - container_resources= client.V1ResourceRequirements( - requests={"cpu": "50m", "memory": "256Mi"}, - limits={"cpu": "1", "memory": "1Gi"}), - cmds=["python3"] - ) diff --git a/dags/oracle_example_print.py b/dags/oracle_example_print.py deleted file mode 100644 index e15a1cb..0000000 --- a/dags/oracle_example_print.py +++ /dev/null @@ -1,34 +0,0 @@ -from airflow import DAG -from airflow.providers.oracle.operators.oracle import OracleOperator -from airflow.utils.dates import datetime -from airflow.operators.python import PythonOperator - -default_args = { - 'owner': 'airflow', - 'start_date': datetime(2023, 8, 23), -} - -with DAG('oracle_example_print', - default_args=default_args, - schedule_interval=None) as dag: - t1 = OracleOperator( - task_id='execute_sql', - sql="SELECT count(*) FROM THE.TENURE_APPLICATION_STATE_CODE", - oracle_conn_id="oracle_fta_dev_conn", - autocommit=True, - ) - - def print_result(**context): - ti = context['task_instance'] - result = ti.xcom_pull(task_ids='execute_sql') - print("Result of SQL query:") - for row in result: - print(row) - - t2 = PythonOperator( - task_id='print_result', - python_callable=print_result, - provide_context=True, - ) - - t1 >> t2 diff --git a/dags/permitting_ats.py b/dags/permitting_ats.py new file mode 100644 index 0000000..a962ff7 --- /dev/null +++ b/dags/permitting_ats.py @@ -0,0 +1,32 @@ + +from airflow import DAG +from pendulum import datetime +from kubernetes import client +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret + +LOB = 'ats' + +ods_secrets = Secret("env", None, "ods-database") +lob_secrets = Secret("env", None, f"{LOB}-database") + +with DAG( + start_date=datetime(2023, 11, 23), + catchup=False, + schedule='0 6 * * *', + dag_id=f"permitting-pipeline-{LOB}", +) as dag: + run_lexis_replication = KubernetesPodOperator( + task_id="run_replication", + image="ghcr.io/bcgov/nr-permitting-pipelines:main", + image_pull_policy="Always", + in_cluster=True, + service_account_name="airflow-admin", + name=f"run_{LOB}_replication", + labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"}, + is_delete_operator_pod=False, + secrets=[lob_secrets, ods_secrets], + container_resources= client.V1ResourceRequirements( + requests={"cpu": "50m", "memory": "512Mi"}, + limits={"cpu": "100m", "memory": "1024Mi"}) + ) \ No newline at end of file diff --git a/dags/permitting_fta.py b/dags/permitting_fta.py new file mode 100644 index 0000000..cbb33cc --- /dev/null +++ b/dags/permitting_fta.py @@ -0,0 +1,32 @@ + +from airflow import DAG +from pendulum import datetime 
+from kubernetes import client +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret + +LOB = 'fta' # ats, fta, rrs, or lexis + +ods_secrets = Secret("env", None, "ods-database") +lob_secrets = Secret("env", None, f"{LOB}-database") + +with DAG( + start_date=datetime(2023, 11, 23), + catchup=False, + schedule='5 6 * * *', + dag_id=f"permitting-pipeline-{LOB}", +) as dag: + run_lexis_replication = KubernetesPodOperator( + task_id="run_replication", + image="ghcr.io/bcgov/nr-permitting-pipelines:main", + image_pull_policy="Always", + in_cluster=True, + service_account_name="airflow-admin", + name=f"run_{LOB}replication", + labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"}, + is_delete_operator_pod=False, + secrets=[lob_secrets, ods_secrets], + container_resources= client.V1ResourceRequirements( + requests={"cpu": "50m", "memory": "512Mi"}, + limits={"cpu": "100m", "memory": "1024Mi"}) + ) \ No newline at end of file diff --git a/dags/permitting_fta_replication.py b/dags/permitting_fta_replication.py new file mode 100644 index 0000000..d0a019c --- /dev/null +++ b/dags/permitting_fta_replication.py @@ -0,0 +1,32 @@ + +from airflow import DAG +from pendulum import datetime +from kubernetes import client +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret + +LOB = 'fta' + +ods_secrets = Secret("env", None, "ods-database") +lob_secrets = Secret("env", None, f"{LOB}-replication-database") + +with DAG( + start_date=datetime(2023, 11, 23), + catchup=False, + schedule='25 6 * * *', + dag_id=f"permitting-pipeline-{LOB}-replication", +) as dag: + run_lexis_replication = KubernetesPodOperator( + task_id="run_replication", + image="ghcr.io/bcgov/nr-permitting-pipelines:main", + image_pull_policy="Always", + in_cluster=True, + service_account_name="airflow-admin", + name=f"run_{LOB}_replication", + labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"}, + is_delete_operator_pod=False, + secrets=[lob_secrets, ods_secrets], + container_resources= client.V1ResourceRequirements( + requests={"cpu": "50m", "memory": "512Mi"}, + limits={"cpu": "100m", "memory": "1024Mi"}) + ) \ No newline at end of file diff --git a/dags/permitting_lexis.py b/dags/permitting_lexis.py new file mode 100644 index 0000000..6c07eaa --- /dev/null +++ b/dags/permitting_lexis.py @@ -0,0 +1,31 @@ +from airflow import DAG +from pendulum import datetime +from kubernetes import client +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret + +LOB = 'lexis' # ats, fta, rrs, or lexis + +ods_secrets = Secret("env", None, "ods-database") +lob_secrets = Secret("env", None, f"{LOB}-database") + +with DAG( + start_date=datetime(2023, 11, 23), + catchup=False, + schedule='10 6 * * *', + dag_id=f"permitting-pipeline-{LOB}", +) as dag: + run_lexis_replication = KubernetesPodOperator( + task_id="run_replication", + image="ghcr.io/bcgov/nr-permitting-pipelines:main", + image_pull_policy="Always", + in_cluster=True, + service_account_name="airflow-admin", + name=f"run_{LOB}replication", + labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"}, + is_delete_operator_pod=False, + secrets=[lob_secrets, ods_secrets], + container_resources= client.V1ResourceRequirements( + requests={"cpu": "50m", "memory": 
"512Mi"}, + limits={"cpu": "100m", "memory": "1024Mi"}) + ) \ No newline at end of file diff --git a/dags/permitting_pipeline_ats.py b/dags/permitting_pipeline_ats.py deleted file mode 100644 index 3a87aa6..0000000 --- a/dags/permitting_pipeline_ats.py +++ /dev/null @@ -1,34 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.secret import Secret - -ods_secrets = Secret("env", None, "ods-database") -ats_secrets = Secret("env", None, "ats-database") - -with DAG( - start_date=datetime(2023, 11, 23), - catchup=False, - schedule=None, - dag_id="permitting_pipeline_ats", -) as dag: - run_ats_replication = KubernetesPodOperator( - task_id="run_ats_replication", - image="ghcr.io/bcgov/nr-permitting-pipelines:main", - # image="image-registry.openshift-image-registry.svc:5000/a1b9b0-dev/data-replication-parametrized-audit1@sha256:8c51ee820434e4f5d06a91deda645bcd0a943b8c87bc3c8a8e67dead1c18a786", - in_cluster=True, - namespace="a1b9b0-dev", - service_account_name="airflow-admin", - name="run_ats_replication", - random_name_suffix=True, - labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "test-release-af"}, # network policies - reattach_on_restart=True, - is_delete_operator_pod=True, - get_logs=True, - log_events_on_failure=True, - secrets=[ats_secrets, ods_secrets], - container_resources= client.V1ResourceRequirements( - requests={"cpu": "10m", "memory": "256Mi"}, - limits={"cpu": "50m", "memory": "500Mi"}) - ) diff --git a/dags/permitting_pipeline_controller.py b/dags/permitting_pipeline_controller.py deleted file mode 100644 index a9967af..0000000 --- a/dags/permitting_pipeline_controller.py +++ /dev/null @@ -1,52 +0,0 @@ -from airflow import DAG -from airflow.operators.dagrun_operator import TriggerDagRunOperator -from datetime import datetime, timedelta - -# Define default_args dictionary -default_args = { - 'owner': 'airflow', - 'start_date': datetime(2023, 11, 27), - 'retries': 1, - 'retry_delay': timedelta(minutes=5), -} - -# Define the controller DAG -controller_dag = DAG( - 'permitting_pipeline_controller', - default_args=default_args, - description='Controller DAG to run other DAGs in order', - schedule_interval='0 6 * * *', # 10pm PST - 6am UTC - catchup=False, # Set to False to skip historical runs -) - -# Define the list of sub-DAGs in the desired order -sub_dags_in_order = [ - 'permitting_pipeline_etl_batch_id_creation', - 'permitting_pipeline_fta', - 'permitting_pipeline_rrs', - 'permitting_pipeline_ats', - 'permitting_pipeline_etl_batch_id_update', -] - -# Create TriggerDagRunOperator for each sub-DAG -trigger_operators = {} -for sub_dag_id in sub_dags_in_order: - trigger_operator = TriggerDagRunOperator( - task_id=f'trigger_{sub_dag_id}', - trigger_dag_id=sub_dag_id, - # conf={'batch_id': 'your_batch_id_value'}, # Pass any necessary configuration - dag=controller_dag, - ) - trigger_operators[sub_dag_id] = trigger_operator - -# Set up task dependencies -trigger_operators['permitting_pipeline_etl_batch_id_creation'] >> [ - trigger_operators['permitting_pipeline_fta'], - trigger_operators['permitting_pipeline_rrs'], - trigger_operators['permitting_pipeline_ats'] -] - -# Set up parallel execution for FTA, RRS, and ATS -[trigger_operators['permitting_pipeline_fta'], - trigger_operators['permitting_pipeline_rrs'], - trigger_operators['permitting_pipeline_ats']] >> 
trigger_operators['permitting_pipeline_etl_batch_id_update'] diff --git a/dags/permitting_pipeline_controller_seq.py b/dags/permitting_pipeline_controller_seq.py deleted file mode 100644 index 4efe78d..0000000 --- a/dags/permitting_pipeline_controller_seq.py +++ /dev/null @@ -1,48 +0,0 @@ -from airflow import DAG -from airflow.operators.dagrun_operator import TriggerDagRunOperator -from datetime import datetime, timedelta - -# Define default_args dictionary -default_args = { - 'owner': 'airflow', - 'start_date': datetime(2023, 11, 28), - 'retries': 1, - 'retry_delay': timedelta(minutes=5), -} - -# Define the controller DAG -controller_dag = DAG( - 'permitting_pipeline_controller_seq', - default_args=default_args, - description='Controller DAG to run other DAGs in order', - schedule_interval=None, # Set your desired schedule_interval - catchup=False, # Set to False to skip historical runs -) - -# Define the list of sub-DAGs in the desired order -sub_dags_in_order = [ - 'permitting_pipeline_etl_batch_id_creation', - 'permitting_pipeline_fta', - 'permitting_pipeline_rrs', - 'permitting_pipeline_ats', - 'permitting_pipeline_etl_batch_id_update', -] - -# Create TriggerDagRunOperator for each sub-DAG -trigger_operators = {} -for sub_dag_id in sub_dags_in_order: - trigger_operator = TriggerDagRunOperator( - task_id=f'trigger_{sub_dag_id}', - trigger_dag_id=sub_dag_id, - # conf={'batch_id': 'your_batch_id_value'}, # Pass any necessary configuration - dag=controller_dag, - ) - trigger_operators[sub_dag_id] = trigger_operator - -# Set up task dependencies -trigger_operators['permitting_pipeline_etl_batch_id_creation'] >> trigger_operators['permitting_pipeline_fta'] >> trigger_operators['permitting_pipeline_rrs'] >> trigger_operators['permitting_pipeline_ats'] >> trigger_operators['permitting_pipeline_etl_batch_id_update'] - -##trigger_operators['permitting_pipeline_etl_batch_id_creation'] >> trigger_operators['permitting_pipeline_fta'] -##trigger_operators['permitting_pipeline_fta'] >> trigger_operators['permitting_pipeline_rrs'] -##trigger_operators['permitting_pipeline_rrs'] >> trigger_operators['permitting_pipeline_ats'] -##trigger_operators['permitting_pipeline_ats'] >> trigger_operators['permitting_pipeline_etl_batch_id_update'] diff --git a/dags/permitting_pipeline_etl_batch_id_creation.py b/dags/permitting_pipeline_etl_batch_id_creation.py deleted file mode 100644 index 80b2cf5..0000000 --- a/dags/permitting_pipeline_etl_batch_id_creation.py +++ /dev/null @@ -1,94 +0,0 @@ -from airflow import DAG -from airflow.operators.postgres_operator import PostgresOperator -from airflow.operators.python_operator import PythonOperator, BranchPythonOperator -from datetime import datetime, timedelta -from airflow.hooks.postgres_hook import PostgresHook -from datetime import datetime - -# Define default_args dictionary -default_args = { - 'owner': 'airflow', - 'start_date': datetime(2023, 11, 23), - 'retries': 1, - 'retry_delay': timedelta(minutes=5), -} - -# Define the DAG -dag = DAG( - 'permitting_pipeline_etl_batch_id_creation', - default_args=default_args, - description='DAG to insert a etl_batch_id record into a ODS table', - schedule_interval=None, # Set your desired schedule_interval - catchup=False, # Set to False to skip historical runs -) - -current_date = datetime.now().strftime('%Y-%m-%d') -current_datetime = datetime.now() - -def check_record_existence(): - - # PostgreSQL connection ID configured in Airflow - postgres_conn_id = 'postgres_ods_conn' - - # SQL query to check if the record with the 
given batch_id exists - select_query = f"""SELECT COUNT(*) FROM ods_data_management.audit_batch_id WHERE etl_batch_id = '{current_date}';""" - - try: - # Use PostgresHook to connect to PostgreSQL - postgres_hook = PostgresHook(postgres_conn_id) - result = postgres_hook.get_first(select_query) - - # If the count is greater than 0, the record exists - record_exists = result[0] > 0 - - return 'skip_insert' if record_exists else 'insert_into_postgres' - - except Exception as e: - print(f"Error connecting to PostgreSQL: {e}") - return 'insert_into_postgres' # Assume insertion in case of an error - - -# Define the SQL statement for insertion -insert_sql = f""" - INSERT INTO ods_data_management.audit_batch_id (etl_batch_id, etl_batch_name, etl_batch_status,etl_batch_start_time,etl_batch_end_time) - VALUES ('{current_date}', 'permitting_data_pipeline', 'started','{current_datetime}',null); -""" - -# Define the task to check record existence -check_existence_task = PythonOperator( - task_id='check_record_existence', - python_callable=check_record_existence, - provide_context=True, - dag=dag, -) - -# Define the branching task -branch_task = BranchPythonOperator( - task_id='branch_task', - python_callable=check_record_existence, - provide_context=True, - dag=dag, -) - -# Define the task to skip the insertion -skip_insert_task = PythonOperator( - task_id='skip_insert', - python_callable=lambda **kwargs: print("Skipping insertion task."), - dag=dag, -) - -# Define the task using PostgresOperator for insertion -insert_task = PostgresOperator( - task_id='insert_into_postgres', - postgres_conn_id='postgres_ods_conn', # Specify your PostgreSQL connection ID - sql=insert_sql, - autocommit=True, - dag=dag, -) - -# Set up task dependencies -check_existence_task >> branch_task -branch_task >> [skip_insert_task, insert_task] - -if __name__ == "__main__": - dag.cli() diff --git a/dags/permitting_pipeline_etl_batch_id_update.py b/dags/permitting_pipeline_etl_batch_id_update.py deleted file mode 100644 index 36da666..0000000 --- a/dags/permitting_pipeline_etl_batch_id_update.py +++ /dev/null @@ -1,43 +0,0 @@ -from airflow import DAG -from airflow.operators.postgres_operator import PostgresOperator -from airflow.utils.dates import datetime, timedelta - -# Define default_args dictionary -default_args = { - 'owner': 'airflow', - 'start_date': datetime(2023, 11, 27), - 'retries': 1, - 'retry_delay': timedelta(minutes=5), -} - -# Define the DAG -dag = DAG( - 'permitting_pipeline_etl_batch_id_update', - default_args=default_args, - description='DAG to update etl_batch_id in PostgreSQL', - schedule_interval=None, # Set your desired schedule_interval - catchup=False, # Set to False to skip historical runs -) - -# Define the SQL query -update_sql_query = """ - UPDATE ods_data_management.audit_batch_id - SET etl_batch_status='success', etl_batch_end_time=current_timestamp - WHERE etl_batch_id=( - SELECT MAX(etl_batch_id) - FROM ods_data_management.audit_batch_id - WHERE etl_batch_status='started' AND etl_batch_name='permitting_data_pipeline' - ); -""" - -# Define the task using PostgresOperator for the update query -update_task = PostgresOperator( - task_id='update_etl_batch_id', - postgres_conn_id='postgres_ods_conn', # Specify your PostgreSQL connection ID - sql=update_sql_query, - autocommit=True, - dag=dag, -) - -if __name__ == "__main__": - dag.cli() diff --git a/dags/permitting_pipeline_fta.py b/dags/permitting_pipeline_fta.py deleted file mode 100644 index 11358c8..0000000 --- a/dags/permitting_pipeline_fta.py +++ 
/dev/null @@ -1,34 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.secret import Secret - -ods_secrets = Secret("env", None, "ods-database") -fta_secrets = Secret("env", None, "fta-database") - -with DAG( - start_date=datetime(2023, 11, 23), - catchup=False, - schedule=None, - dag_id="permitting_pipeline_fta", -) as dag: - run_fta_replication = KubernetesPodOperator( - task_id="run_fta_replication", - image="ghcr.io/bcgov/nr-permitting-pipelines:main", - # image="image-registry.openshift-image-registry.svc:5000/a1b9b0-dev/data-replication-parametrized-audit1@sha256:8c51ee820434e4f5d06a91deda645bcd0a943b8c87bc3c8a8e67dead1c18a786", - in_cluster=True, - namespace="a1b9b0-dev", - service_account_name="airflow-admin", - name="run_fta_replication", - random_name_suffix=True, - labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "test-release-af"}, # network policies - reattach_on_restart=True, - is_delete_operator_pod=True, - get_logs=True, - log_events_on_failure=True, - secrets=[fta_secrets, ods_secrets], - container_resources= client.V1ResourceRequirements( - requests={"cpu": "10m", "memory": "256Mi"}, - limits={"cpu": "50m", "memory": "500Mi"}) - ) diff --git a/dags/permitting_pipeline_rrs.py b/dags/permitting_pipeline_rrs.py deleted file mode 100644 index 54bcf9b..0000000 --- a/dags/permitting_pipeline_rrs.py +++ /dev/null @@ -1,34 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.secret import Secret - -ods_secrets = Secret("env", None, "ods-database") -rrs_secrets = Secret("env", None, "rrs-database1") # Temporary - -with DAG( - start_date=datetime(2023, 11, 23), - catchup=False, - schedule=None, - dag_id="permitting_pipeline_rrs", -) as dag: - run_rrs_replication = KubernetesPodOperator( - task_id="run_rrs_replication", - image="ghcr.io/bcgov/nr-permitting-pipelines:main", - # image="image-registry.openshift-image-registry.svc:5000/a1b9b0-dev/data-replication-parametrized-audit1@sha256:8c51ee820434e4f5d06a91deda645bcd0a943b8c87bc3c8a8e67dead1c18a786", - in_cluster=True, - namespace="a1b9b0-dev", - service_account_name="airflow-admin", - name="run_rrs_replication", - random_name_suffix=True, - labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "test-release-af"}, # network policies - reattach_on_restart=True, - is_delete_operator_pod=True, - get_logs=True, - log_events_on_failure=True, - secrets=[rrs_secrets, ods_secrets], - container_resources= client.V1ResourceRequirements( - requests={"cpu": "10m", "memory": "256Mi"}, - limits={"cpu": "50m", "memory": "500Mi"}) - ) diff --git a/dags/permitting_rrs_rp.py b/dags/permitting_rrs_rp.py new file mode 100644 index 0000000..ddc16e1 --- /dev/null +++ b/dags/permitting_rrs_rp.py @@ -0,0 +1,32 @@ + +from airflow import DAG +from pendulum import datetime +from kubernetes import client +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret + +LOB = 'rrs-rp' # ats, fta, rrs, or lexis + +ods_secrets = Secret("env", None, "ods-database") +lob_secrets = Secret("env", None, f"{LOB}-database") + +with DAG( + start_date=datetime(2023, 11, 23), + catchup=False, + schedule='15 6 * * *', + 
dag_id=f"permitting-pipeline-{LOB}", +) as dag: + run_lexis_replication = KubernetesPodOperator( + task_id="run_replication", + image="ghcr.io/bcgov/nr-permitting-pipelines:main", + image_pull_policy="Always", + in_cluster=True, + service_account_name="airflow-admin", + name=f"run_{LOB}_replication", + labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"}, + is_delete_operator_pod=False, + secrets=[lob_secrets, ods_secrets], + container_resources= client.V1ResourceRequirements( + requests={"cpu": "50m", "memory": "512Mi"}, + limits={"cpu": "100m", "memory": "1024Mi"}) + ) \ No newline at end of file diff --git a/dags/permitting_rrs_rup.py b/dags/permitting_rrs_rup.py new file mode 100644 index 0000000..1453553 --- /dev/null +++ b/dags/permitting_rrs_rup.py @@ -0,0 +1,32 @@ + +from airflow import DAG +from pendulum import datetime +from kubernetes import client +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from airflow.providers.cncf.kubernetes.secret import Secret + +LOB = 'rrs-rup' + +ods_secrets = Secret("env", None, "ods-database") +lob_secrets = Secret("env", None, f"{LOB}-database") + +with DAG( + start_date=datetime(2023, 11, 23), + catchup=False, + schedule='20 6 * * *', + dag_id=f"permitting-pipeline-{LOB}", +) as dag: + run_lexis_replication = KubernetesPodOperator( + task_id="run_replication", + image="ghcr.io/bcgov/nr-permitting-pipelines:main", + image_pull_policy="Always", + in_cluster=True, + service_account_name="airflow-admin", + name=f"run_{LOB}_replication", + labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"}, + is_delete_operator_pod=False, + secrets=[lob_secrets, ods_secrets], + container_resources= client.V1ResourceRequirements( + requests={"cpu": "50m", "memory": "512Mi"}, + limits={"cpu": "100m", "memory": "1024Mi"}) + ) \ No newline at end of file diff --git a/dags/postgres_example_insert.py b/dags/postgres_example_insert.py deleted file mode 100644 index e2d86ef..0000000 --- a/dags/postgres_example_insert.py +++ /dev/null @@ -1,20 +0,0 @@ -from airflow import DAG -from airflow.operators.postgres_operator import PostgresOperator -from airflow.utils.dates import datetime -from airflow.operators.python_operator import PythonOperator - -default_args = { - 'owner': 'airflow', - 'start_date': datetime(2023, 8, 23), - # Add other necessary default arguments -} - -with DAG('postgres_example_insert', default_args=default_args, schedule_interval=None) as dag: - t1 = PostgresOperator( - task_id='execute_sql', - sql="INSERT INTO public.tenure_application_state_code SELECT tenure_application_state_code, description,effective_date,expiry_date,update_timestamp FROM fta_replication.tenure_application_state_code", - postgres_conn_id="postgres_ods_conn", # Update connection ID - autocommit=True, - ) - - t1 diff --git a/dags/postgres_example_print.py b/dags/postgres_example_print.py deleted file mode 100644 index 2435196..0000000 --- a/dags/postgres_example_print.py +++ /dev/null @@ -1,33 +0,0 @@ -from airflow import DAG -from airflow.operators.postgres_operator import PostgresOperator -from airflow.utils.dates import datetime -from airflow.operators.python_operator import PythonOperator - -default_args = { - 'owner': 'airflow', - 'start_date': datetime(2023, 8, 23), - # Add other necessary default arguments -} - -with DAG('postgres_example_print', default_args=default_args, schedule_interval=None) as dag: - t1 = PostgresOperator( - task_id='execute_sql', - sql="SELECT count(*) FROM 
fta_replication.tenure_application_state_code", - postgres_conn_id="postgres_ods_conn", # Update connection ID - autocommit=True, - ) - - def print_result(**context): - ti = context['task_instance'] - result = ti.xcom_pull(task_ids='execute_sql') - print("Result of SQL query:") - for row in result: - print(row) - - t2 = PythonOperator( - task_id='print_result', - python_callable=print_result, - provide_context=True, - ) - - t1 >> t2 diff --git a/dags/vault_example.py b/dags/vault_example.py deleted file mode 100644 index 1edc454..0000000 --- a/dags/vault_example.py +++ /dev/null @@ -1,31 +0,0 @@ -from airflow import DAG -from pendulum import datetime -from kubernetes import client -from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator -from airflow.providers.cncf.kubernetes.secret import Secret - -vault_jwt = Secret("env", None, "nr-vault-jwt") - -with DAG( - start_date=datetime(2023, 11, 23), - catchup=False, - schedule=None, - dag_id="vault_example", -) as dag: - vault_action = KubernetesPodOperator( - task_id="get_ods_host", - image="ghcr.io/bcgov/nr-vault-patterns:main", - in_cluster=True, - namespace="a1b9b0-dev", - name="get_ods_host", - random_name_suffix=True, - labels={"DataClass": "High", "Release": "test-release-af"}, # network policies - reattach_on_restart=True, - is_delete_operator_pod=True, - get_logs=True, - log_events_on_failure=True, - secrets=[vault_jwt], - container_resources= client.V1ResourceRequirements( - requests={"cpu": "10m", "memory": "256Mi"}, - limits={"cpu": "50m", "memory": "500Mi"}) - ) diff --git a/oc/add-label-airflow.yaml b/oc/add-label-airflow.yaml new file mode 100644 index 0000000..6fdf6f8 --- /dev/null +++ b/oc/add-label-airflow.yaml @@ -0,0 +1,23 @@ +apiVersion: kyverno.io/v1 +kind: Policy +metadata: + name: add-airflow-labels +spec: + background: false + failurePolicy: Ignore + rules: + - match: + any: + - resources: + kinds: + - Pod + selector: + matchLabels: + app: airflow + mutate: + patchStrategicMerge: + metadata: + labels: + DataClass: Medium + name: add-data-class-label + validationFailureAction: audit \ No newline at end of file diff --git a/oc/airflow-admin-role.yaml b/oc/airflow-admin-role.yaml index 9c4df36..c56f9d2 100644 --- a/oc/airflow-admin-role.yaml +++ b/oc/airflow-admin-role.yaml @@ -2,7 +2,6 @@ kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: airflow-and-airbyte-admin - namespace: a1b9b0-test rules: - verbs: - get diff --git a/oc/airflow-role-binding.yaml b/oc/airflow-role-binding.yaml index 9774885..6556b88 100644 --- a/oc/airflow-role-binding.yaml +++ b/oc/airflow-role-binding.yaml @@ -2,11 +2,10 @@ kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: airflow-admin-binding - namespace: a1b9b0-test subjects: - kind: ServiceAccount name: airflow-admin - namespace: a1b9b0-test + namespace: a1b9b0-prod roleRef: apiGroup: rbac.authorization.k8s.io kind: Role diff --git a/oc/allow-egress-airflow.yaml b/oc/allow-egress-airflow.yaml deleted file mode 100644 index d084338..0000000 --- a/oc/allow-egress-airflow.yaml +++ /dev/null @@ -1,14 +0,0 @@ -kind: NetworkPolicy -apiVersion: networking.k8s.io/v1 -metadata: - name: allow-egress-airflow - namespace: a1b9b0-dev -spec: - podSelector: - matchLabels: - Release: test-release-af - egress: - - {} - policyTypes: - - Egress -status: {} \ No newline at end of file diff --git a/oc/allow-ingress-airflow.yaml b/oc/allow-traffic-airflow.yaml similarity index 65% rename from oc/allow-ingress-airflow.yaml rename to 
oc/allow-traffic-airflow.yaml index e7ab655..530e7d0 100644 --- a/oc/allow-ingress-airflow.yaml +++ b/oc/allow-traffic-airflow.yaml @@ -1,14 +1,16 @@ kind: NetworkPolicy apiVersion: networking.k8s.io/v1 metadata: - name: allow-ingress-airflow - namespace: a1b9b0-dev + name: allow-traffic-airflow spec: podSelector: matchLabels: - Release: test-release-af + Release: airflow ingress: - {} + egress: + - {} policyTypes: - Ingress + - Egress status: {} \ No newline at end of file diff --git a/oc/scheduler-hpa.yaml b/oc/scheduler-hpa.yaml deleted file mode 100644 index 74c48ae..0000000 --- a/oc/scheduler-hpa.yaml +++ /dev/null @@ -1,20 +0,0 @@ -apiVersion: autoscaling/v2 -kind: HorizontalPodAutoscaler -metadata: - name: airflow-scheduler - namespace: a1b9b0-dev -spec: - scaleTargetRef: - kind: Deployment - name: dev-release-af-airflow-scheduler - apiVersion: apps/v1 - minReplicas: 1 - maxReplicas: 2 - metrics: - - type: Resource - resource: - name: memory - target: - type: Utilization - averageUtilization: 80 - diff --git a/oc/webapp-hpa.yaml b/oc/webapp-hpa.yaml index 2f886c0..9978a6e 100644 --- a/oc/webapp-hpa.yaml +++ b/oc/webapp-hpa.yaml @@ -2,11 +2,10 @@ apiVersion: autoscaling/v2 kind: HorizontalPodAutoscaler metadata: name: airflow-web - namespace: a1b9b0-dev spec: scaleTargetRef: kind: Deployment - name: dev-release-af-airflow-web + name: airflow-web apiVersion: apps/v1 minReplicas: 1 maxReplicas: 2 @@ -16,5 +15,4 @@ spec: name: memory target: type: Utilization - averageUtilization: 80 - + averageUtilization: 80 \ No newline at end of file diff --git a/oc/webapp-route.yaml b/oc/webapp-route.yaml index 2622697..ee427a2 100644 --- a/oc/webapp-route.yaml +++ b/oc/webapp-route.yaml @@ -2,17 +2,16 @@ kind: Route apiVersion: route.openshift.io/v1 metadata: name: route-for-airflow-web - namespace: a1b9b0-dev labels: - DataClass: Public + DataClass: Low spec: - host: nr-airflow-test.apps.emerald.devops.gov.bc.ca + host: nr-airflow.apps.emerald.devops.gov.bc.ca to: kind: Service - name: test-release-af-airflow + name: airflow weight: 100 port: targetPort: http tls: termination: edge - wildcardPolicy: None \ No newline at end of file + wildcardPolicy: None From 47d445e5c4f69bad5a58c145768d8ed95a95e212 Mon Sep 17 00:00:00 2001 From: abimichel Date: Wed, 27 Mar 2024 14:22:41 -0700 Subject: [PATCH 09/11] fix typo --- dags/permitting_ats.py | 2 +- dags/permitting_fta.py | 2 +- dags/permitting_fta_replication.py | 2 +- dags/permitting_lexis.py | 2 +- dags/permitting_rrs_rp.py | 2 +- dags/permitting_rrs_rup.py | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dags/permitting_ats.py b/dags/permitting_ats.py index a962ff7..fcb320d 100644 --- a/dags/permitting_ats.py +++ b/dags/permitting_ats.py @@ -16,7 +16,7 @@ schedule='0 6 * * *', dag_id=f"permitting-pipeline-{LOB}", ) as dag: - run_lexis_replication = KubernetesPodOperator( + run_replication = KubernetesPodOperator( task_id="run_replication", image="ghcr.io/bcgov/nr-permitting-pipelines:main", image_pull_policy="Always", diff --git a/dags/permitting_fta.py b/dags/permitting_fta.py index cbb33cc..a3b42cc 100644 --- a/dags/permitting_fta.py +++ b/dags/permitting_fta.py @@ -16,7 +16,7 @@ schedule='5 6 * * *', dag_id=f"permitting-pipeline-{LOB}", ) as dag: - run_lexis_replication = KubernetesPodOperator( + run_replication = KubernetesPodOperator( task_id="run_replication", image="ghcr.io/bcgov/nr-permitting-pipelines:main", image_pull_policy="Always", diff --git a/dags/permitting_fta_replication.py b/dags/permitting_fta_replication.py 
index d0a019c..22698af 100644 --- a/dags/permitting_fta_replication.py +++ b/dags/permitting_fta_replication.py @@ -16,7 +16,7 @@ schedule='25 6 * * *', dag_id=f"permitting-pipeline-{LOB}-replication", ) as dag: - run_lexis_replication = KubernetesPodOperator( + run_replication = KubernetesPodOperator( task_id="run_replication", image="ghcr.io/bcgov/nr-permitting-pipelines:main", image_pull_policy="Always", diff --git a/dags/permitting_lexis.py b/dags/permitting_lexis.py index 6c07eaa..ed31893 100644 --- a/dags/permitting_lexis.py +++ b/dags/permitting_lexis.py @@ -15,7 +15,7 @@ schedule='10 6 * * *', dag_id=f"permitting-pipeline-{LOB}", ) as dag: - run_lexis_replication = KubernetesPodOperator( + run_replication = KubernetesPodOperator( task_id="run_replication", image="ghcr.io/bcgov/nr-permitting-pipelines:main", image_pull_policy="Always", diff --git a/dags/permitting_rrs_rp.py b/dags/permitting_rrs_rp.py index ddc16e1..be01b8f 100644 --- a/dags/permitting_rrs_rp.py +++ b/dags/permitting_rrs_rp.py @@ -16,7 +16,7 @@ schedule='15 6 * * *', dag_id=f"permitting-pipeline-{LOB}", ) as dag: - run_lexis_replication = KubernetesPodOperator( + run_replication = KubernetesPodOperator( task_id="run_replication", image="ghcr.io/bcgov/nr-permitting-pipelines:main", image_pull_policy="Always", diff --git a/dags/permitting_rrs_rup.py b/dags/permitting_rrs_rup.py index 1453553..02cb93b 100644 --- a/dags/permitting_rrs_rup.py +++ b/dags/permitting_rrs_rup.py @@ -16,7 +16,7 @@ schedule='20 6 * * *', dag_id=f"permitting-pipeline-{LOB}", ) as dag: - run_lexis_replication = KubernetesPodOperator( + run_replication = KubernetesPodOperator( task_id="run_replication", image="ghcr.io/bcgov/nr-permitting-pipelines:main", image_pull_policy="Always", From aa9f797a37b885c2aa4183674a32fbe582419bb7 Mon Sep 17 00:00:00 2001 From: abimichel Date: Wed, 27 Mar 2024 16:53:28 -0700 Subject: [PATCH 10/11] clean up --- README.md | 2 -- oc/{add-label-airflow.yaml => auto-label-airflow.yaml} | 0 2 files changed, 2 deletions(-) rename oc/{add-label-airflow.yaml => auto-label-airflow.yaml} (100%) diff --git a/README.md b/README.md index 7d2923f..d9f2ce0 100644 --- a/README.md +++ b/README.md @@ -26,5 +26,3 @@ http://nr-airflow.apps.emerald.devops.gov.bc.ca/ ```sh helm upgrade -f values.yaml airflow . ``` - -More info: https://apps.nrs.gov.bc.ca/int/confluence/x/zQ09Cg diff --git a/oc/add-label-airflow.yaml b/oc/auto-label-airflow.yaml similarity index 100% rename from oc/add-label-airflow.yaml rename to oc/auto-label-airflow.yaml From 3a3cafa4c986aad85f8fa5b510dc547f2baad6fe Mon Sep 17 00:00:00 2001 From: abimichel Date: Tue, 9 Apr 2024 14:43:09 -0700 Subject: [PATCH 11/11] egress policy for prod db ports --- oc/allow-traffic-airflow.yaml | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/oc/allow-traffic-airflow.yaml b/oc/allow-traffic-airflow.yaml index 530e7d0..da9f93d 100644 --- a/oc/allow-traffic-airflow.yaml +++ b/oc/allow-traffic-airflow.yaml @@ -1,5 +1,24 @@ kind: NetworkPolicy apiVersion: networking.k8s.io/v1 +metadata: + name: allow-egress-zone-b +spec: + podSelector: + matchLabels: + ConnectionType: database + egress: + - ports: + - protocol: TCP + port: 1521 + - protocol: TCP + port: 5435 + - protocol: TCP + port: 5436 + policyTypes: + - Egress +--- +kind: NetworkPolicy +apiVersion: networking.k8s.io/v1 metadata: name: allow-traffic-airflow spec: @@ -13,4 +32,3 @@ spec: policyTypes: - Ingress - Egress -status: {} \ No newline at end of file
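
Usage note: the allow-egress-zone-b policy added in the last patch only selects pods labelled ConnectionType: database, so a replication DAG has to carry that label on its KubernetesPodOperator for the Oracle (1521) and Postgres (5435/5436) egress rules to apply, while Release: airflow keeps it matched by allow-traffic-airflow. A minimal sketch along the lines of the permitting DAGs in this series — the LOB value, schedule, and the assumption that a matching "<LOB>-database" secret exists are placeholders, not part of any commit:

    from airflow import DAG
    from pendulum import datetime
    from kubernetes import client
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
    from airflow.providers.cncf.kubernetes.secret import Secret

    LOB = "example"  # placeholder line of business, e.g. ats, fta, lexis, rrs-rp

    ods_secrets = Secret("env", None, "ods-database")
    lob_secrets = Secret("env", None, f"{LOB}-database")  # assumes this secret exists

    with DAG(
        start_date=datetime(2024, 1, 1),
        catchup=False,
        schedule=None,  # the permitting DAGs use staggered cron schedules instead
        dag_id=f"permitting-pipeline-{LOB}",
    ) as dag:
        run_replication = KubernetesPodOperator(
            task_id="run_replication",
            image="ghcr.io/bcgov/nr-permitting-pipelines:main",
            image_pull_policy="Always",
            in_cluster=True,
            service_account_name="airflow-admin",
            name=f"run_{LOB}_replication",
            # ConnectionType: database matches allow-egress-zone-b (TCP 1521/5435/5436);
            # Release: airflow matches allow-traffic-airflow.
            labels={"DataClass": "Medium", "ConnectionType": "database", "Release": "airflow"},
            is_delete_operator_pod=False,
            secrets=[lob_secrets, ods_secrets],
            container_resources=client.V1ResourceRequirements(
                requests={"cpu": "50m", "memory": "512Mi"},
                limits={"cpu": "100m", "memory": "1024Mi"},
            ),
        )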