Skip to content

Commit

Permalink
Remove TaskFail duplicates check (apache#26714)
Browse files Browse the repository at this point in the history
This check was added in error.  The duplicates are supposed to be there.  The task duration
and gantt views process TaskFail records with the apparent intention of including durations
from non-final failed attempts to the overall duration of the run.

(cherry picked from commit ee21c1b)
  • Loading branch information
dstandish authored and jedcunningham committed Oct 4, 2022
1 parent 6831e78 commit b94db52
Showing 1 changed file with 19 additions and 18 deletions.
37 changes: 19 additions & 18 deletions airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -926,29 +926,31 @@ def reflect_tables(tables: Optional[List[Union[Base, str]]], session):
return metadata


def check_task_fail_for_duplicates(session):
"""Check that there are no duplicates in the task_fail table before creating FK"""
metadata = reflect_tables([TaskFail], session)
task_fail = metadata.tables.get(TaskFail.__tablename__) # type: ignore
if task_fail is None: # table not there
return
if "run_id" in task_fail.columns: # upgrade already applied
return
yield from check_table_for_duplicates(
table_name=task_fail.name,
uniqueness=['dag_id', 'task_id', 'execution_date'],
session=session,
version='2.3',
)


def check_table_for_duplicates(
*, session: Session, table_name: str, uniqueness: List[str], version: str
) -> Iterable[str]:
"""
Check table for duplicates, given a list of columns which define the uniqueness of the table.
Call from ``run_duplicates_checks``.
Usage example:
.. code-block:: python
def check_task_fail_for_duplicates(session):
from airflow.models.taskfail import TaskFail
metadata = reflect_tables([TaskFail], session)
task_fail = metadata.tables.get(TaskFail.__tablename__) # type: ignore
if task_fail is None: # table not there
return
if "run_id" in task_fail.columns: # upgrade already applied
return
yield from check_table_for_duplicates(
table_name=task_fail.name,
uniqueness=['dag_id', 'task_id', 'execution_date'],
session=session,
version='2.3',
)
:param table_name: table name to check
:param uniqueness: uniqueness constraint to evaluate against
Expand Down Expand Up @@ -1393,7 +1395,6 @@ def _check_migration_errors(session: Session = NEW_SESSION) -> Iterable[str]:
:rtype: list[str]
"""
check_functions: Tuple[Callable[..., Iterable[str]], ...] = (
check_task_fail_for_duplicates,
check_conn_id_duplicates,
check_conn_type_null,
check_run_id_null,
Expand Down

0 comments on commit b94db52

Please sign in to comment.