Skip to content

Commit

Permalink
Providing a way to specify the executor to use while constructing the…
Browse files Browse the repository at this point in the history
… DAG object

adding host=0.0.0.0 to ensure the service is accesible from outside localhost

Revert "adding host=0.0.0.0 to ensure the service is accesible from outside localhost"

This reverts commit ce5436bc7dcd70dcb129c1dfa46f9fd7ce743e43.
  • Loading branch information
Krishna Puttaswamy committed Nov 7, 2014
1 parent 88815ad commit 25cb726
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 8 deletions.
9 changes: 4 additions & 5 deletions airflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -927,10 +927,11 @@ def __init__(
self, dag_id,
schedule_interval=timedelta(days=1),
start_date=None, end_date=None, parallelism=0,
full_filepath=None):
full_filepath=None, executor=DEFAULT_EXECUTOR):

utils.validate_key(dag_id)
self.dag_id = dag_id
self.executor = executor
self.end_date = end_date or datetime.now()
self.parallelism = parallelism
self.schedule_interval = schedule_interval
Expand Down Expand Up @@ -1088,11 +1089,9 @@ def db_merge(self):
session.merge(self)
session.commit()

def run(
self, start_date=None, end_date=None, mark_success=False,
executor=DEFAULT_EXECUTOR):
def run(self, start_date=None, end_date=None, mark_success=False):
session = settings.Session()
job = BackfillJob(executor=executor)
job = BackfillJob(executor=self.executor)
session.add(job)
session.commit()
job.run(self, start_date, end_date, mark_success)
Expand Down
6 changes: 5 additions & 1 deletion dags/examples/example1.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from airflow.operators import BashOperator, MySqlOperator, DummyOperator
from airflow.models import DAG
from airflow.executors import SequentialExecutor
from airflow.executors import LocalExecutor
from datetime import datetime

default_args = {
Expand All @@ -8,7 +10,8 @@
'mysql_dbid': 'local_mysql',
}

dag = DAG(dag_id='example_1')
dag = DAG(dag_id='example_1', executor=LocalExecutor())
# dag = DAG(dag_id='example_1', executor=SequentialExecutor())

cmd = 'ls -l'
run_this_last = DummyOperator(
Expand Down Expand Up @@ -45,3 +48,4 @@
dag.add_task(task)
task.set_downstream(run_this_last)
task.set_upstream(create_table)

5 changes: 3 additions & 2 deletions dags/examples/simple.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from airflow.operators import MySqlOperator
from airflow.executors import SequentialExecutor
from airflow.executors import LocalExecutor
from airflow import DAG
from datetime import datetime

# Setting some default operator parameters
default_args = {
'owner': 'max',
'owner': 'max',
'mysql_dbid': 'local_mysql',
}

# Initializing a directed acyclic graph
dag = DAG(dag_id='simple', )
dag = DAG(dag_id='simple', executor=LocalExecutor())

# MySQL Operator
sql = "TRUNCATE TABLE tmp;"
Expand Down

0 comments on commit 25cb726

Please sign in to comment.