Skip to content

Commit

Permalink
Added new ExternalTaskSensor operator, other minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Oct 8, 2014
1 parent 1f890d0 commit 1f6f538
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 75 deletions.
17 changes: 17 additions & 0 deletions dags/examples/test_wf_external.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from flux.operators import ExternalTaskSensor
from flux import DAG
from datetime import datetime

# Setting some default operator parameters
default_args = {
'owner': 'max',
'mysql_dbid': 'local_mysql',
}

# Initializing a directed acyclic graph
dag = DAG(dag_id='test_wf')

wf_ext = ExternalTaskSensor(
task_id='wf_ext',
external_dag_id='example_1', external_task_id='runme_0', **default_args)
dag.add_task(wf_ext)
54 changes: 36 additions & 18 deletions docs/.ipynb_checkpoints/Flux Tutorial-checkpoint.ipynb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"metadata": {
"name": "",
"signature": "sha256:acbd69c2f62fd7d821515a4f17aa5c2a5b2761a65341017fd9faed07b5fcbf75"
"signature": "sha256:9a2db68ca4fcf1753abba1d7b9af6535fdef71de15c381a458bea37e70f6a421"
},
"nbformat": 3,
"nbformat_minor": 0,
Expand Down Expand Up @@ -33,7 +33,7 @@
"cell_type": "code",
"collapsed": false,
"input": [
"from core.hooks import MySqlHook\n",
"from flux.hooks import MySqlHook\n",
"mysql_hook = MySqlHook(mysql_dbid='local_mysql')\n",
"sql = \"\"\"\n",
"SELECT table_schema, table_name \n",
Expand Down Expand Up @@ -90,7 +90,7 @@
"cell_type": "code",
"collapsed": false,
"input": [
"from core.operators import MySqlOperator\n",
"from flux.operators import MySqlOperator\n",
"from datetime import datetime, timedelta\n",
"\n",
"sql = \"\"\"\n",
Expand Down Expand Up @@ -124,8 +124,8 @@
"cell_type": "code",
"collapsed": false,
"input": [
"from core.operators import MySqlOperator\n",
"from core import DAG\n",
"from flux.operators import MySqlOperator\n",
"from flux import DAG\n",
"from datetime import datetime\n",
"\n",
"# Setting some default operator parameters\n",
Expand All @@ -151,7 +151,7 @@
"mysql_second.set_upstream(mysql_fisrt)\n",
" \n",
"dag.tree_view()\n",
"dag.run(start_date=datetime(2014, 9, 1), end_date=datetime(2014, 9, 1))\n"
"#dag.run(start_date=datetime(2014, 9, 1), end_date=datetime(2014, 9, 1))\n"
],
"language": "python",
"metadata": {},
Expand All @@ -163,20 +163,9 @@
"<Task(MySqlOperator): mysql_second>\n",
" <Task(MySqlOperator): mysql_fisrt>\n"
]
},
{
"ename": "AttributeError",
"evalue": "'DAG' object has no attribute 'dagbag'",
"output_type": "pyerr",
"traceback": [
"\u001b[1;31m---------------------------------------------------------------------------\u001b[0m\n\u001b[1;31mAttributeError\u001b[0m Traceback (most recent call last)",
"\u001b[1;32m<ipython-input-8-dc6593b5029f>\u001b[0m in \u001b[0;36m<module>\u001b[1;34m()\u001b[0m\n\u001b[0;32m 26\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 27\u001b[0m \u001b[0mdag\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mtree_view\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m---> 28\u001b[1;33m \u001b[0mdag\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mrun\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mstart_date\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mdatetime\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m2014\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m9\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m,\u001b[0m \u001b[0mend_date\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mdatetime\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;36m2014\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m9\u001b[0m\u001b[1;33m,\u001b[0m \u001b[1;36m1\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m",
"\u001b[1;32m/home/mistercrunch/Flux/core/models.pyc\u001b[0m in \u001b[0;36mrun\u001b[1;34m(self, start_date, end_date, mark_success)\u001b[0m\n\u001b[0;32m 849\u001b[0m \u001b[0msession\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0msettings\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mSession\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 850\u001b[0m \u001b[1;33m\u001b[0m\u001b[0m\n\u001b[1;32m--> 851\u001b[1;33m \u001b[0mjob\u001b[0m \u001b[1;33m=\u001b[0m \u001b[0mJob\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mexecutor\u001b[0m\u001b[1;33m=\u001b[0m\u001b[0mself\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mexecutor\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0m\u001b[0;32m 852\u001b[0m \u001b[0msession\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0madd\u001b[0m\u001b[1;33m(\u001b[0m\u001b[0mjob\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n\u001b[0;32m 853\u001b[0m \u001b[0msession\u001b[0m\u001b[1;33m.\u001b[0m\u001b[0mcommit\u001b[0m\u001b[1;33m(\u001b[0m\u001b[1;33m)\u001b[0m\u001b[1;33m\u001b[0m\u001b[0m\n",
"\u001b[1;31mAttributeError\u001b[0m: 'DAG' object has no attribute 'dagbag'"
]
}
],
"prompt_number": 8
"prompt_number": 1
},
{
"cell_type": "heading",
Expand Down Expand Up @@ -442,6 +431,35 @@
"\n",
"Arbitrary executors can be derived from BaseExecutor. Expect a Celery, Redis/Mesos and other executors to be created soon. "
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from flux.operators import ExternalTaskSensor\n",
"from flux import DAG\n",
"from datetime import datetime\n",
"\n",
"# Setting some default operator parameters\n",
"default_args = {\n",
" 'owner': 'max',\n",
" 'mysql_dbid': 'local_mysql',\n",
"}\n",
"\n",
"# Initializing a directed acyclic graph\n",
"dag = DAG(dag_id='test_dag')\n",
"\n",
"wf_ext = ExternalTaskSensor(\n",
" task_id='wf_ext', external_dag_id='example_1', external_task_id='runme_0', **default_args)\n",
"dag.add_task(wf_ext)\n",
"\n",
" \n",
"dag.tree_view()\n",
"dag.run(start_date=datetime(2014, 9, 1), end_date=datetime(2014, 9, 1))\n"
],
"language": "python",
"metadata": {},
"outputs": []
}
],
"metadata": {}
Expand Down
48 changes: 12 additions & 36 deletions docs/Flux Tutorial.ipynb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"metadata": {
"name": "",
"signature": "sha256:18cc8ae7304427d2cf4defc71d18d8f5e2701170ee376e73de767e25cbbf2271"
"signature": "sha256:9a2db68ca4fcf1753abba1d7b9af6535fdef71de15c381a458bea37e70f6a421"
},
"nbformat": 3,
"nbformat_minor": 0,
Expand Down Expand Up @@ -33,7 +33,7 @@
"cell_type": "code",
"collapsed": false,
"input": [
"from core.hooks import MySqlHook\n",
"from flux.hooks import MySqlHook\n",
"mysql_hook = MySqlHook(mysql_dbid='local_mysql')\n",
"sql = \"\"\"\n",
"SELECT table_schema, table_name \n",
Expand Down Expand Up @@ -90,7 +90,7 @@
"cell_type": "code",
"collapsed": false,
"input": [
"from core.operators import MySqlOperator\n",
"from flux.operators import MySqlOperator\n",
"from datetime import datetime, timedelta\n",
"\n",
"sql = \"\"\"\n",
Expand Down Expand Up @@ -124,8 +124,8 @@
"cell_type": "code",
"collapsed": false,
"input": [
"from core.operators import MySqlOperator\n",
"from core import DAG\n",
"from flux.operators import MySqlOperator\n",
"from flux import DAG\n",
"from datetime import datetime\n",
"\n",
"# Setting some default operator parameters\n",
Expand Down Expand Up @@ -436,8 +436,8 @@
"cell_type": "code",
"collapsed": false,
"input": [
"from core.operators import MySqlOperator\n",
"from core import DAG\n",
"from flux.operators import ExternalTaskSensor\n",
"from flux import DAG\n",
"from datetime import datetime\n",
"\n",
"# Setting some default operator parameters\n",
Expand All @@ -449,41 +449,17 @@
"# Initializing a directed acyclic graph\n",
"dag = DAG(dag_id='test_dag')\n",
"\n",
"# MySQL Operator \n",
"sql = \"TRUNCATE TABLE tmp;\"\n",
"mysql_fisrt = MySqlOperator(task_id='mysql_fisrt', sql=sql, **default_args)\n",
"dag.add_task(mysql_fisrt)\n",
"wf_ext = ExternalTaskSensor(\n",
" task_id='wf_ext', external_dag_id='example_1', external_task_id='runme_0', **default_args)\n",
"dag.add_task(wf_ext)\n",
"\n",
"sql = \"\"\"\n",
"INSERT INTO tmp\n",
"SELECT {{ macros.random() * 100 }};\n",
"\"\"\"\n",
"mysql_second = MySqlOperator(task_id='mysql_second', sql=sql, **default_args)\n",
"dag.add_task(mysql_second)\n",
"mysql_second.set_upstream(mysql_fisrt)\n",
" \n",
"dag.tree_view()\n",
"#dag.run(start_date=datetime(2014, 9, 1), end_date=datetime(2014, 9, 1))\n",
"\n",
"from core.models import TaskInstance as TI\n",
"ti = TI(dag.tasks[0], execution_date =datetime(2014,9,1))\n",
"print ti.command()\n",
"#print ti.task.pickle()"
"dag.run(start_date=datetime(2014, 9, 1), end_date=datetime(2014, 9, 1))\n"
],
"language": "python",
"metadata": {},
"outputs": [
{
"output_type": "stream",
"stream": "stdout",
"text": [
"<Task(MySqlOperator): mysql_second>\n",
" <Task(MySqlOperator): mysql_fisrt>\n",
"./flux run test_dag mysql_fisrt 2014-09-01T00:00:00 --pickle \"ccopy_reg|~|~|_reconstructor|~|~|p0|~|~|(ccore.operators.mysql_operator|~|~|MySqlOperator|~|~|p1|~|~|c__builtin__|~|~|object|~|~|p2|~|~|Ntp3|~|~|Rp4|~|~|(dp5|~|~|S'retries'|~|~|p6|~|~|I0|~|~|sS'_sa_instance_state'|~|~|p7|~|~|g0|~|~|(csqlalchemy.orm.state|~|~|InstanceState|~|~|p8|~|~|g2|~|~|Ntp9|~|~|Rp10|~|~|(dp11|~|~|S'manager'|~|~|p12|~|~|g0|~|~|(csqlalchemy.orm.instrumentation|~|~|_SerializeManager|~|~|p13|~|~|g2|~|~|Ntp14|~|~|Rp15|~|~|(dp16|~|~|S'class_'|~|~|p17|~|~|g1|~|~|sbsS'class_'|~|~|p18|~|~|g1|~|~|sS'modified'|~|~|p19|~|~|I01|~|~|sS'committed_state'|~|~|p20|~|~|(dp21|~|~|S'task_type'|~|~|p22|~|~|csqlalchemy.util.langhelpers|~|~|symbol|~|~|p23|~|~|(S'NO_VALUE'|~|~|p24|~|~|S'x'|~|~|p25|~|~|I5699626739644918547|~|~|tp26|~|~|Rp27|~|~|sS'task_id'|~|~|p28|~|~|g23|~|~|(S'NEVER_SET'|~|~|p29|~|~|g25|~|~|I-941588541056912994|~|~|tp30|~|~|Rp31|~|~|sS'end_date'|~|~|p32|~|~|g27|~|~|sS'dag'|~|~|p33|~|~|NsS'owner'|~|~|p34|~|~|g27|~|~|sS'depends_on_past'|~|~|p35|~|~|g27|~|~|sS'start_date'|~|~|p36|~|~|g27|~|~|sS'dag_id'|~|~|p37|~|~|g31|~|~|ssS'instance'|~|~|p38|~|~|g4|~|~|sS'callables'|~|~|p39|~|~|(dp40|~|~|sS'parents'|~|~|p41|~|~|(dp42|~|~|I65665808|~|~|g0|~|~|(g8|~|~|g2|~|~|Ntp43|~|~|Rp44|~|~|(dp45|~|~|g18|~|~|ccore.models|~|~|DAG|~|~|p46|~|~|sg39|~|~|(dp47|~|~|sg19|~|~|I01|~|~|sg38|~|~|g0|~|~|(g46|~|~|g2|~|~|Ntp48|~|~|Rp49|~|~|(dp50|~|~|S'schedule_interval'|~|~|p51|~|~|cdatetime|~|~|timedelta|~|~|p52|~|~|(I1|~|~|I0|~|~|I0|~|~|tp53|~|~|Rp54|~|~|sg7|~|~|g44|~|~|sS'_executor'|~|~|p55|~|~|ccore.executors.base_executor|~|~|LocalExecutor|~|~|p56|~|~|sg32|~|~|cdatetime|~|~|datetime|~|~|p57|~|~|(S'\\x07\\xde\\n\\x07\\x11\\x0c6\\x0e\\xc83'|~|~|p58|~|~|tp59|~|~|Rp60|~|~|sS'filepath'|~|~|p61|~|~|S'<ipython-input-3-106d89bd9ea4>'|~|~|p62|~|~|sS'tasks'|~|~|p63|~|~|g0|~|~|(csqlalchemy.orm.collections|~|~|InstrumentedList|~|~|p64|~|~|c__builtin__|~|~|list|~|~|p65|~|~|(lp66|~|~|g4|~|~|ag0|~|~|(g1|~|~|g2|~|~|Ntp67|~|~|Rp68|~|~|(dp69|~|~|g6|~|~|I0|~|~|sg7|~|~|g0|~|~|(g8|~|~|g2|~|~|Ntp70|~|~|Rp71|~|~|(dp72|~|~|g12|~|~|g0|~|~|(g13|~|~|g2|~|~|Ntp73|~|~|Rp74|~|~|(dp75|~|~|g17|~|~|g1|~|~|sbsg18|~|~|g1|~|~|sg19|~|~|I01|~|~|sg20|~|~|(dp76|~|~|g22|~|~|g27|~|~|sg28|~|~|g31|~|~|sg32|~|~|g27|~|~|sg33|~|~|Nsg34|~|~|g27|~|~|sg35|~|~|g27|~|~|sg36|~|~|g27|~|~|sg37|~|~|g31|~|~|ssg38|~|~|g68|~|~|sg39|~|~|(dp77|~|~|sg41|~|~|(dp78|~|~|I65665808|~|~|g44|~|~|ssbsg32|~|~|Nsg28|~|~|S'mysql_second'|~|~|p79|~|~|sS'sql'|~|~|p80|~|~|S'\\nINSERT INTO tmp\\nSELECT {{ macros.random() * 100 }};\\n'|~|~|p81|~|~|sS'hook'|~|~|p82|~|~|g0|~|~|(ccore.hooks.mysql_hook|~|~|MySqlHook|~|~|p83|~|~|g2|~|~|Ntp84|~|~|Rp85|~|~|(dp86|~|~|S'host'|~|~|p87|~|~|S'localhost'|~|~|p88|~|~|sS'db'|~|~|p89|~|~|S'flux'|~|~|p90|~|~|sS'login'|~|~|p91|~|~|S'flux'|~|~|p92|~|~|sS'psw'|~|~|p93|~|~|S'flux'|~|~|p94|~|~|sbsg33|~|~|g49|~|~|sg22|~|~|S'MySqlOperator'|~|~|p95|~|~|sS'params'|~|~|p96|~|~|(dp97|~|~|sS'_upstream_list'|~|~|p98|~|~|(lp99|~|~|g4|~|~|asS'_schedule_interval'|~|~|p100|~|~|g52|~|~|(I1|~|~|I0|~|~|I0|~|~|tp101|~|~|Rp102|~|~|sS'retry_delay'|~|~|p103|~|~|g52|~|~|(I0|~|~|I10|~|~|I0|~|~|tp104|~|~|Rp105|~|~|sg34|~|~|S'max'|~|~|p106|~|~|sg35|~|~|I00|~|~|sS'_downstream_list'|~|~|p107|~|~|(lp108|~|~|sg36|~|~|Nsg37|~|~|S'test_dag'|~|~|p109|~|~|sbatp110|~|~|Rp111|~|~|(dp112|~|~|S'_sa_adapter'|~|~|p113|~|~|g0|~|~|(csqlalchemy.orm.collections|~|~|CollectionAdapter|~|~|p114|~|~|g2|~|~|Ntp115|~|~|Rp116|~|~|(dp117|~|~|S'data'|~|~|p118|~|~|g111|~|~|sS'owner_state'|~|~|p119|~|~|g44|~|~|sS'key'|~|~|p120|~|~|g63|~|~|sbsbsS'parallelism'|~|~|p121|~|~|I0|~|~|sg37|~|~|g109|~|~|sS'task_count'|~|~|p122|~|~|I2|~|~|sbsg12|~|~|g0|~|~|(g13|~|~|g2|~|~|Ntp123|~|~|Rp124|~|~|(dp125|~|~|g17|~|~|g46|~|~|sbsg20|~|~|(dp126|~|~|g63|~|~|(lp127|~|~|sg122|~|~|g27|~|~|sg121|~|~|g27|~|~|sg37|~|~|g31|~|~|sg61|~|~|g27|~|~|ssbssbsg32|~|~|Nsg28|~|~|S'mysql_fisrt'|~|~|p128|~|~|sg80|~|~|S'TRUNCATE TABLE tmp;'|~|~|p129|~|~|sg82|~|~|g0|~|~|(g83|~|~|g2|~|~|Ntp130|~|~|Rp131|~|~|(dp132|~|~|g87|~|~|S'localhost'|~|~|p133|~|~|sg89|~|~|S'flux'|~|~|p134|~|~|sg91|~|~|S'flux'|~|~|p135|~|~|sg93|~|~|S'flux'|~|~|p136|~|~|sbsg33|~|~|g49|~|~|sg22|~|~|g95|~|~|sg96|~|~|(dp137|~|~|sg98|~|~|(lp138|~|~|sg100|~|~|g102|~|~|sg103|~|~|g105|~|~|sg34|~|~|g106|~|~|sg35|~|~|I00|~|~|sg107|~|~|(lp139|~|~|g68|~|~|asg36|~|~|Nsg37|~|~|g109|~|~|sb.\" \n"
]
}
],
"prompt_number": 3
"outputs": []
}
],
"metadata": {}
Expand Down
33 changes: 18 additions & 15 deletions src/flux/bin/flux_bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,20 @@ def backfill(args):


def run(args):

# Setting up logging
directory = settings.BASE_LOG_FOLDER + \
"/{args.dag_id}/{args.task_id}".format(args=args)
if not os.path.exists(directory):
os.makedirs(directory)
args.execution_date = dateutil.parser.parse(args.execution_date)
iso = args.execution_date.isoformat()
filename = "{directory}/{iso}".format(**locals())
logging.basicConfig(
filename=filename, level=logging.INFO, format=settings.LOG_FORMAT)
print("Logging into: " + filename)

if not args.pickle:
directory = settings.BASE_LOG_FOLDER + \
"/{args.dag_id}/{args.task_id}".format(args=args)
if not os.path.exists(directory):
os.makedirs(directory)
filename = "{directory}/{iso}".format(**locals())
logging.basicConfig(
filename=filename, level=logging.INFO, format=settings.LOG_FORMAT)

print("Logging into: " + filename)
dagbag = DagBag(args.subdir)
if args.dag_id not in dagbag.dags:
raise Exception('dag_id could not be found')
Expand All @@ -62,19 +64,20 @@ def run(args):
else:
session = settings.Session()
dag_pickle, = session.query(
DagPickle.pickle).filter(DagPickle.id==args.pickle).all()[0]
dag = pickle.get_object()
DagPickle).filter(DagPickle.id==args.pickle).all()
dag = dag_pickle.get_object()
task = dag.get_task(task_id=args.task_id)

ti = TaskInstance(task, args.execution_date)

# This is enough to fail the task instance
def signal_handler(signum, frame):
logging.error("SIGINT (ctrl-c) received".format(args.task_id))

ti.error(args.execution_date)
sys.exit()
signal.signal(signal.SIGINT, signal_handler)

task.run(
start_date=args.execution_date,
end_date=args.execution_date,
ti.run(
mark_success=args.mark_success,
force=args.force,
ignore_dependencies=args.ignore_dependencies)
Expand Down
14 changes: 13 additions & 1 deletion src/flux/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def command(self, mark_success=False, pickle=None):
"{self.dag_id} {self.task_id} {iso} "
"{mark_success} "
"{pickle} "
"{sub_dir} "
"{subdir} "
).format(**locals())

@property
Expand Down Expand Up @@ -217,6 +217,17 @@ def current_state(self, main_session=None):
session.close()
return state

def error(self, main_session=None):
"""
Fails the task
"""
session = settings.Session()
logging.error("Recording the task instance as FAILED")
self.state = State.FAILED
session.merge(self)
session.commit()
session.close()

def refresh_from_db(self, main_session=None):
session = main_session or settings.Session()
TI = TaskInstance
Expand Down Expand Up @@ -499,6 +510,7 @@ def run(self, dag, start_date=None, end_date=None, mark_success=False):
mark_success=mark_success,
pickle=pickle)
)
ti.state = State.RUNNING
if task_instances:
self.heartbeat()
executor.heartbeat()
Expand Down
1 change: 1 addition & 0 deletions src/flux/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from bash_operator import BashOperator
from mysql_operator import MySqlOperator
from base_sensor_operator import MySqlSensorOperator
from base_sensor_operator import ExternalTaskSensor
from dummy_operator import DummyOperator
40 changes: 37 additions & 3 deletions src/flux/operators/base_sensor_operator.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from datetime import datetime
import logging
from time import sleep

from flux.models import BaseOperator
from flux import settings
from flux.models import BaseOperator, TaskInstance, State
from flux.hooks import MySqlHook


class BaseSensorOperator(BaseOperator):

def __init__(self, poke_interval=5, timeout=60*60*24*7, *args, **kwargs):
Expand Down Expand Up @@ -39,7 +40,7 @@ def __init__(self, mysql_dbid, sql, *args, **kwargs):
self.hook = MySqlHook(mysql_dbid=mysql_dbid)

def poke(self):
print('Poking: ' + self.sql)
logging.info('Poking: ' + self.sql)
records = self.hook.get_records(self.sql)
if not records:
return False
Expand All @@ -49,3 +50,36 @@ def poke(self):
else:
return True
print(records[0][0])

class ExternalTaskSensor(BaseSensorOperator):
"""
Waits for a task to complete in a different DAG
"""
template_fields = ('execution_date',)
__mapper_args__ = {
'polymorphic_identity': 'MySqlSensorOperator'
}

def __init__(self, external_dag_id, external_task_id, *args, **kwargs):
super(ExternalTaskSensor, self).__init__(*args, **kwargs)
self.external_dag_id = external_dag_id
self.external_task_id = external_task_id
self.execution_date = "{{ execution_date }}"

def poke(self):
logging.info(
'Poking for '
'{self.external_dag_id}.'
'{self.external_task_id} on '
'{self.execution_date} ... '.format(**locals()))
TI = TaskInstance
session = settings.Session()
count = session.query(TI).filter(
TI.dag_id == self.external_dag_id,
TI.task_id == self.external_task_id,
TI.state == State.SUCCESS,
TI.execution_date == self.execution_date
).count()
session.commit()
session.close()
return count
7 changes: 5 additions & 2 deletions src/www/templates/admin/gantt.html
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
{% extends "admin/dag.html" %}

{% block head_css %}
{{ super() }}
<link href="{{ admin_static.url(filename='datetimepicker/bootstrap-datetimepicker.css') }}" rel="stylesheet">
{% endblock %}

{% block body %}
{{ super() }}
<form method="get">
Expand Down Expand Up @@ -34,9 +39,7 @@
$('#container').highcharts(hc);

// Activating the date time picker widget
$('#execution_date').addClass("form-control").datetimepicker();
$('#execution_date').datetimepicker();
console.log($('#execution_date'));
</script>

{% endblock %}

0 comments on commit 1f6f538

Please sign in to comment.