Skip to content

Commit

Permalink
Make sure to include the s3 improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
bolkedebruin committed Oct 28, 2015
1 parent 5c74cb9 commit 25ecdfb
Showing 1 changed file with 34 additions and 10 deletions.
44 changes: 34 additions & 10 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,9 @@ def log(self):
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dag = dagbag.get_dag(dag_id)
log_relative = "/{dag_id}/{task_id}/{execution_date}".format(
log_relative = "{dag_id}/{task_id}/{execution_date}".format(
**locals())
loc = BASE_LOG_FOLDER + log_relative
loc = os.path.join(BASE_LOG_FOLDER, log_relative)
loc = loc.format(**locals())
log = ""
TI = models.TaskInstance
Expand All @@ -731,28 +731,52 @@ def log(self):
TI.execution_date == dttm).first()
dttm = dateutil.parser.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})

if ti:
host = ti.hostname
log_loaded = False

if socket.gethostname() == host:
try:
f = open(loc)
log += "".join(f.readlines())
f.close()
log_loaded = True
except:
log = "Log file isn't where expected.\n".format(loc)
log = "*** Log file isn't where expected.\n".format(loc)
else:
WORKER_LOG_SERVER_PORT = \
conf.get('celery', 'WORKER_LOG_SERVER_PORT')
url = (
"http://{host}:{WORKER_LOG_SERVER_PORT}/log"
"{log_relative}").format(**locals())
log += "Log file isn't local.\n"
log += "Fetching here: {url}\n".format(**locals())
url = os.path.join(
"http://{host}:{WORKER_LOG_SERVER_PORT}/log", log_relative
).format(**locals())
log += "*** Log file isn't local.\n"
log += "*** Fetching here: {url}\n".format(**locals())
try:
import requests
log += requests.get(url).text
log += '\n' + requests.get(url).text
log_loaded = True
except:
log += "Failed to fetch log file.".format(**locals())
log += "*** Failed to fetch log file from worker.\n".format(
**locals())

# try to load log backup from S3
s3_log_folder = conf.get('core', 'S3_LOG_FOLDER')
if not log_loaded and s3_log_folder.startswith('s3:'):
import boto
s3 = boto.connect_s3()
s3_log_loc = os.path.join(
conf.get('core', 'S3_LOG_FOLDER'), log_relative)
log += '*** Fetching log from S3: {}\n'.format(s3_log_loc)
log += ('*** Note: S3 logs are only available once '
'tasks have completed.\n')
bucket, key = s3_log_loc.lstrip('s3:/').split('/', 1)
s3_key = boto.s3.key.Key(s3.get_bucket(bucket), key)
if s3_key.exists():
log += '\n' + s3_key.get_contents_as_string().decode()
else:
log += '*** No log found on S3.\n'

session.commit()
session.close()
log = log.decode('utf-8') if PY2 else log
Expand Down

0 comments on commit 25ecdfb

Please sign in to comment.