merge upstream
Jeremiah Lowin committed Aug 14, 2015
2 parents 999d16f + debd41e commit 367ede6
Showing 8 changed files with 69 additions and 14 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -31,6 +31,7 @@ if you may.
* Lyft
* Stripe [@jbalogh]
* Wooga
+ * Xoom [[@gepser](https://github.com/gepser) & [@omarvides](https://github.com/omarvides)]
* Yahoo!

## Documentation
2 changes: 1 addition & 1 deletion airflow/configuration.py
@@ -48,7 +48,7 @@ class AirflowConfigException(Exception):
# The home folder for airflow, default is ~/airflow
airflow_home = {AIRFLOW_HOME}
- # The folder where you airflow pipelines live, most likely a
+ # The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
dags_folder = {AIRFLOW_HOME}/dags
6 changes: 5 additions & 1 deletion airflow/models.py
@@ -1686,7 +1686,11 @@ class DAG(object):
Note that jinja/airflow includes the path of your DAG file by
default
:type template_searchpath: string or list of strings
- :param user_defined_macros: a dictionary of macros that will be merged
+ :param user_defined_macros: a dictionary of macros that will be exposed
+ in your jinja templates. For example, passing ``dict(foo='bar')``
+ to this argument allows you to use ``{{ foo }}`` in all jinja
+ templates related to this DAG. Note that you can pass any
+ type of object here.
:type user_defined_macros: dict
:param default_args: A dictionary of default parameters to be used
as constructor keyword parameters when initialising operators.
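
To make the new docstring concrete, here is a minimal sketch of user_defined_macros in use; the dag_id, task, and command are hypothetical, and the import style follows this repo's test suite:

```python
from datetime import datetime

from airflow import operators
from airflow.models import DAG

# Passing dict(foo='bar') exposes {{ foo }} to every jinja template
# rendered for this DAG's tasks.
dag = DAG(
    'macro_example',  # hypothetical dag_id
    default_args={'owner': 'airflow', 'start_date': datetime(2015, 1, 1)},
    user_defined_macros=dict(foo='bar'))

# bash_command is a templated field, so {{ foo }} renders to 'bar' at runtime.
t = operators.BashOperator(
    task_id='echo_foo',
    bash_command='echo {{ foo }}',
    dag=dag)
```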
5 changes: 4 additions & 1 deletion airflow/operators/hive_operator.py
@@ -15,7 +15,10 @@ class HiveOperator(BaseOperator):
:param hive_cli_conn_id: reference to the Hive database
:type hive_cli_conn_id: string
:param hiveconf_jinja_translate: when True, hiveconf-type templating
- ${var} gets translated into jinja-type templating {{ var }}
+ ${var} gets translated into jinja-type templating {{ var }}. Note that
+ you may want to use this along with the
+ ``DAG(user_defined_macros=myargs)`` parameter. View the DAG
+ object documentation for more details.
:type hiveconf_jinja_translate: boolean
:param script_begin_tag: If defined, the operator will get rid of the
part of the script before the first occurrence of `script_begin_tag`
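
A sketch of the combination the added note suggests; the query and task are made up, and `dag` is assumed to exist with user_defined_macros set as in the DAG example above:

```python
from airflow import operators

# ${ds} is hiveconf-style; with hiveconf_jinja_translate=True the operator
# rewrites it to {{ ds }} before jinja renders the script, so user-defined
# and built-in macros both work in hiveconf-style scripts.
t = operators.HiveOperator(
    task_id='hive_example',  # hypothetical task
    hql="SELECT * FROM some_table WHERE ds = '${ds}'",  # hypothetical query
    hiveconf_jinja_translate=True,
    dag=dag)
```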
7 changes: 7 additions & 0 deletions airflow/utils.py
@@ -488,3 +488,10 @@ def _inner_gen():
return (obj,)
else:
return tuple(_inner_gen())


+ def round_time(dt, delta):
+     delta = delta.total_seconds()
+     seconds = (dt - dt.min).seconds
+     rounding = (seconds + delta / 2) // delta * delta
+     return dt + timedelta(0, rounding - seconds, -dt.microsecond)
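
For reference, a quick sketch of what the new helper returns; it assumes utils.py imports `timedelta` at module level, which the excerpt does not show:

```python
from datetime import datetime, timedelta

from airflow import utils

# round_time snaps dt to the nearest multiple of delta, measured from
# midnight: it works off (dt - dt.min).seconds, i.e. seconds into the day.
utils.round_time(datetime(2015, 8, 14, 11, 40), timedelta(hours=1))
# -> datetime(2015, 8, 14, 12, 0)
utils.round_time(datetime(2015, 8, 14, 11, 20), timedelta(hours=1))
# -> datetime(2015, 8, 14, 11, 0)
```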
7 changes: 4 additions & 3 deletions airflow/www/app.py
@@ -1002,20 +1002,21 @@ def tree(self):
base_date = datetime.now()
else:
base_date = dateutil.parser.parse(base_date)
+ base_date = utils.round_time(base_date, dag.schedule_interval)

start_date = dag.start_date
- if not start_date:
+ if not start_date and 'start_date' in dag.default_args:
start_date = dag.default_args['start_date']

if start_date:
difference = base_date - start_date
offset = timedelta(seconds=int(difference.total_seconds() % dag.schedule_interval.total_seconds()))
base_date -= offset
base_date -= timedelta(microseconds=base_date.microsecond)

num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else 25
- from_date = (base_date-(num_runs * dag.schedule_interval))
+ from_date = (base_date - (num_runs * dag.schedule_interval))

dates = utils.date_range(
from_date, base_date, dag.schedule_interval)
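
Restating the alignment above as a standalone sketch with made-up values: round_time snaps base_date to the schedule interval, and the modulo step then pulls it back so it sits a whole number of intervals after start_date:

```python
from datetime import datetime, timedelta

# Hypothetical stand-ins for dag.schedule_interval and dag.start_date.
schedule_interval = timedelta(days=1)
start_date = datetime(2015, 1, 1)
base_date = datetime(2015, 8, 14, 13, 7)

difference = base_date - start_date
# Remainder of elapsed time that doesn't fit a whole interval: 13h07m here.
offset = timedelta(seconds=int(
    difference.total_seconds() % schedule_interval.total_seconds()))
base_date -= offset                              # -> 2015-08-14 00:00:00
base_date -= timedelta(microseconds=base_date.microsecond)

num_runs = 25  # the view's default
from_date = base_date - num_runs * schedule_interval  # -> 2015-07-20 00:00:00
```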
18 changes: 17 additions & 1 deletion run_unit_tests.sh
@@ -1,5 +1,21 @@
export AIRFLOW_HOME=${AIRFLOW_HOME:=~/airflow}
export AIRFLOW_CONFIG=$AIRFLOW_HOME/unittests.cfg
- nosetests --with-doctest --with-coverage --cover-erase --cover-html --cover-package=airflow -v --cover-html-dir=airflow/www/static/coverage --logging-level=DEBUG

+ # Generate the `airflow` executable if needed
+ which airflow > /dev/null || python setup.py develop
+
+ # initialize the test db
+ AIRFLOW_DB=$AIRFLOW_HOME/unittests.db
+ ls -s $AIRFLOW_DB > /dev/null 2>&1 || airflow initdb # if it's missing
+ ls -s $AIRFLOW_DB | egrep '^0 ' > /dev/null && airflow initdb # if it's blank
+
+ nosetests --with-doctest \
+     --with-coverage \
+     --cover-erase \
+     --cover-html \
+     --cover-package=airflow \
+     --cover-html-dir=airflow/www/static/coverage \
+     -v \
+     --logging-level=DEBUG
# To run individual tests:
# nosetests tests.core:CoreTest.test_scheduler_job
37 changes: 30 additions & 7 deletions tests/core.py
@@ -10,7 +10,7 @@
NUM_EXAMPLE_DAGS = 5
DEV_NULL = '/dev/null'
LOCAL_EXECUTOR = executors.LocalExecutor()
- DEFAULT_DATE = datetime(2015, 8, 1)
+ DEFAULT_DATE = datetime(2015, 1, 1)
TEST_DAG_ID = 'unit_tests'
configuration.test_mode()

@@ -22,6 +22,7 @@ def setUp(self):
utils.initdb()
args = {'owner': 'airflow', 'start_date': datetime(2015, 1, 1)}
dag = DAG(TEST_DAG_ID, default_args=args)
+ dag.clear(start_date=DEFAULT_DATE, end_date=datetime.now())
self.dag = dag

def test_mysql_to_hive(self):
@@ -170,6 +171,14 @@ def test_time_sensor(self):
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

+ def test_clear_api(self):
+     task = self.dag_bash.tasks[0]
+     task.clear(
+         start_date=DEFAULT_DATE, end_date=DEFAULT_DATE,
+         upstream=True, downstream=True)
+     ti = models.TaskInstance(task=task, execution_date=DEFAULT_DATE)
+     ti.are_dependents_done()

def test_bash_operator(self):
t = operators.BashOperator(
task_id='time_sensor_check',
@@ -187,15 +196,15 @@ def test_timedelta_sensor(self):
def test_external_task_sensor(self):
t = operators.ExternalTaskSensor(
task_id='test_external_task_sensor_check',
- external_dag_id='core_test',
+ external_dag_id=TEST_DAG_ID,
external_task_id='time_sensor_check',
dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

def test_external_task_sensor_delta(self):
t = operators.ExternalTaskSensor(
task_id='test_external_task_sensor_check_delta',
- external_dag_id='core_test',
+ external_dag_id=TEST_DAG_ID,
external_task_id='time_sensor_check',
execution_delta=timedelta(0),
allowed_states=['success'],
@@ -316,16 +325,30 @@ def test_dag_views(self):
'dag_id=example_bash_operator&future=true&past=false&'
'upstream=true&downstream=false&'
'execution_date=2015-01-01T00:00:00&'
- 'origin=http%3A%2F%2Fjn8.brain.musta.ch%3A8080%2Fadmin%2Fairflow'
- '%2Ftree%3Fnum_runs%3D65%26dag_id%3Dexample_bash_operator')
+ 'origin=/admin')
assert "Wait a minute" in response.data
response = self.app.get(
'/admin/airflow/action?action=clear&task_id=run_this_last&'
'dag_id=example_bash_operator&future=true&past=false&'
'upstream=true&downstream=false&'
'execution_date=2015-01-01T00:00:00&confirmed=true&'
- 'origin=http%3A%2F%2Fjn8.brain.musta.ch%3A8080%2Fadmin%2Fairflow'
- '%2Ftree%3Fnum_runs%3D65%26dag_id%3Dexample_bash_operator')
+ 'origin=/admin')
+ url = (
+     '/admin/airflow/action?action=success&task_id=runme_0&'
+     'dag_id=example_bash_operator&upstream=false&'
+     'downstream=false&execution_date=2015-08-12&'
+     'origin=/admin')
+ response = self.app.get(url)
+ assert "Wait a minute" in response.data
+ response = self.app.get(url + "&confirmed=true")
+ url = (
+     "/admin/airflow/action?action=run&task_id=runme_0&"
+     "dag_id=example_bash_operator&force=true&deps=true&"
+     "execution_date=2015-08-12T00:00:00&origin=/admin")
+ response = self.app.get(url)
+ response = self.app.get(
+     "/admin/airflow/refresh?dag_id=example_bash_operator")
+ response = self.app.get("/admin/airflow/refresh_all")

def test_charts(self):
response = self.app.get(
