Skip to content

Commit

Permalink
Add failing test for timedelta sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
KostyaEsmukov committed Jul 17, 2019
1 parent eb104ef commit 4c326fb
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 0 deletions.
45 changes: 45 additions & 0 deletions tests/dags/good/timedelta-sensor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#
# Copyright 2019, Rambler Digital Solutions
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#


dags:
time_delta_sensor:
args:
start_date: 2017-07-27
schedule_interval: 1d
operators:
dummy_operator:
class: airflow.operators.dummy_operator:DummyOperator
sensors:
timedelta_sensor:
class: airflow.operators.sensors:TimeDeltaSensor
args:
delta: 6h
flow:
timedelta_sensor:
- dummy_operator
do:
- sensors:
dynamic_timedelta_sensor_{{ item.name }}:
class: airflow.operators.sensors:TimeDeltaSensor
args:
delta: '{{ item.sensor_delta }}'
flow:
dynamic_timedelta_sensor_{{ item.name }}:
- dummy_operator
with_items:
- name: 1h
sensor_delta: 1h
60 changes: 60 additions & 0 deletions tests/test_timedelta_sensor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
#
# Copyright 2019, Rambler Digital Solutions
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import (
absolute_import,
division,
print_function,
unicode_literals,
)

import datetime

from airflow import DAG

import airflow_declarative


try:
# Since Airflow 1.10
from airflow.sensors.time_delta_sensor import TimeDeltaSensor
except ImportError:
from airflow.operators.sensors import TimeDeltaSensor


def test_timedelta_sensor(good_dag_path):
path = good_dag_path('timedelta-sensor')
dags = airflow_declarative.from_path(path)

assert len(dags) == 1

yml_dag = dags[0]

assert isinstance(yml_dag, DAG)

expected_tasks = {'dummy_operator', 'timedelta_sensor', 'dynamic_timedelta_sensor_1h'}
assert set(yml_dag.task_ids) == expected_tasks

timedelta_sensor = yml_dag.task_dict['timedelta_sensor']
assert isinstance(timedelta_sensor, TimeDeltaSensor)
assert isinstance(timedelta_sensor.delta, datetime.timedelta)
assert 6 == timedelta_sensor.delta.hours

dynamic_timedelta_sensor_1h = yml_dag.task_dict['dynamic_timedelta_sensor_1h']
assert isinstance(dynamic_timedelta_sensor_1h, TimeDeltaSensor)
assert isinstance(dynamic_timedelta_sensor_1h.delta, datetime.timedelta)
assert 1 == dynamic_timedelta_sensor_1h.delta.hours

0 comments on commit 4c326fb

Please sign in to comment.