Skip to content

Commit

Permalink
Add support for trace propagation from Pants callers (pantsbuild#7177)
Browse files Browse the repository at this point in the history
Follow up for pantsbuild#7125:
Add flags "--reporting-zipkin-trace-id", "--reporting-zipkin-parent-id" to propagate a trace from systems that invoke Pants.
  • Loading branch information
cattibrie authored and illicitonion committed Jan 31, 2019
1 parent 5a6ac93 commit f65734e
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 17 deletions.
30 changes: 27 additions & 3 deletions src/python/pants/reporting/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,18 @@ def register_options(cls, register):
'{workunits}. Possible formatting values are {formats}'.format(
workunits=list(WorkUnitLabel.keys()), formats=list(ToolOutputFormat.keys())))
register('--zipkin-endpoint', advanced=True, default=None,
help='The full HTTP URL of a zipkin server to which traces should be posted.'
'No traces will be made if this is not set.')
help='The full HTTP URL of a zipkin server to which traces should be posted. '
'No traces will be made if this is not set.')
register('--zipkin-trace-id', advanced=True, default=None,
help='The overall 64 or 128-bit ID of the trace. '
'Set if Pants trace should be a part of larger trace '
'for systems that invoke Pants. If zipkin-trace-id '
'and zipkin-parent-id are not set, a trace_id value is randomly generated for a '
'Zipkin trace')
register('--zipkin-parent-id', advanced=True, default=None,
help='The 64-bit ID for a parent span that invokes Pants. '
'zipkin-trace-id and zipkin-parent-id must both either be set or not set '
'when run Pants command')

def initialize(self, run_tracker, all_options, start_time=None):
"""Initialize with the given RunTracker.
Expand Down Expand Up @@ -88,9 +98,23 @@ def initialize(self, run_tracker, all_options, start_time=None):

# Set up Zipkin reporting.
zipkin_endpoint = self.get_options().zipkin_endpoint
trace_id = self.get_options().zipkin_trace_id
parent_id = self.get_options().zipkin_parent_id

if zipkin_endpoint is None and trace_id is not None and parent_id is not None:
raise ValueError(
"The zipkin-endpoint flag must be set if zipkin-trace-id and zipkin-parent-id flags are given."
)
if (trace_id is None) != (parent_id is None):
raise ValueError(
"Flags zipkin-trace-id and zipkin-parent-id must both either be set or not set."
)

if zipkin_endpoint is not None:
zipkin_reporter_settings = ZipkinReporter.Settings(log_level=Report.INFO)
zipkin_reporter = ZipkinReporter(run_tracker, zipkin_reporter_settings, zipkin_endpoint)
zipkin_reporter = ZipkinReporter(
run_tracker, zipkin_reporter_settings, zipkin_endpoint, trace_id, parent_id
)
report.add_reporter('zipkin', zipkin_reporter)

# Add some useful RunInfo.
Expand Down
36 changes: 32 additions & 4 deletions src/python/pants/reporting/zipkin_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
import requests
from py_zipkin import Encoding
from py_zipkin.transport import BaseTransportHandler
from py_zipkin.zipkin import zipkin_span
from py_zipkin.util import generate_random_64bit_string
from py_zipkin.zipkin import ZipkinAttrs, zipkin_span

from pants.base.workunit import WorkUnitLabel
from pants.reporting.reporter import Reporter
Expand Down Expand Up @@ -37,15 +38,29 @@ def send(self, payload):


class ZipkinReporter(Reporter):
"""Reporter that implements Zipkin tracing .
"""
Reporter that implements Zipkin tracing.
"""

def __init__(self, run_tracker, settings, endpoint):
def __init__(self, run_tracker, settings, endpoint, trace_id, parent_id):
"""
When trace_id and parent_id are set a Zipkin trace will be created with given trace_id
and parent_id. If trace_id and parent_id are set to None, a trace_id will be randomly
generated for a Zipkin trace. trace-id and parent-id must both either be set or not set.
:param RunTracker run_tracker: Tracks and times the execution of a pants run.
:param Settings settings: Generic reporting settings.
:param string endpoint: The full HTTP URL of a zipkin server to which traces should be posted.
:param string trace_id: The overall 64 or 128-bit ID of the trace. May be None.
:param string parent_id: The 64-bit ID for a parent span that invokes Pants. May be None.
"""
super(ZipkinReporter, self).__init__(run_tracker, settings)
# We keep track of connection between workunits and spans
self._workunits_to_spans = {}
# Create a transport handler
self.handler = HTTPTransportHandler(endpoint)
self.trace_id = trace_id
self.parent_id = parent_id

def start_workunit(self, workunit):
"""Implementation of Reporter callback."""
Expand All @@ -59,12 +74,25 @@ def start_workunit(self, workunit):
# Check if it is the first workunit
first_span = not self._workunits_to_spans
if first_span:
# If trace_id and parent_id are given create zipkin_attrs
if self.trace_id is not None:
zipkin_attrs = ZipkinAttrs(
trace_id=self.trace_id,
span_id=generate_random_64bit_string(),
parent_span_id=self.parent_id,
flags='0', # flags: stores flags header. Currently unused
is_sampled=True,
)
else:
zipkin_attrs = None

span = zipkin_span(
service_name=service_name,
span_name=workunit.name,
transport_handler=self.handler,
sample_rate=100.0, # Value between 0.0 and 100.0
encoding=Encoding.V1_THRIFT
encoding=Encoding.V1_THRIFT,
zipkin_attrs=zipkin_attrs
)
else:
span = zipkin_span(
Expand Down
13 changes: 12 additions & 1 deletion tests/python/pants_test/reporting/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Licensed under the Apache License, Version 2.0 (see LICENSE).

python_tests(
name = 'linkify',
sources = ['test_linkify.py'],
dependencies = [
'3rdparty/python:future',
Expand All @@ -18,8 +19,18 @@ python_tests(
'3rdparty/python:parameterized',
'3rdparty/python:py-zipkin',
'src/python/pants/util:contextutil',
'tests/python/pants_test:int-test'
'tests/python/pants_test:int-test',
],
tags = {'integration'},
timeout = 240,
)

python_tests(
name = 'reporting',
sources = ['test_reporting.py'],
dependencies = [
'src/python/pants/goal:run_tracker',
'src/python/pants/reporting',
'tests/python/pants_test:test_base',
],
)
96 changes: 96 additions & 0 deletions tests/python/pants_test/reporting/test_reporting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# coding=utf-8
# Copyright 2019 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

from __future__ import absolute_import, division, print_function, unicode_literals

from pants.goal.run_tracker import RunTracker
from pants.reporting.reporting import Reporting
from pants_test.test_base import TestBase


class ReportingTest(TestBase):

# Options for Zipkin tracing
trace_id = "aaaaaaaaaaaaaaaa"
parent_id = "ffffffffffffffff"
zipkin_endpoint = 'http://localhost:9411/api/v1/spans'

def test_raise_no_zipkin_endpoint_set(self):

options = {'reporting': {'zipkin_trace_id': self.trace_id, 'zipkin_parent_id': self.parent_id}}
context = self.context(for_subsystems=[RunTracker, Reporting], options=options)
run_tracker = RunTracker.global_instance()
reporting = Reporting.global_instance()

with self.assertRaises(ValueError) as result:
reporting.initialize(run_tracker, context.options)

self.assertTrue(
"The zipkin-endpoint flag must be set if zipkin-trace-id and zipkin-parent-id flags are given."
in str(result.exception)
)

def test_raise_no_parent_id_set(self):

options = {'reporting': {'zipkin_trace_id': self.trace_id, 'zipkin_endpoint': self.zipkin_endpoint}}
context = self.context(for_subsystems=[RunTracker, Reporting], options=options)

run_tracker = RunTracker.global_instance()
reporting = Reporting.global_instance()

with self.assertRaises(ValueError) as result:
reporting.initialize(run_tracker, context.options)

self.assertTrue(
"Flags zipkin-trace-id and zipkin-parent-id must both either be set or not set."
in str(result.exception)
)

def test_raise_no_trace_id_set(self):

options = {'reporting': {'zipkin_parent_id': self.parent_id, 'zipkin_endpoint': self.zipkin_endpoint}}
context = self.context(for_subsystems=[RunTracker, Reporting], options=options)

run_tracker = RunTracker.global_instance()
reporting = Reporting.global_instance()

with self.assertRaises(ValueError) as result:
reporting.initialize(run_tracker, context.options)

self.assertTrue(
"Flags zipkin-trace-id and zipkin-parent-id must both either be set or not set."
in str(result.exception)
)

def test_raise_if_no_trace_id_and_zipkin_endpoint_set(self):

options = {'reporting': {'zipkin_parent_id': self.parent_id}}
context = self.context(for_subsystems=[RunTracker, Reporting], options=options)

run_tracker = RunTracker.global_instance()
reporting = Reporting.global_instance()

with self.assertRaises(ValueError) as result:
reporting.initialize(run_tracker, context.options)

self.assertTrue(
"Flags zipkin-trace-id and zipkin-parent-id must both either be set or not set."
in str(result.exception)
)

def test_raise_if_no_parent_id_and_zipkin_endpoint_set(self):

options = {'reporting': {'zipkin_trace_id': self.trace_id}}
context = self.context(for_subsystems=[RunTracker, Reporting], options=options)

run_tracker = RunTracker.global_instance()
reporting = Reporting.global_instance()

with self.assertRaises(ValueError) as result:
reporting.initialize(run_tracker, context.options)

self.assertTrue(
"Flags zipkin-trace-id and zipkin-parent-id must both either be set or not set."
in str(result.exception)
)
55 changes: 46 additions & 9 deletions tests/python/pants_test/reporting/test_reporting_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ def test_epilog_to_stderr(self, quiet_flag):
self.assertNotIn('Cumulative Timings', pants_run.stdout_data)

def test_zipkin_reporter(self):
ZipkinHandler = zipkin_handler()
with http_server(ZipkinHandler) as port:
endpoint = "http://localhost:{}".format(port)
command = [
Expand All @@ -185,6 +186,40 @@ def test_zipkin_reporter(self):
self.assertTrue(main_children)
self.assertTrue(any(span['name'] == 'cloc' for span in main_children))

def test_zipkin_reporter_with_given_trace_id_parent_id(self):
ZipkinHandler = zipkin_handler()
with http_server(ZipkinHandler) as port:
endpoint = "http://localhost:{}".format(port)
trace_id = "aaaaaaaaaaaaaaaa"
parent_span_id = "ffffffffffffffff"
command = [
'--reporting-zipkin-endpoint={}'.format(endpoint),
'--reporting-zipkin-trace-id={}'.format(trace_id),
'--reporting-zipkin-parent-id={}'.format(parent_span_id),
'cloc',
'src/python/pants:version'
]

pants_run = self.run_pants(command)
self.assert_success(pants_run)

num_of_traces = len(ZipkinHandler.traces)
self.assertEqual(num_of_traces, 1)

trace = ZipkinHandler.traces[-1]
main_span = self.find_spans_by_name(trace, 'main')
self.assertEqual(len(main_span), 1)

main_span_trace_id = main_span[0]['traceId']
self.assertEqual(main_span_trace_id, trace_id)
main_span_parent_id = main_span[0]['parentId']
self.assertEqual(main_span_parent_id, parent_span_id)

parent_id = main_span[0]['id']
main_children = self.find_spans_by_parentId(trace, parent_id)
self.assertTrue(main_children)
self.assertTrue(any(span['name'] == 'cloc' for span in main_children))

@staticmethod
def find_spans_by_name(trace, name):
return [span for span in trace if span['name'] == name]
Expand All @@ -194,13 +229,15 @@ def find_spans_by_parentId(trace, parent_id):
return [span for span in trace if span.get('parentId') == parent_id]


class ZipkinHandler(BaseHTTPRequestHandler):
traces = []
def zipkin_handler():
class ZipkinHandler(BaseHTTPRequestHandler):
traces = []

def do_POST(self):
content_length = self.headers.get('content-length') if PY3 else self.headers.getheader('content-length')
thrift_trace = self.rfile.read(int(content_length))
json_trace = convert_spans(thrift_trace, Encoding.V1_JSON, Encoding.V1_THRIFT)
trace = json.loads(json_trace)
self.__class__.traces.append(trace)
self.send_response(200)
def do_POST(self):
content_length = self.headers.get('content-length') if PY3 else self.headers.getheader('content-length')
thrift_trace = self.rfile.read(int(content_length))
json_trace = convert_spans(thrift_trace, Encoding.V1_JSON, Encoding.V1_THRIFT)
trace = json.loads(json_trace)
self.__class__.traces.append(trace)
self.send_response(200)
return ZipkinHandler

0 comments on commit f65734e

Please sign in to comment.