Skip to content

Commit

Permalink
Replace usage of DummyOperator with EmptyOperator (apache#22974)
Browse files Browse the repository at this point in the history
* Replace usage of `DummyOperator` with `EmptyOperator`
  • Loading branch information
eladkal authored Apr 13, 2022
1 parent d0a880d commit 49e336a
Show file tree
Hide file tree
Showing 96 changed files with 863 additions and 827 deletions.
4 changes: 2 additions & 2 deletions airflow/example_dags/example_bash_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator

with DAG(
dag_id='example_bash_operator',
Expand All @@ -35,7 +35,7 @@
tags=['example', 'example2'],
params={"example_key": "example_value"},
) as dag:
run_this_last = DummyOperator(
run_this_last = EmptyOperator(
task_id='run_this_last',
)

Expand Down
18 changes: 9 additions & 9 deletions airflow/example_dags/example_branch_datetime_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from airflow import DAG
from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator

dag = DAG(
dag_id="example_branch_datetime_operator",
Expand All @@ -35,8 +35,8 @@
)

# [START howto_branch_datetime_operator]
dummy_task_1 = DummyOperator(task_id='date_in_range', dag=dag)
dummy_task_2 = DummyOperator(task_id='date_outside_range', dag=dag)
empty_task_1 = EmptyOperator(task_id='date_in_range', dag=dag)
empty_task_2 = EmptyOperator(task_id='date_outside_range', dag=dag)

cond1 = BranchDateTimeOperator(
task_id='datetime_branch',
Expand All @@ -47,8 +47,8 @@
dag=dag,
)

# Run dummy_task_1 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [dummy_task_1, dummy_task_2]
# Run empty_task_1 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
cond1 >> [empty_task_1, empty_task_2]
# [END howto_branch_datetime_operator]


Expand All @@ -60,8 +60,8 @@
schedule_interval="@daily",
)
# [START howto_branch_datetime_operator_next_day]
dummy_task_1 = DummyOperator(task_id='date_in_range', dag=dag)
dummy_task_2 = DummyOperator(task_id='date_outside_range', dag=dag)
empty_task_1 = EmptyOperator(task_id='date_in_range', dag=dag)
empty_task_2 = EmptyOperator(task_id='date_outside_range', dag=dag)

cond2 = BranchDateTimeOperator(
task_id='datetime_branch',
Expand All @@ -73,6 +73,6 @@
)

# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run dummy_task_1 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [dummy_task_1, dummy_task_2]
# Run empty_task_1 if cond2 executes between 15:00:00, and 00:00:00 of the following day
cond2 >> [empty_task_1, empty_task_2]
# [END howto_branch_datetime_operator_next_day]
10 changes: 5 additions & 5 deletions airflow/example_dags/example_branch_day_of_week_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.weekday import BranchDayOfWeekOperator

with DAG(
Expand All @@ -33,8 +33,8 @@
schedule_interval="@daily",
) as dag:
# [START howto_operator_day_of_week_branch]
dummy_task_1 = DummyOperator(task_id='branch_true', dag=dag)
dummy_task_2 = DummyOperator(task_id='branch_false', dag=dag)
empty_task_1 = EmptyOperator(task_id='branch_true', dag=dag)
empty_task_2 = EmptyOperator(task_id='branch_false', dag=dag)

branch = BranchDayOfWeekOperator(
task_id="make_choice",
Expand All @@ -43,6 +43,6 @@
week_day="Monday",
)

# Run dummy_task_1 if branch executes on Monday
branch >> [dummy_task_1, dummy_task_2]
# Run empty_task_1 if branch executes on Monday
branch >> [empty_task_1, empty_task_2]
# [END howto_operator_day_of_week_branch]
16 changes: 8 additions & 8 deletions airflow/example_dags/example_branch_labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label

with DAG(
Expand All @@ -31,13 +31,13 @@
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
) as dag:
ingest = DummyOperator(task_id="ingest")
analyse = DummyOperator(task_id="analyze")
check = DummyOperator(task_id="check_integrity")
describe = DummyOperator(task_id="describe_integrity")
error = DummyOperator(task_id="email_error")
save = DummyOperator(task_id="save")
report = DummyOperator(task_id="report")
ingest = EmptyOperator(task_id="ingest")
analyse = EmptyOperator(task_id="analyze")
check = EmptyOperator(task_id="check_integrity")
describe = EmptyOperator(task_id="describe_integrity")
error = EmptyOperator(task_id="email_error")
save = EmptyOperator(task_id="save")
report = EmptyOperator(task_id="report")

ingest >> analyse >> check
check >> Label("No errors") >> save >> report
Expand Down
12 changes: 6 additions & 6 deletions airflow/example_dags/example_branch_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule
Expand All @@ -35,7 +35,7 @@
schedule_interval="@daily",
tags=['example', 'example2'],
) as dag:
run_this_first = DummyOperator(
run_this_first = EmptyOperator(
task_id='run_this_first',
)

Expand All @@ -47,19 +47,19 @@
)
run_this_first >> branching

join = DummyOperator(
join = EmptyOperator(
task_id='join',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

for option in options:
t = DummyOperator(
t = EmptyOperator(
task_id=option,
)

dummy_follow = DummyOperator(
empty_follow = EmptyOperator(
task_id='follow_' + option,
)

# Label is optional here, but it can help identify more complex branches
branching >> Label(option) >> t >> dummy_follow >> join
branching >> Label(option) >> t >> empty_follow >> join
12 changes: 6 additions & 6 deletions airflow/example_dags/example_branch_operator_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from airflow import DAG
from airflow.decorators import task
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label
from airflow.utils.trigger_rule import TriggerRule

Expand All @@ -34,7 +34,7 @@
schedule_interval="@daily",
tags=['example', 'example2'],
) as dag:
run_this_first = DummyOperator(
run_this_first = EmptyOperator(
task_id='run_this_first',
)

Expand All @@ -48,19 +48,19 @@ def random_choice():

run_this_first >> random_choice_instance

join = DummyOperator(
join = EmptyOperator(
task_id='join',
trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
)

for option in options:
t = DummyOperator(
t = EmptyOperator(
task_id=option,
)

dummy_follow = DummyOperator(
empty_follow = EmptyOperator(
task_id='follow_' + option,
)

# Label is optional here, but it can help identify more complex branches
random_choice_instance >> Label(option) >> t >> dummy_follow >> join
random_choice_instance >> Label(option) >> t >> empty_follow >> join
14 changes: 7 additions & 7 deletions airflow/example_dags/example_branch_python_dop_operator_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator


def should_run(**kwargs):
"""
Determine which dummy_task should be run based on if the execution date minute is even or odd.
Determine which empty_task should be run based on if the execution date minute is even or odd.
:param dict kwargs: Context
:return: Id of the task to run
Expand All @@ -41,9 +41,9 @@ def should_run(**kwargs):
)
)
if kwargs['execution_date'].minute % 2 == 0:
return "dummy_task_1"
return "empty_task_1"
else:
return "dummy_task_2"
return "empty_task_2"


with DAG(
Expand All @@ -59,6 +59,6 @@ def should_run(**kwargs):
python_callable=should_run,
)

dummy_task_1 = DummyOperator(task_id='dummy_task_1')
dummy_task_2 = DummyOperator(task_id='dummy_task_2')
cond >> [dummy_task_1, dummy_task_2]
empty_task_1 = EmptyOperator(task_id='empty_task_1')
empty_task_2 = EmptyOperator(task_id='empty_task_2')
cond >> [empty_task_1, empty_task_2]
4 changes: 2 additions & 2 deletions airflow/example_dags/example_external_task_marker_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor

start_date = pendulum.datetime(2021, 1, 1, tz="UTC")
Expand Down Expand Up @@ -78,5 +78,5 @@
mode="reschedule",
)
# [END howto_operator_external_task_sensor]
child_task2 = DummyOperator(task_id="child_task2")
child_task2 = EmptyOperator(task_id="child_task2")
child_task1 >> child_task2
4 changes: 2 additions & 2 deletions airflow/example_dags/example_latest_only.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import datetime as dt

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator

with DAG(
Expand All @@ -32,6 +32,6 @@
tags=['example2', 'example3'],
) as dag:
latest_only = LatestOnlyOperator(task_id='latest_only')
task1 = DummyOperator(task_id='task1')
task1 = EmptyOperator(task_id='task1')

latest_only >> task1
10 changes: 5 additions & 5 deletions airflow/example_dags/example_latest_only_with_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import pendulum

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.utils.trigger_rule import TriggerRule

Expand All @@ -37,10 +37,10 @@
tags=['example3'],
) as dag:
latest_only = LatestOnlyOperator(task_id='latest_only')
task1 = DummyOperator(task_id='task1')
task2 = DummyOperator(task_id='task2')
task3 = DummyOperator(task_id='task3')
task4 = DummyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)
task1 = EmptyOperator(task_id='task1')
task2 = EmptyOperator(task_id='task2')
task3 = EmptyOperator(task_id='task3')
task4 = EmptyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)

latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]
Expand Down
16 changes: 8 additions & 8 deletions airflow/example_dags/example_nested_branch_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import pendulum

from airflow.models import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule

Expand All @@ -36,14 +36,14 @@
tags=["example"],
) as dag:
branch_1 = BranchPythonOperator(task_id="branch_1", python_callable=lambda: "true_1")
join_1 = DummyOperator(task_id="join_1", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
true_1 = DummyOperator(task_id="true_1")
false_1 = DummyOperator(task_id="false_1")
join_1 = EmptyOperator(task_id="join_1", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
true_1 = EmptyOperator(task_id="true_1")
false_1 = EmptyOperator(task_id="false_1")
branch_2 = BranchPythonOperator(task_id="branch_2", python_callable=lambda: "true_2")
join_2 = DummyOperator(task_id="join_2", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
true_2 = DummyOperator(task_id="true_2")
false_2 = DummyOperator(task_id="false_2")
false_3 = DummyOperator(task_id="false_3")
join_2 = EmptyOperator(task_id="join_2", trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
true_2 = EmptyOperator(task_id="true_2")
false_2 = EmptyOperator(task_id="false_2")
false_3 = EmptyOperator(task_id="false_3")

branch_1 >> true_1 >> join_1
branch_1 >> false_1 >> branch_2 >> [true_2, false_2] >> join_2 >> false_3 >> join_1
10 changes: 5 additions & 5 deletions airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule

Expand All @@ -42,19 +42,19 @@
python_callable=lambda: False,
)

ds_true = [DummyOperator(task_id='true_' + str(i)) for i in [1, 2]]
ds_false = [DummyOperator(task_id='false_' + str(i)) for i in [1, 2]]
ds_true = [EmptyOperator(task_id='true_' + str(i)) for i in [1, 2]]
ds_false = [EmptyOperator(task_id='false_' + str(i)) for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)
# [END howto_operator_short_circuit]

# [START howto_operator_short_circuit_trigger_rules]
[task_1, task_2, task_3, task_4, task_5, task_6] = [
DummyOperator(task_id=f"task_{i}") for i in range(1, 7)
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = DummyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = ShortCircuitOperator(
task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
Expand Down
Loading

0 comments on commit 49e336a

Please sign in to comment.