Skip to content

Commit

Permalink
Merge pull request apache#882 from asnir/docker_operator
Browse files Browse the repository at this point in the history
Docker operator
  • Loading branch information
mistercrunch committed Jan 29, 2016
2 parents 980e7c3 + 7f74bf8 commit eac1edd
Show file tree
Hide file tree
Showing 13 changed files with 444 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/*
*.bkp
*.egg-info
*.pyc
Expand Down
84 changes: 84 additions & 0 deletions airflow/example_dags/docker_copy_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
'''
This sample "listens" to a directory, moves each new file, and prints it, using docker containers.
The following operators are being used: DockerOperator, BashOperator & ShortCircuitOperator.
TODO: Review the workflow, change it according to your environment & enable the code.
'''

# from __future__ import print_function
#
# from airflow import DAG
# import airflow
# from datetime import datetime, timedelta
# from airflow.operators import BashOperator
# from airflow.operators import ShortCircuitOperator
# from airflow.operators.docker_operator import DockerOperator
#
# default_args = {
# 'owner': 'airflow',
# 'depends_on_past': False,
# 'start_date': datetime.now(),
# 'email': ['[email protected]'],
# 'email_on_failure': False,
# 'email_on_retry': False,
# 'retries': 1,
# 'retry_delay': timedelta(minutes=5),
# }
#
# dag = DAG(
# 'docker_sample_copy_data', default_args=default_args, schedule_interval=timedelta(minutes=10))
#
# locate_file_cmd = """
# sleep 10
# find {{params.source_location}} -type f -printf "%f\n" | head -1
# """
#
# t_view = BashOperator(
# task_id='view_file',
# bash_command=locate_file_cmd,
# xcom_push=True,
# params={'source_location': '/your/input_dir/path'},
# dag=dag)
#
#
# def is_data_available(*args, **kwargs):
# ti = kwargs['ti']
# data = ti.xcom_pull(key=None, task_ids='view_file')
# return not data == ''
#
#
# t_is_data_available = ShortCircuitOperator(
# task_id='check_if_data_available',
# provide_context=True,
# python_callable=is_data_available,
# dag=dag)
#
# t_move = DockerOperator(
# api_version='1.19',
# docker_url='tcp://localhost:2375', # replace it with swarm/docker endpoint
# image='centos:latest',
# network_mode='bridge',
# volumes=['/your/host/input_dir/path:/your/input_dir/path',
# '/your/host/output_dir/path:/your/output_dir/path'],
# command='./entrypoint.sh',
# task_id='move_data',
# xcom_push=True,
# params={'source_location': '/your/input_dir/path',
# 'target_location': '/your/output_dir/path'},
# dag=dag)
#
# print_templated_cmd = """
# cat {{ ti.xcom_pull('move_data') }}
# """
#
# t_print = DockerOperator(
# api_version='1.19',
# docker_url='tcp://localhost:2375',
# image='centos:latest',
# volumes=['/your/host/output_dir/path:/your/output_dir/path'],
# command=print_templated_cmd,
# task_id='print',
# dag=dag)
#
# t_view.set_downstream(t_is_data_available)
# t_is_data_available.set_downstream(t_move)
# t_move.set_downstream(t_print)
1 change: 1 addition & 0 deletions airflow/example_dags/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
["/bin/bash", "-c", "/bin/sleep 30; /bin/mv {{params.source_location}}/{{ ti.xcom_pull('view_file') }} {{params.target_location}}; /bin/echo '{{params.target_location}}/{{ ti.xcom_pull('view_file') }}';"]
48 changes: 48 additions & 0 deletions airflow/example_dags/example_docker_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from airflow import DAG
from airflow.operators import BashOperator
from datetime import datetime, timedelta
from airflow.operators.docker_operator import DockerOperator

# Default arguments applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    # NOTE(review): a dynamic start_date such as datetime.now() makes the
    # schedule drift on every parse -- Airflow recommends a fixed date; confirm
    # before using this example as a template.
    'start_date': datetime.now(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'docker_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))

# Print the current date on the worker host.
t_print_date = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

# Sleep briefly; retried up to 3 times on failure.
t_sleep = BashOperator(
    task_id='sleep',
    bash_command='sleep 5',
    retries=3,
    dag=dag)

# Run a sleep inside a centos container through the docker daemon.
t_docker = DockerOperator(
    api_version='1.19',
    docker_url='tcp://localhost:2375',  # Set your docker URL
    command='/bin/sleep 30',
    image='centos:latest',
    network_mode='bridge',
    task_id='docker_op_tester',
    dag=dag)

# Greet the world once the docker task has finished.
t_hello = BashOperator(
    task_id='print_hello',
    bash_command='echo "hello world!!!"',
    dag=dag)

# print_date fans out to the sleep and docker tasks;
# the greeting runs only after the docker task succeeds.
t_print_date.set_downstream(t_sleep)
t_print_date.set_downstream(t_docker)
t_docker.set_downstream(t_hello)
181 changes: 181 additions & 0 deletions airflow/operators/docker_operator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
import json
import logging
from airflow.models import BaseOperator
from airflow.utils import apply_defaults, AirflowException, TemporaryDirectory
from docker import Client, tls
import ast


class DockerOperator(BaseOperator):
    """
    Execute a command inside a docker container.

    A temporary directory is created on the host and mounted into the
    container to allow storing files that together exceed the default disk
    size of 10GB in a container. The path to the mounted directory can be
    accessed inside the container via the environment variable
    ``AIRFLOW_TMP_DIR``.

    :param image: Docker image from which to create the container.
    :type image: str
    :param api_version: Remote API version.
    :type api_version: str
    :param command: Command to be run in the container. A string that looks
        like a Python list literal (e.g. ``"['cmd', 'arg']"``) is parsed with
        ``ast.literal_eval``; a plain string or an actual list is passed
        through unchanged.
    :type command: str or list
    :param cpus: Number of CPUs to assign to the container.
        This value gets multiplied with 1024. See
        https://docs.docker.com/engine/reference/run/#cpu-share-constraint
    :type cpus: float
    :param docker_url: URL of the host running the docker daemon.
    :type docker_url: str
    :param environment: Environment variables to set in the container.
    :type environment: dict
    :param force_pull: Pull the docker image on every run.
    :type force_pull: bool
    :param mem_limit: Maximum amount of memory the container can use. Either
        a float value, which represents the limit in bytes, or a string like
        ``128m`` or ``1g``.
    :type mem_limit: float or str
    :param network_mode: Network mode for the container.
    :type network_mode: str
    :param tls_ca_cert: Path to a PEM-encoded certificate authority to
        secure the docker connection.
    :type tls_ca_cert: str
    :param tls_client_cert: Path to the PEM-encoded certificate used to
        authenticate the docker client.
    :type tls_client_cert: str
    :param tls_client_key: Path to the PEM-encoded key used to authenticate
        the docker client.
    :type tls_client_key: str
    :param tls_hostname: Hostname to match against the docker server
        certificate, or False to disable the check.
    :type tls_hostname: str or bool
    :param tls_ssl_version: Version of SSL to use when communicating with
        the docker daemon.
    :type tls_ssl_version: str
    :param tmp_dir: Mount point inside the container for a temporary
        directory created on the host by the operator. The path is also made
        available via the environment variable ``AIRFLOW_TMP_DIR`` inside
        the container.
    :type tmp_dir: str
    :param user: Default user inside the docker container.
    :type user: int or str
    :param volumes: List of volumes to mount into the container, e.g.
        ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
    :param xcom_push: Whether the stdout will be pushed to the next step
        using XCom. The default is False.
    :type xcom_push: bool
    :param xcom_all: Push all the stdout or just the last line.
        The default is False (last line).
    :type xcom_all: bool
    """
    # The command may be templated, including from .sh/.bash script files.
    template_fields = ('command',)
    template_ext = ('.sh', '.bash',)

    @apply_defaults
    def __init__(
            self,
            image,
            api_version=None,
            command=None,
            cpus=1.0,
            docker_url='unix://var/run/docker.sock',
            environment=None,
            force_pull=False,
            mem_limit=None,
            network_mode=None,
            tls_ca_cert=None,
            tls_client_cert=None,
            tls_client_key=None,
            tls_hostname=None,
            tls_ssl_version=None,
            tmp_dir='/tmp/airflow',
            user=None,
            volumes=None,
            xcom_push=False,
            xcom_all=False,
            *args,
            **kwargs):

        super(DockerOperator, self).__init__(*args, **kwargs)
        self.api_version = api_version
        self.command = command
        self.cpus = cpus
        self.docker_url = docker_url
        # Avoid a shared mutable default by creating fresh containers here.
        self.environment = environment or {}
        self.force_pull = force_pull
        self.image = image
        self.mem_limit = mem_limit
        self.network_mode = network_mode
        self.tls_ca_cert = tls_ca_cert
        self.tls_client_cert = tls_client_cert
        self.tls_client_key = tls_client_key
        self.tls_hostname = tls_hostname
        self.tls_ssl_version = tls_ssl_version
        self.tmp_dir = tmp_dir
        self.user = user
        self.volumes = volumes or []
        # NOTE(review): this attribute shadows the ``xcom_push`` name
        # inherited from BaseOperator -- confirm against the base class and
        # consider renaming the attribute if it clashes.
        self.xcom_push = xcom_push
        self.xcom_all = xcom_all

        # Set lazily in execute(); kept on self so on_kill() can reach them.
        self.cli = None
        self.container = None

    def execute(self, context):
        """
        Pull the image if needed, run the container, stream its logs, and
        fail the task if the container exits non-zero. Returns the container
        stdout (all of it, or the last line) when ``xcom_push`` is set.
        """
        logging.info('Starting docker container from image ' + self.image)

        # Build a TLS config only when the full certificate triple is given;
        # in that case the docker endpoint must be addressed over https.
        tls_config = None
        if self.tls_ca_cert and self.tls_client_cert and self.tls_client_key:
            tls_config = tls.TLSConfig(
                ca_cert=self.tls_ca_cert,
                client_cert=(self.tls_client_cert, self.tls_client_key),
                verify=True,
                ssl_version=self.tls_ssl_version,
                assert_hostname=self.tls_hostname
            )
            self.docker_url = self.docker_url.replace('tcp://', 'https://')

        self.cli = Client(base_url=self.docker_url, version=self.api_version, tls=tls_config)

        # Default to the ':latest' tag when the image has no explicit tag.
        if ':' not in self.image:
            image = self.image + ':latest'
        else:
            image = self.image

        # Pull when forced, or when the image is absent from the local cache.
        if self.force_pull or len(self.cli.images(name=image)) == 0:
            logging.info('Pulling docker image ' + image)
            for pull_line in self.cli.pull(image, stream=True):
                output = json.loads(pull_line)
                logging.info("{}".format(output['status']))

        # Docker expresses CPU limits as shares of 1024 per CPU.
        cpu_shares = int(round(self.cpus * 1024))

        with TemporaryDirectory(prefix='airflowtmp') as host_tmp_dir:
            self.environment['AIRFLOW_TMP_DIR'] = self.tmp_dir
            # Bind the per-run host temp dir via a *copy* of the volume list:
            # appending to self.volumes would accumulate stale binds across
            # repeated execute() calls on the same operator instance.
            binds = self.volumes + ['{0}:{1}'.format(host_tmp_dir, self.tmp_dir)]

            self.container = self.cli.create_container(
                command=self.get_command(),
                cpu_shares=cpu_shares,
                environment=self.environment,
                host_config=self.cli.create_host_config(binds=binds,
                                                        network_mode=self.network_mode),
                image=image,
                mem_limit=self.mem_limit,
                user=self.user
            )
            self.cli.start(self.container['Id'])

            # Stream logs as they arrive; 'line' keeps the last line for XCom.
            line = ''
            for line in self.cli.logs(container=self.container['Id'], stream=True):
                logging.info("{}".format(line.strip()))

            exit_code = self.cli.wait(self.container['Id'])
            if exit_code != 0:
                raise AirflowException('docker container failed')

            if self.xcom_push:
                return self.cli.logs(container=self.container['Id']) if self.xcom_all else str(line.strip())

    def get_command(self):
        """
        Return the command to run, parsing a stringified list if needed.

        A templated command may arrive as a string containing a Python list
        literal (e.g. ``"['bash', '-c', 'ls']"``); such strings are converted
        back into a real list with ``ast.literal_eval``. Plain strings and
        actual lists are returned unchanged.
        """
        # The hasattr guard fixes a bug: ``command`` is documented as
        # "str or list", and a list has no .strip() -- calling it
        # unconditionally raised AttributeError for list commands.
        if self.command is not None and hasattr(self.command, 'strip') \
                and self.command.strip().find('[') == 0:
            return ast.literal_eval(self.command)
        return self.command

    def on_kill(self):
        """Stop the running container (if any) when the task is killed."""
        if self.cli is not None:
            logging.info('Stopping docker container')
            self.cli.stop(self.container['Id'])
1 change: 1 addition & 0 deletions docs/code.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Operator API
BashOperator,
BranchPythonOperator,
TriggerDagRunOperator,
DockerOperator,
DummyOperator,
EmailOperator,
ExternalTaskSensor,
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ coverage
coveralls
croniter
dill
docker-py
filechunkio
flake8
flask
Expand Down Expand Up @@ -53,3 +54,4 @@ lxml
pykerberos
bcrypt
flask-bcrypt
mock
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def run(self):
'sphinx-rtd-theme>=0.1.6',
'Sphinx-PyPI-upload>=0.2.1'
]
docker = ['docker-py>=1.6.0']
druid = ['pydruid>=0.2.1']
hdfs = ['snakebite>=2.4.13']
webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4']
Expand Down Expand Up @@ -84,7 +85,7 @@ def run(self):
qds = ['qds-sdk>=1.9.0']

all_dbs = postgres + mysql + hive + mssql + hdfs + vertica
devel = all_dbs + doc + samba + s3 + ['nose'] + slack + crypto + oracle
devel = all_dbs + doc + samba + s3 + ['nose'] + slack + crypto + oracle + docker

setup(
name='airflow',
Expand Down Expand Up @@ -126,6 +127,7 @@ def run(self):
'crypto': crypto,
'devel': devel,
'doc': doc,
'docker': docker,
'druid': druid,
'hdfs': hdfs,
'hive': hive,
Expand Down
1 change: 1 addition & 0 deletions tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import absolute_import
from .core import *
from .models import *
from .operators import *
2 changes: 1 addition & 1 deletion tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from lxml import html
from airflow.utils import AirflowException

NUM_EXAMPLE_DAGS = 7
NUM_EXAMPLE_DAGS = 8
DEV_NULL = '/dev/null'
DEFAULT_DATE = datetime(2015, 1, 1)
DEFAULT_DATE_ISO = DEFAULT_DATE.isoformat()
Expand Down
2 changes: 1 addition & 1 deletion tests/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_get_existing_dag(self):
assert dag is not None
assert dag.dag_id == dag_id

assert dagbag.size() == 7
assert dagbag.size() == 8

def test_get_non_existing_dag(self):
"""
Expand Down
1 change: 1 addition & 0 deletions tests/operators/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .docker_operator import *
Loading

0 comments on commit eac1edd

Please sign in to comment.