Improving docs and packaging for pypi
mistercrunch committed Jan 17, 2015
1 parent cb2a0e5 commit 927fe04
Showing 15 changed files with 108 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .gitignore
@@ -8,7 +8,7 @@ env
initdb.py
dbinit.py
logs
*.cfg
airflow.cfg
MANIFEST
secrets.py
*.egg-info
8 changes: 8 additions & 0 deletions airflow/executors/celery_executor.py
@@ -39,6 +39,14 @@ def execute_command(command):


class CeleryExecutor(BaseExecutor):
'''
CeleryExecutor is recommended for production use of Airflow. It allows
distributing the execution of task instances to multiple worker nodes.
Celery is a simple, flexible and reliable distributed system to process
vast amounts of messages, while providing operations with the tools
required to maintain such a system.
'''

def start(self):
self.tasks = {}
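For illustration, the Celery pattern the executor relies on looks roughly like the sketch below. This is a hedged sketch, not the module's actual code: the app name and broker URL are placeholders, and only execute_command is visible in the hunk above.

    import subprocess
    from celery import Celery

    app = Celery('celery_app', broker='amqp://guest@localhost//')  # placeholder broker

    @app.task
    def execute_command(command):
        # Each Celery worker node runs the shell command for one task instance.
        subprocess.check_call(command, shell=True)
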
9 changes: 9 additions & 0 deletions airflow/executors/local_executor.py
@@ -43,8 +43,17 @@ def run(self):


class LocalExecutor(BaseExecutor):
'''
LocalExecutor executes tasks locally in parallel. It uses the
multiprocessing Python library and queues to parallelize the execution
of tasks.
'''

def __init__(self, parallelism=16):
'''
:param parallelism: Number of processes running simultaneously
:type parallelism: int
'''
super(LocalExecutor, self).__init__()
self.parallelism = parallelism

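A hedged sketch of the multiprocessing-and-queues pattern the docstring describes (the worker wiring and None-based shutdown are assumptions, not the executor's actual implementation):

    import multiprocessing
    import subprocess

    def work(queue):
        # Each process pulls commands off the shared queue until it sees None.
        while True:
            command = queue.get()
            if command is None:
                break
            subprocess.check_call(command, shell=True)

    if __name__ == '__main__':
        queue = multiprocessing.Queue()
        workers = [multiprocessing.Process(target=work, args=(queue,))
                   for _ in range(16)]  # parallelism=16 is the executor's default
        for w in workers:
            w.start()
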
7 changes: 6 additions & 1 deletion airflow/executors/sequential_executor.py
@@ -6,7 +6,12 @@

class SequentialExecutor(BaseExecutor):
"""
Will only run one task instance at a time, can be used for debugging.
This executor will only run one task instance at a time and can be used
for debugging. It is also the only executor that can be used with sqlite
since sqlite doesn't support multiple connections.
Since we want airflow to work out of the box, it defaults to this
SequentialExecutor alongside sqlite when you first install it.
"""
def __init__(self):
super(SequentialExecutor, self).__init__()
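The sequential pattern amounts to a plain synchronous loop; a minimal, hypothetical illustration:

    import subprocess

    def run_sequentially(commands):
        for command in commands:
            # One task instance at a time; a failure surfaces immediately,
            # which is what makes this executor handy for debugging.
            subprocess.check_call(command, shell=True)
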
10 changes: 10 additions & 0 deletions airflow/hooks/hive_hook.py
@@ -16,6 +16,10 @@


class HiveHook(BaseHook):
'''
Interact with Hive. This class is both a wrapper around the Hive Thrift
client and the Hive CLI.
'''
def __init__(self,
hive_conn_id=conf.get('hooks', 'HIVE_DEFAULT_CONN_ID')):
session = settings.Session()
@@ -53,6 +57,9 @@ def __setstate__(self, d):
self.__dict__['hive'] = self.get_hive_client()

def get_hive_client(self):
'''
Returns a Hive thrift client.
'''
transport = TSocket.TSocket(self.host, self.port)
transport = TTransport.TBufferedTransport(transport)
protocol = TBinaryProtocol.TBinaryProtocol(transport)
@@ -72,6 +79,9 @@ def check_for_partition(self, schema, table, partition):
return False

def get_records(self, hql, schema=None):
'''
Get a set of records from a Hive query.
'''
self.hive._oprot.trans.open()
if schema:
self.hive.execute("USE " + schema)
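A hedged usage sketch of the hook; the schema, table, partition spec, and query are placeholders, and the connection id falls back to HIVE_DEFAULT_CONN_ID from the config as the constructor above shows:

    from airflow.hooks.hive_hook import HiveHook

    hh = HiveHook()
    if hh.check_for_partition('default', 'my_table', "ds='2015-01-01'"):
        rows = hh.get_records("SELECT count(*) FROM my_table", schema='default')
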
9 changes: 9 additions & 0 deletions airflow/hooks/mysql_hook.py
@@ -4,6 +4,9 @@


class MySqlHook(object):
'''
Interact with MySQL.
'''

def __init__(
self, host=None, login=None,
@@ -38,6 +41,9 @@ def get_conn(self):
return conn

def get_records(self, sql):
'''
Executes the sql and returns a set of records.
'''
conn = self.get_conn()
cur = conn.cursor()
cur.execute(sql)
@@ -47,6 +53,9 @@ def get_records(self, sql):
return rows

def get_pandas_df(self, sql):
'''
Executes the sql and returns a pandas dataframe.
'''
import pandas.io.sql as psql
conn = self.get_conn()
df = psql.read_sql(sql, con=conn)
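A hedged usage sketch; host and login are placeholders, and the rest of the constructor signature is elided in the hunk above:

    from airflow.hooks.mysql_hook import MySqlHook

    mh = MySqlHook(host='localhost', login='reader')
    rows = mh.get_records("SELECT 1")
    df = mh.get_pandas_df("SELECT * FROM users LIMIT 10")  # requires pandas
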
16 changes: 16 additions & 0 deletions airflow/hooks/presto_hook.py
@@ -36,21 +36,34 @@ def __init__(self, host=None, db=None, port=None,
session.close() # currently only a pass in pyhive

def get_cursor(self):
'''
Returns a cursor.
'''
return self.cursor

@staticmethod
def _strip_sql(sql):
return sql.strip().rstrip(';')

def get_records(self, hql, parameters=None):
'''
Get a set of records from Presto.
'''
self.cursor.execute(self._strip_sql(hql), parameters)
return self.cursor.fetchall()

def get_first(self, hql, parameters=None):
'''
Returns only the first row, regardless of how many rows the query
returns.
'''
self.cursor.execute(self._strip_sql(hql), parameters)
return self.cursor.fetchone()

def get_pandas_df(self, hql, parameters=None):
'''
Get a pandas dataframe from a sql query.
'''
import pandas
cursor = self.get_cursor()
cursor.execute(self._strip_sql(hql), parameters)
@@ -64,4 +77,7 @@ def get_pandas_df(self, hql, parameters=None):
return df

def run(self, hql, parameters=None):
'''
Execute the statement against Presto. Can be used to create views.
'''
self.cursor.execute(self._strip_sql(hql), parameters)
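A hedged usage sketch using only the constructor arguments and methods visible in the hunks above; all values are placeholders:

    from airflow.hooks.presto_hook import PrestoHook

    ph = PrestoHook(host='localhost', db='hive', port=8080)
    first_row = ph.get_first("SELECT count(*) FROM logs")
    df = ph.get_pandas_df("SELECT * FROM logs LIMIT 100")
    ph.run("CREATE VIEW recent_logs AS SELECT * FROM logs")  # e.g. creating a view
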
21 changes: 11 additions & 10 deletions airflow/operators/email_operator.py
@@ -4,16 +4,6 @@


class EmailOperator(BaseOperator):
'''
Sends an email.
:param to: list of emails to send the email to
:type to: list or string (comma or semicolon delimited)
:param subject: subject line for the email (templated)
:type subject: string
:param html_content: content of the email (templated), html markup
is allowed
:type html_content: string
'''

template_fields = ('subject', 'html_content')

@@ -28,6 +18,17 @@ def __init__(
subject,
html_content,
*args, **kwargs):
"""
Sends an email.
:param to: list of emails to send the email to
:type to: list or string (comma or semicolon delimited)
:param subject: subject line for the email (templated)
:type subject: string
:param html_content: content of the email (templated), html markup
is allowed
:type html_content: string
"""
super(EmailOperator, self).__init__(*args, **kwargs)
self.to = to
self.subject = subject
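A hedged usage sketch; task_id is handled by BaseOperator through **kwargs, and the recipient, subject, and body values are placeholders:

    from airflow.operators.email_operator import EmailOperator

    notify = EmailOperator(
        task_id='notify_team',
        to='team@example.com',  # a list or a comma/semicolon delimited string
        subject='Run {{ ds }} finished',  # templated field
        html_content='<b>All tasks succeeded.</b>',  # html markup is allowed
    )
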
3 changes: 0 additions & 3 deletions airflow/settings.py
@@ -18,9 +18,6 @@
BASE_FOLDER = conf.get('core', 'BASE_FOLDER')
BASE_LOG_URL = "/admin/airflow/log"
SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
print "-"* 100
print(SQL_ALCHEMY_CONN)
print "-"* 100
if BASE_FOLDER not in sys.path:
sys.path.append(BASE_FOLDER)

21 changes: 11 additions & 10 deletions docs/code.rst
@@ -8,24 +8,25 @@ persisted in the database.

.. automodule:: airflow.models
:show-inheritance:
:members: DAG, BaseOperator, TaskInstance, DagBag, DatabaseConnection
:members: DAG, BaseOperator, TaskInstance, DagBag, Connection

Hooks
-----
.. automodule:: airflow.hooks
:show-inheritance:
:members: MySqlHook, PrestoHook, HiveHook

Operators
---------
Operators allow you to generate a certain type of task on the graph.

.. automodule:: airflow.operators
:show-inheritance:
:members: MySqlOperator, BashOperator, ExternalTaskSensor, HiveOperator, SqlSensor, HivePartitionSensor

Hooks
-----
.. automodule:: airflow.hooks
:show-inheritance:
:members: MySqlHook
:members: MySqlOperator, BashOperator, ExternalTaskSensor, HiveOperator, SqlSensor, HivePartitionSensor, EmailOperator

Executors
---------
Executors are the mechanism by which task instances get run.
.. automodule:: airflow.executors
:show-inheritance:
:members: LocalExecutor, SequentialExecutor
:show-inheritance:
:members: LocalExecutor, CeleryExecutor, SequentialExecutor
12 changes: 6 additions & 6 deletions docs/conf.py
@@ -13,7 +13,6 @@
# serve to show the default.

import sys
import os
from airflow import settings

# If extensions (or modules to document with autodoc) are in another directory,
@@ -247,11 +246,12 @@
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'Airflow', u'Airflow Documentation',
u'Maxime Beauchemin', 'Airflow', 'One line description of project.',
'Miscellaneous'),
]
texinfo_documents = [(
'index', 'Airflow', u'Airflow Documentation',
u'Maxime Beauchemin', 'Airflow',
'Airflow is a system to programmatically author, schedule and monitor data pipelines.',
'Miscellaneous'
),]

# Documents to append as an appendix to all manuals.
#texinfo_appendices = []
Binary file added docs/img/pin_large.png
3 changes: 3 additions & 0 deletions docs/index.rst
@@ -1,5 +1,8 @@
.. image:: img/pin_large.png
:width: 70
Airflow's Documentation
================================

Airflow is a system to programmatically author, schedule and monitor data pipelines.

Use the Airflow library to define workflows as directed acyclic graphs (DAGs) of data-related tasks. Command line utilities make it easy to run parts of workflows interactively, and committing pipelines into production is all it takes for the master scheduler to run the pipelines with the schedule and dependencies specified.
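
As a minimal, hypothetical illustration of the idea (the operator and model names follow the modules in this commit, but the exact constructor arguments are assumptions):

    from airflow.models import DAG
    from airflow.operators import BashOperator

    dag = DAG('example_dag')
    task = BashOperator(task_id='print_date', bash_command='date', dag=dag)
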
1 change: 1 addition & 0 deletions requirements.txt
@@ -19,5 +19,6 @@ requests
setproctitle
sphinx
sphinx_rtd_theme
Sphinx-PyPI-upload
sqlalchemy
thrift
17 changes: 17 additions & 0 deletions setup.cfg
@@ -0,0 +1,17 @@
[metadata]
name = Airflow
summary = Airflow is a system to programmatically author, schedule and monitor data pipelines.
author = Maxime Beauchemin
author-email = [email protected]

[files]
packages = airflow

[build_sphinx]
source-dir = docs/
build-dir = docs/_build
all_files = 1

[upload_sphinx]
upload-dir = docs/_build/html
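
With these sections in place, the Sphinx docs should build with python setup.py build_sphinx (a command provided by Sphinx itself) and upload with python setup.py upload_sphinx (provided by the Sphinx-PyPI-upload package added to requirements.txt above); the [build_sphinx] and [upload_sphinx] section names map to those setuptools commands.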
