Skip to content

Commit

Permalink
#1070 use pre_execute instead (base operators)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielwol committed Sep 10, 2024
1 parent c3f1352 commit 7bda3db
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 20 deletions.
16 changes: 7 additions & 9 deletions dags/common_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.sensors.base import PokeReturnValue
from airflow.exceptions import AirflowFailException
from airflow.exceptions import AirflowFailException, AirflowSkipException
from airflow.models import Variable
from airflow.sensors.time_sensor import TimeSensor

Expand Down Expand Up @@ -130,21 +130,19 @@ def copy_table(conn_id:str, table:Tuple[str, str], **context) -> None:

LOGGER.info(f"Successfully copied {table[0]} to {table[1]}.")

@task.short_circuit(ignore_downstream_trigger_rules=False, retries=0) #only skip immediately downstream task
def check_jan_1st(ds=None): #check if Jan 1 to trigger partition creates.
def check_jan_1st(context): #check if Jan 1 to trigger partition creates.
from datetime import datetime
start_date = datetime.strptime(ds, '%Y-%m-%d')
start_date = datetime.strptime(context["ds"], '%Y-%m-%d')
if start_date.month == 1 and start_date.day == 1:
return True
return False
raise AirflowSkipException('Not Jan 1st; skipping partition creates.')

@task.short_circuit(ignore_downstream_trigger_rules=False, retries=0) #only skip immediately downstream task
def check_1st_of_month(ds=None): #check if 1st of Month to trigger partition creates.
def check_1st_of_month(context): #check if 1st of Month to trigger partition creates.
from datetime import datetime
start_date = datetime.strptime(ds, '%Y-%m-%d')
start_date = datetime.strptime(context["ds"], '%Y-%m-%d')
if start_date.day == 1:
return True
return False
raise AirflowSkipException('Not 1st of month; skipping partition creates.')

@task.short_circuit(ignore_downstream_trigger_rules=False, retries=0) #only skip immediately downstream task
def check_if_dow(isodow, ds):
Expand Down
3 changes: 2 additions & 1 deletion dags/ecocounter_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def check_partitions():

create_annual_partition = PostgresOperator(
task_id='create_annual_partitions',
pre_execute=check_jan_1st,
sql="""SELECT ecocounter.create_yyyy_counts_unfiltered_partition(
base_table := 'counts_unfiltered',
year_ := '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int
Expand All @@ -83,7 +84,7 @@ def check_partitions():
autocommit=True
)

check_jan_1st.override(task_id="check_annual_partition")() >> create_annual_partition
create_annual_partition

def get_connections():
api_conn = BaseHook.get_connection('ecocounter_api_key')
Expand Down
6 changes: 3 additions & 3 deletions dags/miovision_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def check_partitions():

create_annual_partition = PostgresOperator(
task_id='create_annual_partitions',
pre_execute=check_jan_1st,
sql=["SELECT miovision_api.create_yyyy_volumes_partition('volumes', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, 'datetime_bin')",
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)",
"SELECT miovision_api.create_yyyy_volumes_15min_partition('volumes_15min_mvt', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)"],
Expand All @@ -86,15 +87,14 @@ def check_partitions():

create_month_partition = PostgresOperator(
task_id='create_month_partition',
pre_execute=check_1st_of_month,
sql="""SELECT miovision_api.create_mm_nested_volumes_partitions('volumes'::text, '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, '{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}'::int)""",
postgres_conn_id='miovision_api_bot',
autocommit=True,
trigger_rule='none_failed_min_one_success'
)

check_jan_1st.override(task_id="check_annual_partition")() >> create_annual_partition >> (
check_1st_of_month.override(task_id="check_month_partition")() >> create_month_partition
)
create_annual_partition >> create_month_partition

@task(trigger_rule='none_failed', retries = 1)
def pull_miovision(ds = None, **context):
Expand Down
4 changes: 2 additions & 2 deletions dags/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ Contains common Airflow Task definitions which can be used in multiple DAGs.
- `wait_for_external_trigger`: Reusable sensor to wait for external DAG trigger.
- `get_variable`: Task to access an airflow variable and return value as XCOM.
- `copy_table`: A task to copy a postgres table contents (and comment) from one location to another within the same database.
- `check_jan_1st`, `check_1st_of_month`: `short_circuit` operators which can be used to have downstream tasks occur only on Jan 1 / 1st of each month.
- `check_if_dow`: A `short_circuit` operator which checks a date against a day of week and short circuits (skips downstream tasks) if not.
- `check_jan_1st`, `check_1st_of_month`: Can be used in as `pre_execute` parameter in base operators to have downstream tasks occur only on Jan 1 / 1st of each month. Remember to set downstream tasks to `trigger_rule='none_failed'`.
- `check_if_dow`: A `short_circuit` operator which checks a date against a day of week and short circuits (skips downstream tasks) if not. Not currently in use, may be better written as a @task.run_if decorator.
- `wait_for_weather_timesensor`: returns a `TimeSensor` Airflow operator which can be used to delay data checks until the time of day when historical weather is avaialble.

### [**dag_functions.py**](dag_functions.py)
Expand Down
3 changes: 2 additions & 1 deletion dags/vds_pull_vdsdata.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ def check_partitions():

create_partitions = PostgresOperator(
task_id='create_partitions',
pre_execute=check_jan_1st,
sql=[#partition by year and month:
"SELECT vds.partition_vds_yyyymm('raw_vdsdata_div8001'::text, '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, 'dt'::text)",
"SELECT vds.partition_vds_yyyymm('raw_vdsdata_div2'::text, '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, 'dt'::text)",
Expand All @@ -107,7 +108,7 @@ def check_partitions():
)

#check if Jan 1, if so trigger partition creates.
check_jan_1st.override(task_id="check_partitions")() >> create_partitions
create_partitions

@task_group
def pull_vdsdata():
Expand Down
3 changes: 2 additions & 1 deletion dags/vds_pull_vdsvehicledata.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,14 @@ def check_partitions_TG():

create_partitions = PostgresOperator(
task_id='create_partitions',
pre_execute=check_jan_1st,
sql="SELECT vds.partition_vds_yyyymm('raw_vdsvehicledata', '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, 'dt')",
postgres_conn_id='vds_bot',
autocommit=True
)

#check if Jan 1, if so trigger partition creates.
check_jan_1st.override(task_id="check_partitions")() >> create_partitions
create_partitions

#this task group deletes any existing data from `vds.raw_vdsvehicledata` and then pulls and inserts from ITSC into RDS
@task_group
Expand Down
6 changes: 3 additions & 3 deletions dags/wys_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,22 @@ def check_partitions():

create_annual_partition = PostgresOperator(
task_id='create_annual_partitions',
pre_execute=check_jan_1st,
sql="SELECT wys.create_yyyy_raw_data_partition('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int)",
postgres_conn_id='wys_bot',
autocommit=True
)

create_month_partition = PostgresOperator(
task_id='create_month_partition',
pre_execute=check_1st_of_month,
trigger_rule='none_failed_min_one_success',
sql="SELECT wys.create_mm_nested_raw_data_partitions('{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}'::int, '{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}'::int)",
postgres_conn_id='wys_bot',
autocommit=True
)

check_jan_1st() >> create_annual_partition >> (
check_1st_of_month() >> create_month_partition
)
create_annual_partition >> create_month_partition

@task_group()
def api_pull():
Expand Down

0 comments on commit 7bda3db

Please sign in to comment.