
Commit

Merge branch 'master' of github.com:mistercrunch/Airflow
mistercrunch committed Oct 27, 2014
2 parents 5800ac1 + c5c00c6 commit b8eb9fe
Showing 16 changed files with 235 additions and 105 deletions.
1 change: 1 addition & 0 deletions airflow/bin/airflow
@@ -68,6 +68,7 @@ def run(args):
dag = dag_pickle.get_object()
task = dag.get_task(task_id=args.task_id)

# TODO: add run_local and fire it with the right executor from run
ti = TaskInstance(task, args.execution_date)

# This is enough to fail the task instance
7 changes: 5 additions & 2 deletions airflow/executors/__init__.py
@@ -1,4 +1,7 @@
from base_executor import LocalExecutor
from base_executor import SequentialExecutor
from celery_executor import CeleryExecutor
from local_executor import LocalExecutor
from sequential_executor import SequentialExecutor

# DEFAULT_EXECUTOR = CeleryExecutor()
# DEFAULT_EXECUTOR = LocalExecutor()
DEFAULT_EXECUTOR = SequentialExecutor()
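
For orientation, the executor in use is a module-level choice: the active line instantiates it and the alternatives stay commented out. A sketch of the same file with the CeleryExecutor selected instead (purely illustrative, not part of this commit):

from celery_executor import CeleryExecutor
from local_executor import LocalExecutor
from sequential_executor import SequentialExecutor

DEFAULT_EXECUTOR = CeleryExecutor()
# DEFAULT_EXECUTOR = LocalExecutor()
# DEFAULT_EXECUTOR = SequentialExecutor()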
98 changes: 0 additions & 98 deletions airflow/executors/base_executor.py
@@ -1,9 +1,4 @@
import logging
import time

from airflow import settings
from airflow.utils import State


class BaseExecutor(object):

@@ -51,96 +46,3 @@ def end(self):
"""
raise NotImplementedError()

import multiprocessing
import subprocess


class LocalWorker(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue

def run(self):
proc_name = self.name
while True:
key, command = self.task_queue.get()
if key is None:
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
BASE_FOLDER = settings.BASE_FOLDER
print command
command = (
"exec bash -c '"
"cd $AIRFLOW_HOME;\n" +
"source init.sh;\n" +
command +
"'"
).format(**locals())
try:
sp = subprocess.Popen(command, shell=True).wait()
except Exception as e:
self.result_queue.put((key, State.FAILED))
raise e
self.result_queue.put((key, State.SUCCESS))
self.task_queue.task_done()
time.sleep(1)


class LocalExecutor(BaseExecutor):

def __init__(self, parallelism=8):
super(LocalExecutor, self).__init__()
self.parallelism = parallelism

def start(self):
self.queue = multiprocessing.JoinableQueue()
self.result_queue = multiprocessing.Queue()
self.workers = [
LocalWorker(self.queue, self.result_queue)
for i in xrange(self.parallelism)
]

for w in self.workers:
w.start()

def execute_async(self, key, command):
self.queue.put((key, command))

def heartbeat(self):
while not self.result_queue.empty():
results = self.result_queue.get()
self.change_state(*results)

def end(self):
# Sending poison pill to all worker
[self.queue.put((None, None)) for w in self.workers]
# Wait for commands to finish
self.queue.join()


class SequentialExecutor(BaseExecutor):
"""
Will only run one task instance at a time, can be used for debugging.
"""
def __init__(self):
super(SequentialExecutor, self).__init__()
self.commands_to_run = []

def queue_command(self, key, command):
self.commands_to_run.append((key, command,))

def heartbeat(self):
for key, command in self.commands_to_run:
try:
sp = subprocess.Popen(command, shell=True).wait()
except Exception as e:
self.change_state(key, State.FAILED)
raise e
self.change_state(key, State.SUCCESS)
self.commands_to_run = []

def end(self):
pass
72 changes: 72 additions & 0 deletions airflow/executors/celery_executor.py
@@ -0,0 +1,72 @@
import multiprocessing
import subprocess
import time

from airflow.executors.base_executor import BaseExecutor
from airflow import settings
from airflow.utils import State

from celery_worker import execute_command

class CeleryExecutor(BaseExecutor):
""" Submits the task to RabbitMQ, which is picked up and executed by a bunch
of worker processes """
def __init__(self, parallelism=1):
super(CeleryExecutor, self).__init__()
self.parallelism = parallelism

def start(self):
self.queue = multiprocessing.JoinableQueue()
self.result_queue = multiprocessing.Queue()
self.workers = [ CelerySubmitter(self.queue, self.result_queue) for i in xrange(self.parallelism) ]

for w in self.workers:
w.start()

def execute_async(self, key, command):
self.queue.put((key, command))

def heartbeat(self):
while not self.result_queue.empty():
results = self.result_queue.get()
self.change_state(*results)

def end(self):
# Send a poison pill to each worker so it exits its run loop
[self.queue.put((None, None)) for w in self.workers]
self.queue.join()


class CelerySubmitter(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue

def run(self):
while True:
key, command = self.task_queue.get()
if key is None:
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
BASE_FOLDER = settings.BASE_FOLDER
command = (
"exec bash -c '"
"cd $AIRFLOW_HOME;\n" +
"source init.sh;\n" +
command +
"'"
).format(**locals())

try:
res = execute_command.delay(command)
result = res.get()
except Exception as e:
self.result_queue.put((key, State.FAILED))
raise e
self.result_queue.put((key, State.SUCCESS))
self.task_queue.task_done()
time.sleep(1)

21 changes: 21 additions & 0 deletions airflow/executors/celery_worker.py
@@ -0,0 +1,21 @@
import subprocess
import time
import logging
from celery import Celery

from airflow import settings

# to start the celery worker, run the command:
# "celery -A airflow.executors.celery_worker worker --loglevel=info"

# app = Celery('airflow.executors.celery_worker', backend='amqp', broker='amqp://')
app = Celery(settings.CELERY_APP_NAME, backend=settings.CELERY_RESULTS_BACKEND, broker=settings.CELERY_BROKER)

@app.task(name='airflow.executors.celery_worker.execute_command')
def execute_command(command):
logging.info("Executing command in Celery " + command)
try:
subprocess.Popen(command, shell=True).wait()
except Exception as e:
raise e
return True
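
As the comment above notes, a Celery worker has to be running before any task command executes. A minimal smoke test from a Python shell, assuming the broker from settings is reachable and a worker was started with the command above (the echoed string is just a placeholder):

from airflow.executors.celery_worker import execute_command

async_result = execute_command.delay("echo 'hello from a celery worker'")
print async_result.get()  # blocks until the worker finishes; returns True on success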
74 changes: 74 additions & 0 deletions airflow/executors/local_executor.py
@@ -0,0 +1,74 @@
import logging
import multiprocessing
import subprocess
import time

from airflow import settings
from airflow.utils import State

from airflow.executors.base_executor import BaseExecutor

class LocalWorker(multiprocessing.Process):

def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue

def run(self):
proc_name = self.name
while True:
key, command = self.task_queue.get()
if key is None:
# Received poison pill, no more tasks to run
self.task_queue.task_done()
break
BASE_FOLDER = settings.BASE_FOLDER
print command
command = (
"exec bash -c '"
"cd $AIRFLOW_HOME;\n" +
"source init.sh;\n" +
command +
"'"
).format(**locals())
try:
sp = subprocess.Popen(command, shell=True).wait()
except Exception as e:
self.result_queue.put((key, State.FAILED))
raise e
self.result_queue.put((key, State.SUCCESS))
self.task_queue.task_done()
time.sleep(1)


class LocalExecutor(BaseExecutor):

def __init__(self, parallelism=8):
super(LocalExecutor, self).__init__()
self.parallelism = parallelism

def start(self):
self.queue = multiprocessing.JoinableQueue()
self.result_queue = multiprocessing.Queue()
self.workers = [
LocalWorker(self.queue, self.result_queue)
for i in xrange(self.parallelism)
]

for w in self.workers:
w.start()

def execute_async(self, key, command):
self.queue.put((key, command))

def heartbeat(self):
while not self.result_queue.empty():
results = self.result_queue.get()
self.change_state(*results)

def end(self):
# Sending poison pill to all workers
[self.queue.put((None, None)) for w in self.workers]
# Wait for commands to finish
self.queue.join()
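
The start/end logic above is the standard JoinableQueue poison-pill pattern. A condensed, Airflow-independent sketch of the same idea, assuming a POSIX system and Python 2 (all names are illustrative):

import multiprocessing

def worker(queue):
    while True:
        item = queue.get()
        if item is None:  # poison pill: stop consuming
            queue.task_done()
            break
        print "processing", item
        queue.task_done()

if __name__ == '__main__':
    queue = multiprocessing.JoinableQueue()
    workers = [multiprocessing.Process(target=worker, args=(queue,)) for _ in xrange(2)]
    for w in workers:
        w.start()
    for i in xrange(4):
        queue.put(i)
    for w in workers:
        queue.put(None)  # one pill per worker
    queue.join()  # returns once every put() has been matched by a task_done()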
29 changes: 29 additions & 0 deletions airflow/executors/sequential_executor.py
@@ -0,0 +1,29 @@
import subprocess

from airflow.utils import State

from airflow.executors.base_executor import BaseExecutor

class SequentialExecutor(BaseExecutor):
"""
Runs only one task instance at a time; useful for debugging.
"""
def __init__(self):
super(SequentialExecutor, self).__init__()
self.commands_to_run = []

def queue_command(self, key, command):
self.commands_to_run.append((key, command,))

def heartbeat(self):
for key, command in self.commands_to_run:
try:
sp = subprocess.Popen(command, shell=True).wait()
except Exception as e:
self.change_state(key, State.FAILED)
raise e
self.change_state(key, State.SUCCESS)
self.commands_to_run = []

def end(self):
pass
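
Both the sequential and local executors rely on subprocess.Popen(command, shell=True).wait(), which returns the command's exit code rather than raising when the command fails; the try/except around it only catches errors in launching the process. A tiny illustration of that behavior (POSIX shell assumed):

import subprocess

rc_ok = subprocess.Popen("true", shell=True).wait()    # 0
rc_bad = subprocess.Popen("false", shell=True).wait()  # 1 -- no exception is raised
print rc_ok, rc_bad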
Empty file.
4 changes: 3 additions & 1 deletion airflow/hooks/presto/presto_hook.py
@@ -49,4 +49,6 @@ def get_pandas_df(self, hql, schema="default"):
else:
raise PrestoException(self.client.getlasterrormessage())


def run(self, hql, schema="default"):
if not self.client.runquery(hql, schema):
raise PrestoException(self.client.getlasterrormessage())
3 changes: 3 additions & 0 deletions airflow/macros/__init__.py
@@ -0,0 +1,3 @@
from random import random
from datetime import datetime
import time
8 changes: 8 additions & 0 deletions airflow/macros/hive.py
@@ -0,0 +1,8 @@
from airflow import settings
from airflow.hooks.hive_hook import HiveHook

def max_partition(
table, schema="default", hive_dbid=settings.HIVE_DEFAULT_DBID):
hh = HiveHook(hive_dbid=hive_dbid)
return hh.max_partition(schema=schema, table=table)
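
A hedged usage sketch of this macro; the table name below is a placeholder and the returned value depends on how the table is partitioned:

from airflow.macros.hive import max_partition

# Delegates to HiveHook.max_partition for the default Hive connection (hive_dbid)
# and returns the highest partition value found for the table.
latest = max_partition(table="some_hive_table", schema="default")
print latest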

2 changes: 2 additions & 0 deletions airflow/models.py
@@ -5,6 +5,7 @@
import jinja2
import logging
import os
import sys
import pickle
import re
from time import sleep
@@ -58,6 +59,7 @@ def process_file(self, filepath):
m = imp.load_source(mod_name, filepath)
except:
logging.error("Failed to import: " + filepath)
logging.error("Exception: " + str(sys.exc_info()))
else:
for dag in m.__dict__.values():
if type(dag) == DAG:
11 changes: 11 additions & 0 deletions airflow/settings.py
@@ -35,3 +35,14 @@
#engine = create_engine('mysql://airflow:airflow@localhost/airflow')
engine = create_engine('sqlite:///' + BASE_FOLDER + '/airflow.db' )
Session.configure(bind=engine)
HEADER = """\
.__ _____.__
_____ |__|_______/ ____\ | ______ _ __
\__ \ | \_ __ \ __\| | / _ \ \/ \/ /
/ __ \| || | \/| | | |_( <_> ) /
(____ /__||__| |__| |____/\____/ \/\_/
\/"""

CELERY_APP_NAME = "airflow.executors.celery_worker"
CELERY_BROKER = "amqp"
CELERY_RESULTS_BACKEND = "amqp://"
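
These defaults assume a local RabbitMQ reachable with guest credentials. Pointing at a specific broker would just mean swapping the URL; the host and credentials below are placeholders, not values from this commit:

CELERY_APP_NAME = "airflow.executors.celery_worker"
CELERY_BROKER = "amqp://myuser:mypassword@rabbitmq-host:5672//"  # placeholder broker URL
CELERY_RESULTS_BACKEND = "amqp"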
2 changes: 1 addition & 1 deletion dags/examples/example2.py
@@ -4,7 +4,7 @@

default_args = {
'owner': 'mistercrunch',
'start_date': datetime(2014, 9, 1),
'start_date': datetime(2014, 10, 1),
}

dag = DAG(dag_id='example_2')