first version of the celery executor

Krishna Puttaswamy committed Oct 26, 2014
1 parent 5f94760 commit 67020e5
Showing 7 changed files with 89 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/bin/airflow
@@ -68,6 +68,7 @@ def run(args):
    dag = dag_pickle.get_object()
    task = dag.get_task(task_id=args.task_id)

    # TODO: shouldn't the task instance run with the appropriate executor?
    ti = TaskInstance(task, args.execution_date)

    # This is enough to fail the task instance
2 changes: 2 additions & 0 deletions airflow/executors/__init__.py
@@ -1,4 +1,6 @@
from base_executor import LocalExecutor
from base_executor import SequentialExecutor
from base_executor import CeleryExecutor

DEFAULT_EXECUTOR = SequentialExecutor()
# DEFAULT_EXECUTOR = CeleryExecutor()
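The module-level default stays on the SequentialExecutor in this commit, with the CeleryExecutor switch left commented out. A minimal sketch of what that choice looks like from calling code; the parallelism value is just an illustrative assumption:

from airflow.executors import DEFAULT_EXECUTOR, CeleryExecutor

# DEFAULT_EXECUTOR is a module-level instance, chosen once at import time.
print(type(DEFAULT_EXECUTOR).__name__)   # SequentialExecutor in this commit

# Opting into Celery explicitly instead; parallelism controls how many
# submitter processes feed commands to the broker.
executor = CeleryExecutor(parallelism=4)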
63 changes: 63 additions & 0 deletions airflow/executors/base_executor.py
@@ -1,5 +1,6 @@
import logging
import time
from celery_worker import execute_command

from airflow import settings
from airflow.utils import State
@@ -120,6 +121,68 @@ def end(self):
        # Wait for commands to finish
        self.queue.join()

class CelerySubmitter(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            key, command = self.task_queue.get()
            if command is None:
                # Received poison pill, no more tasks to run
                self.task_queue.task_done()
                break
            BASE_FOLDER = settings.BASE_FOLDER
            command = (
                "exec bash -c '"
                "cd $AIRFLOW_HOME;\n" +
                "source init.sh;\n" +
                command +
                "'"
            ).format(**locals())

            try:
                res = execute_command.delay(command)
                result = res.get()
            except Exception as e:
                self.result_queue.put((key, State.FAILED))
                raise e
            self.result_queue.put((key, State.SUCCESS))
            self.task_queue.task_done()
            time.sleep(1)


class CeleryExecutor(BaseExecutor):
    """
    Submits tasks to RabbitMQ, where they are picked up and executed by a
    pool of worker processes.
    """
    def __init__(self, parallelism=1):
        super(CeleryExecutor, self).__init__()
        self.parallelism = parallelism

    def start(self):
        self.queue = multiprocessing.JoinableQueue()
        self.result_queue = multiprocessing.Queue()
        self.workers = [
            CelerySubmitter(self.queue, self.result_queue)
            for _ in xrange(self.parallelism)
        ]

        for w in self.workers:
            w.start()

    def execute_async(self, key, command):
        self.queue.put((key, command))

    def heartbeat(self):
        while not self.result_queue.empty():
            results = self.result_queue.get()
            self.change_state(*results)

    def end(self):
        # Send a poison pill to each worker
        for w in self.workers:
            self.queue.put(None)
        self.queue.join()


class SequentialExecutor(BaseExecutor):
"""
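Taken together, the new class mirrors the queue-and-worker pattern already used in this module, with Celery doing the actual execution. A minimal sketch of how the pieces fit, based only on the methods added above; the key tuple and the shell command are hypothetical placeholders, it assumes the amqp broker and a celery worker from celery_worker.py are running, and in practice the scheduler would drive these calls:

import time

from airflow.executors import CeleryExecutor

executor = CeleryExecutor(parallelism=2)
executor.start()                                  # spawn CelerySubmitter processes

# Queue a shell command; the key identifies the task instance.
key = ('example_2', 'some_task', '2014-10-01')    # hypothetical key format
executor.execute_async(key, 'echo "hello from celery"')

# heartbeat() drains the result queue and records SUCCESS/FAILED
# via change_state.
for _ in range(10):
    executor.heartbeat()
    time.sleep(1)

executor.end()                                    # poison pills, then queue.join()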
18 changes: 18 additions & 0 deletions airflow/executors/celery_worker.py
@@ -0,0 +1,18 @@
import subprocess
import time
import logging
from celery import Celery

# to start the celery worker, run the command:
# "celery -A airflow.executors.celery_worker worker --loglevel=info"

app = Celery('airflow.executors.celery_worker', backend='amqp', broker='amqp://')

@app.task(name='airflow.executors.celery_worker.execute_command')
def execute_command(command):
    logging.info("Executing command in Celery " + command)
    try:
        subprocess.Popen(command, shell=True).wait()
    except Exception as e:
        raise e
    return True
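This module doubles as the Celery application definition: the worker process loads it via the celery -A command in the comment above, and the executor imports execute_command to submit work. A small usage sketch, assuming a RabbitMQ broker reachable at the default amqp:// URL and at least one worker already running; the echo command is purely illustrative:

from airflow.executors.celery_worker import execute_command

# Ship a shell command to whichever worker picks it up first.
async_result = execute_command.delay('echo "hello from a celery worker"')
print(async_result.get())   # blocks on the amqp result backend; returns True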
2 changes: 2 additions & 0 deletions airflow/models.py
@@ -5,6 +5,7 @@
import jinja2
import logging
import os
import sys
import pickle
import re
from time import sleep
@@ -58,6 +59,7 @@ def process_file(self, filepath):
            m = imp.load_source(mod_name, filepath)
        except:
            logging.error("Failed to import: " + filepath)
            logging.error("Exception: " + str(sys.exc_info()))
        else:
            for dag in m.__dict__.values():
                if type(dag) == DAG:
2 changes: 1 addition & 1 deletion dags/examples/example2.py
@@ -4,7 +4,7 @@

default_args = {
    'owner': 'mistercrunch',
-   'start_date': datetime(2014, 9, 1),
+   'start_date': datetime(2014, 10, 1),
}

dag = DAG(dag_id='example_2')
2 changes: 2 additions & 0 deletions requirements.txt
@@ -15,3 +15,5 @@ sphinx
sphinx_rtd_theme
sqlalchemy
thrift
celery
hive-thrift-py
