Skip to content

Commit

Permalink
[Bash] minimal amount of isolation in a self cleaning temp directory
Browse files Browse the repository at this point in the history
  • Loading branch information
artwr committed Mar 27, 2015
1 parent 69d8afa commit 7a28d64
Showing 1 changed file with 43 additions and 14 deletions.
57 changes: 43 additions & 14 deletions airflow/operators/bash_operator.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
import logging
import errno
from tempfile import (gettempdir, mkdtemp, NamedTemporaryFile)
import shutil
from subprocess import Popen, STDOUT, PIPE
from contextlib import contextmanager

from airflow.models import BaseOperator
from airflow.utils import apply_defaults

@contextmanager
def TemporaryDirectory(suffix='',prefix=None, dir=None):
name = mkdtemp(suffix=suffix, prefix=prefix, dir=dir)
try:
yield name
finally:
try:
shutil.rmtree(name)
except OSError as exc:
# ENOENT - no such file or directory
if exc.errno != errno.ENOENT:
raise

class BashOperator(BaseOperator):
'''
Expand All @@ -27,24 +43,37 @@ def __init__(self, bash_command, *args, **kwargs):
self.bash_command = bash_command

def execute(self, context):

'''
Execute the bash command in a temporary directory
which will be cleaned afterwards
'''
bash_command = self.bash_command
logging.info("tmp dir root location: \n"+ gettempdir())
with TemporaryDirectory(prefix='airflowtmp') as tmp_dir:
with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
f.write(bash_command)
f.flush()
fname = f.name
script_location=tmp_dir + "/" + fname
logging.info("Temporary script "
"location :{0}".format(script_location))
logging.info("Running command: " + bash_command)
sp = Popen(
['bash', fname],
stdout=PIPE, stderr=STDOUT,
cwd=tmp_dir)

logging.info("Runnning command: " + bash_command)
sp = Popen(
['bash', '-c', bash_command],
stdout=PIPE, stderr=STDOUT)

self.sp = sp
self.sp = sp

logging.info("Output:")
for line in iter(sp.stdout.readline, ''):
logging.info(line.strip())
sp.wait()
logging.info("Command exited with return code %d", sp.returncode)
logging.info("Output:")
for line in iter(sp.stdout.readline, ''):
logging.info(line.strip())
sp.wait()
logging.info("Command exited with "
"return code {0}".format(sp.returncode))

if sp.returncode:
raise Exception("Bash command failed")
if sp.returncode:
raise Exception("Bash command failed")

def on_kill(self):
logging.info('Sending SIGTERM signal to bash subprocess')
Expand Down

0 comments on commit 7a28d64

Please sign in to comment.