Lint, fixed typos
mistercrunch committed Oct 6, 2014
1 parent 46579d3 commit 84c5962
Showing 9 changed files with 27 additions and 101 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -2,9 +2,9 @@ Flux
====
Flux is a system to programmatically author, schedule and monitor data pipelines.

The Flux library to define workflows as directed acyclic graphs (DAGs) of data related tasks. Command line utilities make it easy to run parts of workflows interactively, and commiting pipelines into production is all it takes for the master scheduler to run the pipelines with the schedule and dependencies specified.
Use the Flux library to define workflows as directed acyclic graphs (DAGs) of data-related tasks. Command line utilities make it easy to run parts of workflows interactively, and committing pipelines into production is all it takes for the master scheduler to run the pipelines with the schedule and dependencies specified.

The Flux UI make it easy to visualize the pipelines running in production, their hierachy, progress and exposes way to pinpoint and troubleshoot issues when needed.
The Flux UI makes it easy to visualize pipelines running in production, monitor progress, and pinpoint and troubleshoot issues when needed.

### Principles
* **Dynamic:** Flux has intrinsic support for dynamic pipeline generation: you can write pipelines, as well as code that defines pipelines.
@@ -14,14 +14,14 @@ The Flux UI make it easy to visualize the pipelines running in production, their

### Concepts
##### Operators
An operator allows to generate a certain type of task on the graph. There are 3 main type of operators:
Operators allow you to generate a certain type of task on the graph. There are 3 main types of operators:

* **Sensor:** Waits for events to happen: a file appearing in HDFS, the existence of a Hive partition, or an arbitrary MySQL query returning a row.
* **Remote Execution:** Triggers an operation in a remote system: an HQL statement in Hive, a Pig script, a map-reduce job, a stored procedure in Oracle or a Bash script to run.
* **Data transfers:** Moves data from one system to another. Push data from Hive to MySQL, from a local file to HDFS, from Postgres to Oracle, or anything of that nature.

##### Tasks
A task represent the instantiation of an operator and becomes a node in the directed acyclic graph (DAG). The instantiation defines specific values when calling the abstract operator. A task would be waiting for a specific partition in Hive, or triggerring a specific DML statement in Oracle.
A task represents the instantiation of an operator and becomes a node in the directed acyclic graph (DAG). The instantiation defines specific values when calling the abstract operator. A task could be waiting for a specific partition in Hive, or triggering a specific DML statement in Oracle.

##### Task instances
A task instance represents a task run, for a specific point in time. While the task defines a start datetime and a schedule (say every hour or every day), a task instance represents a specific run of a task. A task instance will have a status of either "started", "retrying", "failed" or "success".
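To make the Operators → Tasks → DAG relationship above concrete, here is a minimal, self-contained sketch. The class names and fields below are illustrative stand-ins, not Flux's actual API at this commit:

```python
# Illustrative stand-ins only -- not Flux's actual classes at this commit.
class Task(object):
    """One instantiation of an operator: a node in the DAG."""
    def __init__(self, task_id, operator, **params):
        self.task_id = task_id
        self.operator = operator   # e.g. 'hive_partition_sensor'
        self.params = params       # the specific values bound at instantiation
        self.upstream = []         # tasks this one depends on

class DAG(object):
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []

dag = DAG('hourly_load')
wait = Task('wait_for_partition', 'hive_partition_sensor',
            table='events', partition='ds=2014-10-06')
load = Task('hive_to_mysql', 'data_transfer')
load.upstream.append(wait)         # load only runs after the sensor succeeds
dag.tasks.extend([wait, load])
```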
6 changes: 3 additions & 3 deletions core/bin/flux.py
@@ -62,7 +62,7 @@ def run(args):
raise Exception('dag_id could not be found')
dag = dagbag.dags[args.dag_id]
task = dag.get_task(task_id=args.task_id)

# This is enough to fail the task instance
def signal_handler(signum, frame):
logging.error("SIGINT (ctrl-c) received")
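Registered against SIGINT, a handler like this lets Ctrl-C fail the task instance cleanly. A self-contained sketch of the pattern (the task-failure bookkeeping itself is assumed, not shown in this hunk):

```python
import logging
import signal
import sys

def signal_handler(signum, frame):
    # On Ctrl-C, log and exit non-zero; the caller records the
    # task instance as failed (bookkeeping assumed).
    logging.error("SIGINT (ctrl-c) received")
    sys.exit(1)

signal.signal(signal.SIGINT, signal_handler)
```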
@@ -175,7 +175,7 @@ def master(args):
parser_backfill = subparsers.add_parser('backfill', help=ht)
parser_backfill.add_argument("dag_id", help="The id of the dag to run")
parser_backfill.add_argument(
"-t", "--task_regex",
"-t", "--task_regex",
help="The regex to filter specific task_ids to backfill (optional)")
parser_backfill.add_argument(
"-s", "--start_date", help="Overide start_date YYYY-MM-DD")
@@ -193,7 +193,7 @@ def master(args):
parser_clear = subparsers.add_parser('clear', help=ht)
parser_clear.add_argument("dag_id", help="The id of the dag to run")
parser_clear.add_argument(
"-t", "--task_regex",
"-t", "--task_regex",
help="The regex to filter specific task_ids to clear (optional)")
parser_clear.add_argument(
"-s", "--start_date", help="Overide start_date YYYY-MM-DD")
2 changes: 1 addition & 1 deletion core/executors/base_executor.py
@@ -130,7 +130,7 @@ def __init__(self):

def queue_command(self, key, command):
self.commands_to_run.append((key, command,))

def heartbeat(self):
for key, command in self.commands_to_run:
try:
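The pair of methods above suggests a queue-then-drain pattern: `queue_command` stores `(key, command)` tuples and `heartbeat` periodically runs them. A minimal self-contained sketch of that pattern, with a sequential shell-based runner assumed:

```python
import subprocess

class SequentialExecutorSketch(object):
    """Illustrative sketch of the queue/heartbeat pattern; not the real class."""
    def __init__(self):
        self.commands_to_run = []

    def queue_command(self, key, command):
        self.commands_to_run.append((key, command,))

    def heartbeat(self):
        # Drain the queue, running each queued shell command in turn.
        for key, command in self.commands_to_run:
            try:
                subprocess.check_call(command, shell=True)
                print('%s succeeded' % (key,))
            except subprocess.CalledProcessError:
                print('%s failed' % (key,))
        self.commands_to_run = []

executor = SequentialExecutorSketch()
executor.queue_command('task_a', 'echo running task_a')
executor.heartbeat()
```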
8 changes: 3 additions & 5 deletions core/models.py
@@ -3,7 +3,6 @@
import imp
import inspect
import jinja2
import json
import logging
import os
import re
@@ -262,7 +261,6 @@ def ready_for_retry(self):
return self.state == State.UP_FOR_RETRY and \
self.end_date + self.task.retry_delay < datetime.now()


def run(
self, verbose=True,
ignore_dependencies=False,
@@ -300,7 +298,7 @@ def run(
)
elif self.state in State.runnable():
if self.state == State.UP_FOR_RETRY:
self.try_number +=1
self.try_number += 1
else:
self.try_number = 1
session.add(Log(State.RUNNING, self))
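The `try_number` bookkeeping above works with the `ready_for_retry` predicate shown earlier in this file: a failed task becomes runnable again only once `retry_delay` has elapsed. A small worked sketch of that time check (values assumed for illustration):

```python
from datetime import datetime, timedelta

UP_FOR_RETRY = 'up_for_retry'             # stand-in for State.UP_FOR_RETRY

state = UP_FOR_RETRY
end_date = datetime(2014, 10, 6, 12, 0)   # when the failed try ended
retry_delay = timedelta(minutes=5)

# Same shape as ready_for_retry: eligible once the delay has elapsed.
ready = state == UP_FOR_RETRY and end_date + retry_delay < datetime.now()
print(ready)  # True whenever the clock is past 2014-10-06 12:05
```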
@@ -792,7 +790,7 @@ def clear(
return count

def sub_dag(
self, task_regex,
self, task_regex,
include_downstream=False, include_upstream=True):

dag = copy.deepcopy(self)
@@ -804,7 +802,7 @@ def sub_dag(
also_include += t.get_flat_relatives(upstream=False)
if include_upstream:
also_include += t.get_flat_relatives(upstream=True)

# Compiling the unique list of tasks that made the cut
tasks = list(set(regex_match + also_include))
dag.tasks = tasks
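`sub_dag` evidently keeps the tasks whose `task_id` matches `task_regex`, optionally pulls in their upstream/downstream relatives, and de-duplicates. A self-contained sketch of the regex-filtering step (match semantics assumed; `get_flat_relatives` is not shown in this hunk):

```python
import re
from collections import namedtuple

Task = namedtuple('Task', 'task_id')

def regex_match(tasks, task_regex):
    # Keep tasks whose id matches the regex anywhere, as re.findall would.
    return [t for t in tasks if re.findall(task_regex, t.task_id)]

tasks = [Task('extract_events'), Task('load_events'), Task('daily_report')]
print(regex_match(tasks, 'events'))
# [Task(task_id='extract_events'), Task(task_id='load_events')]
```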
4 changes: 3 additions & 1 deletion core/utils.py
@@ -25,9 +25,10 @@ def color(cls, state):
def runnable(cls):
return [None, cls.FAILED, cls.UP_FOR_RETRY]


def validate_key(k, max_length=250):
if type(k) is not str:
raise TypeError("The key has to be a string")
raise TypeError("The key has to be a string")
elif len(k) > max_length:
raise Exception("The key has to be less than {0} characters".format(
max_length))
@@ -38,6 +39,7 @@ def validate_key(k, max_length=250):
else:
return True


def date_range(start_date, end_date=datetime.now(), delta=timedelta(1)):
l = []
if end_date >= start_date:
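`date_range` is truncated above; assuming it returns every `delta` step from `start_date` through `end_date` inclusive, its behavior can be sketched as:

```python
from datetime import datetime, timedelta

# Assumed behavior of date_range, inferred from its signature above.
def date_range_sketch(start_date, end_date, delta=timedelta(1)):
    l = []
    if end_date >= start_date:
        cursor = start_date
        while cursor <= end_date:
            l.append(cursor)
            cursor += delta
    return l

print(date_range_sketch(datetime(2014, 10, 1), datetime(2014, 10, 3)))
# [datetime(2014, 10, 1, 0, 0), datetime(2014, 10, 2, 0, 0),
#  datetime(2014, 10, 3, 0, 0)]
```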
7 changes: 3 additions & 4 deletions db_reset.py
@@ -4,13 +4,12 @@
import logging

if __name__ == '__main__':
if
session = settings.Session()
session.query(models.DatabaseConnection).delete()
mysqldb = models.DatabaseConnection(
db_id='local_mysql', db_type='mysql',
host='localhost', login='flux', password='flux',
schema='flux')
db_id='local_mysql', db_type='mysql',
host='localhost', login='flux', password='flux',
schema='flux')
session.add(mysqldb)
session.commit()

77 changes: 0 additions & 77 deletions docs/ipython_log.py

This file was deleted.

2 changes: 1 addition & 1 deletion requirements.txt
@@ -6,7 +6,7 @@ ipython[all]
jinja2
markdown
mysql-python
pandas
#pandas
pygments
sphinx
sqlalchemy
14 changes: 9 additions & 5 deletions www/app.py
@@ -36,6 +36,7 @@
app.register_blueprint(ck, url_prefix='/ck')
app.jinja_env.add_extension("chartkick.ext.charts")


# Date filter form needed for gantt and graph view
class DateTimeForm(Form):
execution_date = DateTimeField("Execution date")
@@ -192,7 +193,6 @@ def graph(self):
session = settings.Session()
dag_id = request.args.get('dag_id')
dag = dagbag.dags[dag_id]
TI = models.TaskInstance

nodes = []
edges = []
@@ -222,10 +222,14 @@ def get_downstream(task, edges):

form = DateTimeForm(data={'execution_date': dttm})

task_instances = {ti.task_id: utils.alchemy_to_dict(ti)
for ti in dag.get_task_instances(dttm, dttm)}
tasks = {t.task_id: utils.alchemy_to_dict(t)
for t in dag.tasks}
task_instances = {
ti.task_id: utils.alchemy_to_dict(ti)
for ti in dag.get_task_instances(dttm, dttm)
}
tasks = {
t.task_id: utils.alchemy_to_dict(t)
for t in dag.tasks
}
session.commit()
session.close()

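The comprehensions above key serialized rows by `task_id`. `utils.alchemy_to_dict` is not shown in this diff; a plausible implementation for a SQLAlchemy model instance would be:

```python
# Assumed implementation -- alchemy_to_dict itself is not part of this diff.
def alchemy_to_dict(obj):
    """Map a SQLAlchemy model instance to {column_name: value}."""
    return {c.name: getattr(obj, c.name) for c in obj.__table__.columns}
```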
