Commit 949479c

bolkedebruin committed Jun 16, 2016
2 parents fb89276 + 06e70e2

Showing 11 changed files with 225 additions and 32 deletions.
11 changes: 8 additions & 3 deletions airflow/bin/cli.py
@@ -588,6 +588,7 @@ def version(args):  # noqa

def flower(args):
    broka = conf.get('celery', 'BROKER_URL')
+    address = '--address={}'.format(args.hostname)
    port = '--port={}'.format(args.port)
    api = ''
    if args.broker_api:
@@ -605,15 +606,15 @@ def flower(args):
        )

        with ctx:
-            os.execvp("flower", ['flower', '-b', broka, port, api])
+            os.execvp("flower", ['flower', '-b', broka, address, port, api])

        stdout.close()
        stderr.close()
    else:
        signal.signal(signal.SIGINT, sigint_handler)
        signal.signal(signal.SIGTERM, sigint_handler)

-        os.execvp("flower", ['flower', '-b', broka, port, api])
+        os.execvp("flower", ['flower', '-b', broka, address, port, api])


def kerberos(args): # noqa
@@ -856,6 +857,10 @@ class CLIFactory(object):
            default=conf.get('celery', 'celeryd_concurrency')),
        # flower
        'broker_api': Arg(("-a", "--broker_api"), help="Broker api"),
        'flower_hostname': Arg(
            ("-hn", "--hostname"),
            default=conf.get('celery', 'FLOWER_HOST'),
            help="Set the hostname on which to run the server"),
        'flower_port': Arg(
            ("-p", "--port"),
            default=conf.get('celery', 'FLOWER_PORT'),
@@ -973,7 +978,7 @@ class CLIFactory(object):
        }, {
            'func': flower,
            'help': "Start a Celery Flower",
-            'args': ('flower_port', 'broker_api',
+            'args': ('flower_hostname', 'flower_port', 'broker_api',
                     'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
        }, {
            'func': version,
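With both halves of the change in place, the bind address travels from `airflow flower -hn <host> -p <port>` (or the FLOWER_HOST default) into the flower process. A minimal sketch of the final exec, using an illustrative broker URL and host that this commit does not prescribe:

import os

# Hypothetical values for illustration; broka normally comes from
# conf.get('celery', 'BROKER_URL') as in the hunk above.
broka = 'amqp://guest:guest@localhost:5672//'
address = '--address=10.0.0.5'  # built from args.hostname
port = '--port=5555'            # built from args.port
api = ''

# Replaces the current process, roughly equivalent to running:
#   flower -b amqp://guest:guest@localhost:5672// --address=10.0.0.5 --port=5555
os.execvp("flower", ['flower', '-b', broka, address, port, api])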
7 changes: 6 additions & 1 deletion airflow/configuration.py
@@ -142,6 +142,7 @@ def run_command(command):
        'celery_result_backend': 'db+mysql://airflow:airflow@localhost:3306/airflow',
        'celeryd_concurrency': 16,
        'default_queue': 'default',
        'flower_host': '0.0.0.0',
        'flower_port': '5555',
        'worker_log_server_port': '8793',
    },
@@ -337,7 +338,10 @@ def run_command(command):
celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
-# it `airflow flower`. This defines the port that Celery Flower runs on
+# it `airflow flower`. This defines the IP that Celery Flower runs on
+flower_host = 0.0.0.0
+# This defines the port that Celery Flower runs on
flower_port = 5555
# Default queue that tasks get assigned to and that workers listen on.
@@ -447,6 +451,7 @@ def run_command(command):
worker_log_server_port = 8793
broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
flower_host = 0.0.0.0
flower_port = 5555
default_queue = default
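For reference, these settings are read back with the same conf.get call that cli.py uses above; a minimal sketch, with the shipped defaults shown in comments:

from airflow import configuration as conf

# Both keys live in the [celery] section, mirroring the defaults above.
flower_host = conf.get('celery', 'FLOWER_HOST')  # '0.0.0.0'
flower_port = conf.get('celery', 'FLOWER_PORT')  # '5555'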
3 changes: 2 additions & 1 deletion airflow/contrib/hooks/__init__.py
@@ -27,7 +27,8 @@
    'gcs_hook': ['GoogleCloudStorageHook'],
    'datastore_hook': ['DatastoreHook'],
    'gcp_dataproc_hook': ['DataProcHook'],
-    'cloudant_hook': ['CloudantHook']
+    'cloudant_hook': ['CloudantHook'],
+    'fs_hook': ['FSHook']
}

_import_module_attrs(globals(), _hooks)
41 changes: 41 additions & 0 deletions airflow/contrib/hooks/fs_hook.py
@@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from airflow.hooks.base_hook import BaseHook


class FSHook(BaseHook):
    '''
    Allows for interaction with a file server.

    Connection should have a name and a path specified under extra,
    for example:

    Conn Id: fs_test
    Conn Type: File (path)
    Host, Schema, Login, Password, Port: empty
    Extra: {"path": "/tmp"}
    '''

    def __init__(self, conn_id='fs_default'):
        conn = self.get_connection(conn_id)
        self.basepath = conn.extra_dejson.get('path', '')
        self.conn = conn

    def get_conn(self):
        pass

    def get_path(self):
        return self.basepath
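A minimal usage sketch, assuming the fs_default connection seeded in airflow/utils/db.py below; any other base path would come from the connection's extra JSON:

from airflow.contrib.hooks.fs_hook import FSHook

# Resolves the Airflow connection and exposes its configured base path.
hook = FSHook(conn_id='fs_default')
print(hook.get_path())  # '/' for the shipped fs_default connection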
3 changes: 2 additions & 1 deletion airflow/contrib/operators/__init__.py
@@ -6,7 +6,8 @@
    'ssh_execute_operator': ['SSHExecuteOperator'],
    'vertica_operator': ['VerticaOperator'],
    'vertica_to_hive': ['VerticaToHiveTransfer'],
-    'qubole_operator': ['QuboleOperator']
+    'qubole_operator': ['QuboleOperator'],
+    'fs_operator': ['FileSensor']
}

_import_module_attrs(globals(), _operators)
57 changes: 57 additions & 0 deletions airflow/contrib/operators/fs_operator.py
@@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import logging

from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.hooks import FSHook
from airflow.utils.decorators import apply_defaults


class FileSensor(BaseSensorOperator):
    """
    Waits for a file or folder to land in a filesystem

    :param fs_conn_id: reference to the File (path)
        connection id
    :type fs_conn_id: string
    :param filepath: File or folder name (relative to
        the base path set within the connection)
    :type filepath: string
    """
    template_fields = ('filepath',)

    @apply_defaults
    def __init__(
            self,
            filepath,
            fs_conn_id='fs_default',
            *args, **kwargs):
        super(FileSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath
        self.fs_conn_id = fs_conn_id

    def poke(self, context):
        hook = FSHook(self.fs_conn_id)
        basepath = hook.get_path()
        full_path = os.path.join(basepath, self.filepath)
        logging.info('Poking for file {full_path}'.format(**locals()))
        # os.walk never raises for a missing path, so a bare try/except
        # around it would make the sensor succeed unconditionally;
        # test existence directly instead.
        return os.path.exists(full_path)
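A hedged example of wiring the sensor into a DAG; the dag id, dates, and the data/file.csv path are illustrative, while fs_default is the connection seeded in airflow/utils/db.py below:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.fs_operator import FileSensor

dag = DAG('fs_sensor_example', start_date=datetime(2016, 6, 16),
          schedule_interval='@once')

# Keeps poking until <connection base path>/data/file.csv exists, then succeeds.
wait_for_file = FileSensor(
    task_id='wait_for_file',
    fs_conn_id='fs_default',
    filepath='data/file.csv',
    dag=dag)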

4 changes: 4 additions & 0 deletions airflow/utils/db.py
@@ -182,6 +182,10 @@ def initdb():
        models.Connection(
            conn_id='ssh_default', conn_type='ssh',
            host='localhost'))
    merge_conn(
        models.Connection(
            conn_id='fs_default', conn_type='fs',
            extra='{"path": "/"}'))

    # Known event types
    KET = models.KnownEventType
1 change: 1 addition & 0 deletions airflow/www/views.py
@@ -2263,6 +2263,7 @@ class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView):
    }
    form_choices = {
        'conn_type': [
            ('fs', 'File (path)'),
            ('ftp', 'FTP',),
            ('google_cloud_platform', 'Google Cloud Platform'),
            ('hdfs', 'HDFS',),
16 changes: 16 additions & 0 deletions tests/contrib/operators/__init__.py
@@ -1,2 +1,18 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import absolute_import
from .ssh_execute_operator import *
from .fs_operator import *
64 changes: 64 additions & 0 deletions tests/contrib/operators/fs_operator.py
@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import unittest
from datetime import datetime

from airflow import configuration
from airflow.settings import Session
from airflow import models, DAG
from airflow.contrib.operators.fs_operator import FileSensor

TEST_DAG_ID = 'unit_tests'
DEFAULT_DATE = datetime(2015, 1, 1)
configuration.test_mode()


def reset(dag_id=TEST_DAG_ID):
    session = Session()
    tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id)
    tis.delete()
    session.commit()
    session.close()

reset()


class FileSensorTest(unittest.TestCase):
    def setUp(self):
        configuration.test_mode()
        from airflow.contrib.hooks.fs_hook import FSHook
        hook = FSHook()
        args = {
            'owner': 'airflow',
            'start_date': DEFAULT_DATE,
            'provide_context': True
        }
        dag = DAG(TEST_DAG_ID + 'test_schedule_dag_once', default_args=args)
        dag.schedule_interval = '@once'
        self.hook = hook
        self.dag = dag

    def test_simple(self):
        task = FileSensor(
            task_id="test",
            filepath="etc/hosts",
            fs_conn_id='fs_default',
            dag=self.dag,
        )
        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)


if __name__ == '__main__':
    unittest.main()
50 changes: 24 additions & 26 deletions tests/core.py
@@ -1587,33 +1587,31 @@ def test_presto(self):
            task_id='presto_check', sql=sql, dag=self.dag)
        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

    def test_presto_to_mysql(self):
        t = operators.PrestoToMySqlTransfer(
            task_id='presto_to_mysql_check',
            sql="""
            SELECT name, count(*) as ccount
            FROM airflow.static_babynames
            GROUP BY name
            """,
            mysql_table='test_static_babynames',
            mysql_preoperator='TRUNCATE TABLE test_static_babynames;',
            dag=self.dag)
        t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, force=True)

    def test_hdfs_sensor(self):
        t = operators.HdfsSensor(