Skip to content

Commit

Permalink
Provide automatic URL tracking for Spark applications (spotify#2661)
Browse files Browse the repository at this point in the history
  • Loading branch information
GoodDok authored and Tarrasch committed Mar 5, 2019
1 parent 6f32570 commit 3701c68
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 8 deletions.
79 changes: 72 additions & 7 deletions luigi/contrib/external_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@

import logging
import os
import re
import signal
import subprocess
import sys
import tempfile
from contextlib import contextmanager
from multiprocessing import Process
from time import sleep

import luigi
from luigi.parameter import ParameterVisibility

logger = logging.getLogger('luigi-interface')

Expand All @@ -56,11 +61,33 @@ class ExternalProgramTask(luigi.Task):
By default, the output (stdout and stderr) of the run external program
is being captured and displayed after the execution has ended. This
behaviour can be overriden by passing ``--capture-output False``
behaviour can be overridden by passing ``--capture-output False``
"""

capture_output = luigi.BoolParameter(default=True, significant=False, positional=False)

stream_for_searching_tracking_url = luigi.parameter.ChoiceParameter(
var_type=str, choices=['none', 'stdout', 'stderr'], default='none',
significant=False, positional=False, visibility=ParameterVisibility.HIDDEN,
description="Stream for searching tracking URL")
"""
Used for defining which stream should be tracked for URL, may be set to 'stdout', 'stderr' or 'none'.
Default value is 'none', so URL tracking is not performed.
"""

tracking_url_pattern = luigi.OptionalParameter(
default=None, significant=False, positional=False, visibility=ParameterVisibility.HIDDEN,
description="Regex pattern used for searching URL in the logs of the external program")
"""
Regex pattern used for searching URL in the logs of the external program.
If a log line matches the regex, the first group in the matching is set as the tracking URL
for the job in the web UI. Example: 'Job UI is here: (https?://.*)'.
Default value is None, so URL tracking is not performed.
"""

def program_args(self):
"""
Override this method to map your task parameters to the program arguments
Expand Down Expand Up @@ -97,17 +124,19 @@ def run(self):
logger.info('Running command: %s', ' '.join(args))
env = self.program_environment()
kwargs = {'env': env}
tmp_stdout, tmp_stderr = None, None
if self.capture_output:
tmp_stdout, tmp_stderr = tempfile.TemporaryFile(), tempfile.TemporaryFile()
kwargs.update({'stdout': tmp_stdout, 'stderr': tmp_stderr})
proc = subprocess.Popen(
args,
**kwargs
)

try:
with ExternalProgramRunContext(proc):
proc.wait()
if self.stream_for_searching_tracking_url != 'none' and self.tracking_url_pattern is not None:
with self._proc_with_tracking_url_context(proc_args=args, proc_kwargs=kwargs) as proc:
proc.wait()
else:
proc = subprocess.Popen(args, **kwargs)
with ExternalProgramRunContext(proc):
proc.wait()
success = proc.returncode == 0

if self.capture_output:
Expand All @@ -131,6 +160,42 @@ def run(self):
tmp_stderr.close()
tmp_stdout.close()

@contextmanager
def _proc_with_tracking_url_context(self, proc_args, proc_kwargs):
time_to_sleep = 0.5
file_to_write = proc_kwargs.get(self.stream_for_searching_tracking_url)
proc_kwargs.update({self.stream_for_searching_tracking_url: subprocess.PIPE})
main_proc = subprocess.Popen(proc_args, **proc_kwargs)
pipe_to_read = main_proc.stderr if self.stream_for_searching_tracking_url == 'stderr' else main_proc.stdout

def _track_url_by_pattern():
"""
Scans the pipe looking for a passed pattern, if the pattern is found, `set_tracking_url` callback is sent.
If tmp_stdout is passed, also appends lines to this file.
"""
pattern = re.compile(self.tracking_url_pattern)
for new_line in iter(pipe_to_read.readline, ''):
if new_line:
if file_to_write:
file_to_write.write(new_line)
match = re.search(pattern, new_line.decode('utf-8'))
if match:
self.set_tracking_url(match.group(1))
else:
sleep(time_to_sleep)

track_proc = Process(target=_track_url_by_pattern)
try:
track_proc.start()
with ExternalProgramRunContext(main_proc):
yield main_proc
finally:
# need to wait a bit to let the subprocess read the last lines
track_proc.join(time_to_sleep * 2)
if track_proc.is_alive():
track_proc.terminate()
pipe_to_read.close()


class ExternalProgramRunContext(object):
def __init__(self, proc):
Expand Down
11 changes: 11 additions & 0 deletions luigi/contrib/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ class SparkSubmitTask(ExternalProgramTask):

# Only log stderr if spark fails (since stderr is normally quite verbose)
always_log_stderr = False
# Spark applications write its logs into stderr
track_url_in_stderr = True

def run(self):
if self.deploy_mode == "cluster":
# in cluster mode client only receives application status once a period of time
self.tracking_url_pattern = r"tracking URL: (https?://.*)\s"
else:
self.tracking_url_pattern = r"Bound (?:.*) to (?:.*), and started at (https?://.*)\s"

super(SparkSubmitTask, self).run()

def app_options(self):
"""
Expand Down
79 changes: 78 additions & 1 deletion test/contrib/external_program_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
#
import os
import shutil
import subprocess
import tempfile
from functools import partial
from multiprocessing import Value

from helpers import unittest
import luigi
Expand All @@ -25,6 +28,7 @@
from luigi.contrib.external_program import ExternalProgramTask, ExternalPythonProgramTask
from luigi.contrib.external_program import ExternalProgramRunError
from mock import patch, call
from subprocess import Popen
import mock
from nose.plugins.attrib import attr

Expand Down Expand Up @@ -124,7 +128,6 @@ def test_log_stderr_on_success_by_default(self, proc, file, logger):
self.assertIn(call.info('Program stderr:\nstderr'), logger.mock_calls)

def test_capture_output_set_to_false_writes_output_to_stdout(self):
from subprocess import Popen

out = tempfile.TemporaryFile()

Expand Down Expand Up @@ -184,6 +187,80 @@ def test_non_mocked_task_run(self):
# clean up temp files even if assertion fails
shutil.rmtree(tempdir)

def test_tracking_url_pattern_works_with_capture_output_disabled(self):
test_val = Value('i', 0)

def fake_set_tracking_url(val, url):
if url == "TEXT":
val.value += 1

task = TestEchoTask(capture_output=False, stream_for_searching_tracking_url='stdout',
tracking_url_pattern=r"SOME (.*)")
task.MESSAGE = "SOME TEXT"

with mock.patch.object(task, 'set_tracking_url', new=partial(fake_set_tracking_url, test_val)):
task.run()
self.assertEqual(test_val.value, 1)

def test_tracking_url_pattern_works_with_capture_output_enabled(self):
test_val = Value('i', 0)

def fake_set_tracking_url(val, url):
if url == "THING":
val.value += 1

task = TestEchoTask(capture_output=True, stream_for_searching_tracking_url='stdout',
tracking_url_pattern=r"ANY(.*)")
task.MESSAGE = "ANYTHING"

with mock.patch.object(task, 'set_tracking_url', new=partial(fake_set_tracking_url, test_val)):
task.run()
self.assertEqual(test_val.value, 1)

def test_tracking_url_pattern_works_with_stderr(self):
test_val = Value('i', 0)

def fake_set_tracking_url(val, url):
if url == "THING_ELSE":
val.value += 1

def Popen_wrap(args, **kwargs):
return Popen('>&2 echo "ANYTHING_ELSE"', shell=True, **kwargs)

task = TestEchoTask(capture_output=True, stream_for_searching_tracking_url='stderr',
tracking_url_pattern=r"ANY(.*)")

with mock.patch('luigi.contrib.external_program.subprocess.Popen', wraps=Popen_wrap):
with mock.patch.object(task, 'set_tracking_url', new=partial(fake_set_tracking_url, test_val)):
task.run()
self.assertEqual(test_val.value, 1)

def test_no_url_searching_is_performed_if_pattern_is_not_set(self):
def Popen_wrap(args, **kwargs):
# stdout should not be replaced with pipe if tracking_url_pattern is not set
self.assertNotEqual(kwargs['stdout'], subprocess.PIPE)
return Popen(args, **kwargs)

task = TestEchoTask(capture_output=True, stream_for_searching_tracking_url='stdout')

with mock.patch('luigi.contrib.external_program.subprocess.Popen', wraps=Popen_wrap):
task.run()

def test_tracking_url_context_works_without_capture_output(self):
test_val = Value('i', 0)

def fake_set_tracking_url(val, url):
if url == "world":
val.value += 1

task = TestEchoTask(capture_output=False, stream_for_searching_tracking_url='stdout',
tracking_url_pattern=r"Hello, (.*)!")
test_args = list(map(str, task.program_args()))
with mock.patch.object(task, 'set_tracking_url', new=partial(fake_set_tracking_url, test_val)):
with task._proc_with_tracking_url_context(proc_args=test_args, proc_kwargs={}) as proc:
proc.wait()
self.assertEqual(test_val.value, 1)


class TestExternalPythonProgramTask(ExternalPythonProgramTask):
virtualenv = '/path/to/venv'
Expand Down

0 comments on commit 3701c68

Please sign in to comment.