
Commit eb885d2

Merge branch 'master' of github.com:mistercrunch/Airflow
Conflicts:
	airflow/www/app.py
mistercrunch committed Oct 31, 2014
2 parents f10e90e + eee9332
Showing 9 changed files with 22 additions and 21 deletions.

airflow/executors/base_executor.py (1 addition, 0 deletions)
@@ -1,5 +1,6 @@
 import logging
 
+
 class BaseExecutor(object):
 
     def __init__(self):

airflow/executors/celery_executor.py (5 additions, 4 deletions)
@@ -1,13 +1,12 @@
 import multiprocessing
-import subprocess
-import time
 
 from airflow.executors.base_executor import BaseExecutor
 from airflow import settings
 from airflow.utils import State
 
 from celery_worker import execute_command
 
+
 class CeleryExecutor(BaseExecutor):
     """ Submits the task to RabbitMQ, which is picked up and executed by a bunch
     of worker processes """
@@ -18,7 +17,9 @@ def __init__(self, parallelism=1):
     def start(self):
         self.queue = multiprocessing.JoinableQueue()
         self.result_queue = multiprocessing.Queue()
-        self.workers = [ CelerySubmitter(self.queue, self.result_queue) for i in xrange(self.parallelism) ]
+        self.workers = [
+            CelerySubmitter(self.queue, self.result_queue)
+            for i in xrange(self.parallelism)]
 
         for w in self.workers:
             w.start()
@@ -47,7 +48,7 @@ def __init__(self, task_queue, result_queue):
     def run(self):
         while True:
             key, command = self.task_queue.get()
-            if command == None:
+            if command is None:
                 # Received poison pill, no more tasks to run
                 self.task_queue.task_done()
                 break
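
The change from "command == None" to "command is None" sits in CelerySubmitter's shutdown path: each worker drains a shared JoinableQueue and exits when it pulls a None sentinel. A minimal standalone sketch of that poison-pill pattern (the Worker class, worker count, and sample command are illustrative, not Airflow's actual code):

import multiprocessing


class Worker(multiprocessing.Process):
    """Drains a shared queue until it receives a None poison pill."""

    def __init__(self, task_queue):
        super(Worker, self).__init__()
        self.task_queue = task_queue

    def run(self):
        while True:
            command = self.task_queue.get()
            if command is None:
                # Poison pill: acknowledge it, then stop this worker
                self.task_queue.task_done()
                break
            print("would execute: " + command)
            self.task_queue.task_done()


if __name__ == '__main__':
    queue = multiprocessing.JoinableQueue()
    workers = [Worker(queue) for _ in range(2)]
    for w in workers:
        w.start()
    queue.put("echo hello")
    for _ in workers:
        queue.put(None)  # one pill per worker
    queue.join()  # returns once every item was marked task_done()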

airflow/executors/celery_worker.py (6 additions, 3 deletions)
@@ -1,5 +1,4 @@
 import subprocess
-import time
 import logging
 from celery import Celery
 
@@ -9,9 +8,13 @@
 # "celery -A airflow.executors.celery_worker worker --loglevel=info"
 
 # app = Celery('airflow.executors.celery_worker', backend='amqp', broker='amqp://')
-app = Celery(settings.CELERY_APP_NAME, backend=settings.CELERY_BROKER, broker=settings.CELERY_RESULTS_BACKEND)
+app = Celery(
+    settings.CELERY_APP_NAME,
+    backend=settings.CELERY_BROKER,
+    broker=settings.CELERY_RESULTS_BACKEND)
 
-@app.task (name='airflow.executors.celery_worker.execute_command')
+
+@app.task(name='airflow.executors.celery_worker.execute_command')
 def execute_command(command):
     logging.info("Executing command in Celery " + command)
     try:
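
The comment above records how a worker is started ("celery -A airflow.executors.celery_worker worker --loglevel=info"). On the submitting side, the executor would go through Celery's standard calling API; the exact call CelerySubmitter makes is not shown in this diff, so the snippet below is a hedged sketch using stock Celery methods and an illustrative command string:

from airflow.executors.celery_worker import execute_command

# .delay() enqueues the command on the broker; .get() blocks until a
# worker has picked it up and run it (requires a result backend).
async_result = execute_command.delay("echo hello")
async_result.get()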

airflow/executors/local_executor.py (1 addition, 0 deletions)
@@ -8,6 +8,7 @@
 
 from airflow.executors.base_executor import BaseExecutor
 
+
 class LocalWorker(multiprocessing.Process):
 
     def __init__(self, task_queue, result_queue):

airflow/executors/sequential_executor.py (1 addition, 1 deletion)
@@ -1,8 +1,8 @@
 import subprocess
 
+from airflow.executors.base_executor import BaseExecutor
 from airflow.utils import State
 
-from airflow.executors.base_executor import BaseExecutor
 
 class SequentialExecutor(BaseExecutor):
     """

airflow/hooks/hive_hook.py (4 additions, 9 deletions)
@@ -1,18 +1,15 @@
 import logging
 import subprocess
 import sys
-import os
 from airflow.models import DatabaseConnection
 from airflow import settings
 
 # Adding the Hive python libs to python path
 sys.path.insert(0, settings.HIVE_HOME_PY)
 
-from thrift import Thrift
 from thrift.transport import TSocket
 from thrift.transport import TTransport
 from thrift.protocol import TBinaryProtocol
-from hive_metastore import ThriftHiveMetastore
 from hive_service import ThriftHive
 
 from airflow.hooks.base_hook import BaseHook
@@ -55,9 +52,7 @@ def check_for_partition(self, schema, table, partition):
             else:
                 return False
         except Exception as e:
-            logging.error(
-                "Metastore down? Activing as if partition doesn't "
-                "exist to be safe...")
+            logging.error(e)
             return False
 
     def get_records(self, hql, schema=None):
@@ -99,14 +94,14 @@ def max_partition(self, schema, table):
         for tables that have a single partition key. For subpartitionned
         table, we recommend using signal tables.
         '''
-        table = client.get_table(dbname=schema, tbl_name=table)
+        table = self.hive.get_table(dbname=schema, tbl_name=table)
         if len(table.partitionKeys) == 0:
             raise Exception("The table isn't partitionned")
-        elif len(table.partitionKeys) >1:
+        elif len(table.partitionKeys) > 1:
             raise Exception(
                 "The table is partitionned by multiple columns, "
                 "use a signal table!")
         else:
-            parts = client.get_partitions(
+            parts = self.hive.get_partitions(
                 db_name='core_data', tbl_name='dim_users', max_parts=32767)
             return max([p.values[0] for p in parts])
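
A short usage sketch for the corrected method, assuming the hook is constructed the way airflow/macros/hive.py does below; the schema and table arguments are illustrative:

from airflow import settings
from airflow.hooks.hive_hook import HiveHook

hh = HiveHook(hive_dbid=settings.HIVE_DEFAULT_DBID)
# Returns the highest value of the table's single partition key
latest = hh.max_partition(schema='core_data', table='dim_users')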

airflow/macros/hive.py (2 additions, 2 deletions)
@@ -1,8 +1,8 @@
 from airflow import settings
-from airflow.hooks.hive_hook import HiveHook
 
 
 def max_partition(
-    table, schema="default", hive_dbid=settings.HIVE_DEFAULT_DBID):
+        table, schema="default", hive_dbid=settings.HIVE_DEFAULT_DBID):
+    from airflow.hooks.hive_hook import HiveHook
     hh = HiveHook(hive_dbid=hive_dbid)
     return hh.max_partition(schema=schema, table=table)

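The macro wraps HiveHook so template code never imports the hook itself, and moving the import inside the function body is a common way to sidestep a circular import between the macros and hooks modules. A sketch of direct use (argument values are illustrative):

from airflow.macros.hive import max_partition

# hive_dbid falls back to settings.HIVE_DEFAULT_DBID when omitted
latest = max_partition(table='dim_users', schema='core_data')
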

airflow/operators/hive_operator.py (0 additions, 1 deletion)
@@ -28,7 +28,6 @@ def __init__(
         self.hook = HiveHook(hive_dbid=hive_dbid)
         self.hql = hql
 
-
     def execute(self, execution_date):
         logging.info('Executing: ' + self.hql)
         self.hook.run_cli(hql=self.hql, schema=self.hive_dbid)

airflow/www/app.py (2 additions, 1 deletion)
@@ -41,6 +41,7 @@
 class DateTimeForm(Form):
     execution_date = DateTimeField("Execution date")
 
+
 class GraphForm(Form):
     execution_date = DateTimeField("Execution date")
     arrange = SelectField("Layout", choices=(
@@ -149,7 +150,7 @@ def task(self):
             if not attr_name.startswith('_'):
                 attr = getattr(task, attr_name)
                 if type(attr) != type(self.task) and attr_name not in sql_attrs:
-                    s += attr_name + ': ' + str(attr) + '\n'
+                    s += attr_name + ': ' + str(attr) + '\n'
 
         s = s.replace('<', '&lt')
         s = s.replace('>', '&gt')
