Skip to content

Commit

Permalink
Improving the unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mistercrunch committed Feb 16, 2015
1 parent 614ccc5 commit 55c268d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 6 deletions.
8 changes: 4 additions & 4 deletions airflow/hooks/hive_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def check_for_partition(self, schema, table, partition):
>>> hh = HiveHook()
>>> t = 'static_babynames_partitioned'
>>> hh.check_for_partition('airflow', t, "year='2008'")
>>> hh.check_for_partition('airflow', t, "ds='2015-01-01'")
True
'''
self.hive._oprot.trans.open()
Expand Down Expand Up @@ -184,9 +184,9 @@ def get_partitions(self, schema, table_name):
>>> t = 'static_babynames_partitioned'
>>> parts = hh.get_partitions(schema='airflow', table_name=t)
>>> len(parts)
49
1
>>> max(parts)
'2008'
'2015-01-01'
'''
self.hive._oprot.trans.open()
table = self.hive.get_table(dbname=schema, tbl_name=table_name)
Expand All @@ -212,7 +212,7 @@ def max_partition(self, schema, table_name):
>>> hh = HiveHook()
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow', table_name=t)
'2008'
'2015-01-01'
'''
return max(self.get_partitions(schema, table_name))

Expand Down
1 change: 0 additions & 1 deletion airflow/operators/hive_operator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
import re

from airflow.configuration import conf
from airflow.hooks import HiveHook
from airflow.models import BaseOperator
from airflow.utils import apply_defaults
Expand Down
4 changes: 4 additions & 0 deletions airflow/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ def apply_defaults(func):
'''
@wraps(func)
def wrapper(*args, **kwargs):
if len(args) > 1:
print args
raise Exception(
"Use keyword arguments when initializing operators")
dag_args = {}
if 'dag' in kwargs and kwargs['dag']:
dag = kwargs['dag']
Expand Down
1 change: 1 addition & 0 deletions run_unit_tests.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# Point airflow at the dedicated unit-test configuration before anything runs.
export AIRFLOW_CONFIG=~/airflow/unittests.cfg
# Clear stale coverage HTML; -f keeps the script quiet (and exit-code clean)
# when the directory is empty, e.g. on a fresh checkout.
rm -f airflow/www/static/coverage/*
nosetests --with-doctest --with-coverage --cover-html --cover-package=airflow --cover-html-dir=airflow/www/static/coverage
69 changes: 68 additions & 1 deletion tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import unittest
from airflow import configuration
configuration.test_mode()
from airflow import jobs, models, executors, utils
from airflow import jobs, models, DAG, executors, utils, operators
from airflow.www.app import app

NUM_EXAMPLE_DAGS = 3
Expand All @@ -12,6 +12,40 @@
configuration.test_mode()


class HivePrestoTest(unittest.TestCase):
    """Exercise the Hive and Presto operators end to end.

    These are integration-style tests: each one builds a throwaway DAG,
    attaches a single operator task to it, and runs that task for
    DEFAULT_DATE against the test warehouse.

    NOTE(review): test_presto queries the table that test_hive
    (re)creates, so it appears to rely on that data already existing in
    the warehouse — confirm the intended test ordering/fixtures.
    """

    def setUp(self):
        # Fresh single-use DAG for every test method.
        self.dag = DAG(
            'hive_test',
            default_args={
                'owner': 'airflow',
                'start_date': datetime(2015, 1, 1),
            })

    def test_hive(self):
        """Run a multi-statement, templated HQL script via HiveOperator."""
        hql = """
        USE airflow;
        DROP TABLE IF EXISTS static_babynames_partitioned;
        CREATE TABLE IF NOT EXISTS static_babynames_partitioned (
        state string,
        year string,
        name string,
        gender string,
        num int)
        PARTITIONED BY (ds string);
        INSERT OVERWRITE TABLE static_babynames_partitioned
        PARTITION(ds='{{ ds }}')
        SELECT state, year, name, gender, num FROM static_babynames;
        """
        task = operators.HiveOperator(task_id='basic_hql', hql=hql, dag=self.dag)
        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

    def test_presto(self):
        """Run a COUNT(1) sanity check via PrestoCheckOperator."""
        sql = """
        SELECT count(1) FROM airflow.static_babynames_partitioned;
        """
        task = operators.PrestoCheckOperator(
            task_id='presto_check', sql=sql, dag=self.dag)
        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)


class CoreTest(unittest.TestCase):

def setUp(self):
Expand Down Expand Up @@ -104,6 +138,39 @@ def test_dag_views(self):
response = self.app.get(
'/admin/airflow/code?dag_id=example_bash_operator')
assert "DAG: example_bash_operator" in response.data
response = self.app.get(
'/admin/airflow/conf')
assert "Airflow Configuration" in response.data
response = self.app.get(
'/admin/airflow/rendered?'
'task_id=runme_1&dag_id=example_bash_operator&'
'execution_date=2015-01-07T00:00:00')
assert "example_bash_operator__runme_1__20150107" in response.data
response = self.app.get(
'/admin/airflow/log?task_id=run_this_last&'
'dag_id=example_bash_operator&execution_date=2015-01-01T00:00:00')
assert "Logs for run_this_last on 2015-01-01T00:00:00" in response.data
response = self.app.get(
'/admin/airflow/task?task_id=runme_0&dag_id=example_bash_operator')
assert "Attributes" in response.data
response = self.app.get(
'/admin/airflow/dag_stats')
assert "example_bash_operator" in response.data
response = self.app.get(
'/admin/airflow/action?action=clear&task_id=run_this_last&'
'dag_id=example_bash_operator&future=true&past=false&'
'upstream=true&downstream=false&'
'execution_date=2015-01-01T00:00:00&'
'origin=http%3A%2F%2Fjn8.brain.musta.ch%3A8080%2Fadmin%2Fairflow'
'%2Ftree%3Fnum_runs%3D65%26dag_id%3Dexample_bash_operator')
assert "Wait a minute" in response.data
response = self.app.get(
'/admin/airflow/action?action=clear&task_id=run_this_last&'
'dag_id=example_bash_operator&future=true&past=false&'
'upstream=true&downstream=false&'
'execution_date=2015-01-01T00:00:00&confirmed=true&'
'origin=http%3A%2F%2Fjn8.brain.musta.ch%3A8080%2Fadmin%2Fairflow'
'%2Ftree%3Fnum_runs%3D65%26dag_id%3Dexample_bash_operator')

def test_charts(self):
response = self.app.get(
Expand Down

0 comments on commit 55c268d

Please sign in to comment.