Skip to content

Commit

Permalink
chore(peer.service): peer.service logic + flag (DataDog#6042)
Browse files Browse the repository at this point in the history
== Special Notes ==
These changes are gated behind a feature flag and are not expected to be
stable until a release note is generated. Existing customers should be
unaffected.

The logic relies on 'precursor' tags, which are being added in other
PRs. Logic will not be fully enabled until until the precursors are
completed.

These changes are reliant on service naming as well, and will not be
fully enabled until that initiative is complete.

== Peer.Service Summary ==
The changes for peer.service will enable new functionality in the
service map, when enabled, in conjunction with service naming.

The changes in this PR are:

1. Introduce the peer.service computation, gated behind a feature flag
2. Add in the peer.service and peer.service.source computation logic

When peer service is enabled, the logic is applied as a post-processor
on spans, as part of the "on_finish" methods
enabled.

## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines)
are followed.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).

## Reviewer Checklist

- [ ] Title is accurate.
- [ ] No unnecessary changes are introduced.
- [ ] Description motivates each change.
- [ ] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [ ] Testing strategy adequately addresses listed risk(s).
- [ ] Change is maintainable (easy to change, telemetry, documentation).
- [ ] Release note makes sense to a user of the library.
- [ ] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.
  • Loading branch information
tabgok authored Jun 8, 2023
1 parent 6c7aaad commit 3270900
Show file tree
Hide file tree
Showing 18 changed files with 219 additions and 12 deletions.
31 changes: 31 additions & 0 deletions ddtrace/internal/processor/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ddtrace import config
from ddtrace.constants import SAMPLING_PRIORITY_KEY
from ddtrace.constants import SPAN_KIND
from ddtrace.constants import USER_KEEP
from ddtrace.internal import gitmetadata
from ddtrace.internal.constants import HIGHER_ORDER_TRACE_ID_BITS
Expand Down Expand Up @@ -278,3 +279,33 @@ def on_span_finish(self, span):
if config._trace_compute_stats:
span.set_metric(SAMPLING_PRIORITY_KEY, USER_KEEP)
break


class PeerServiceProcessor(SpanProcessor):
def __init__(self, peer_service_config):
self._config = peer_service_config
self.enabled = self._config.enabled

def on_span_start(self, span):
"""
We don't do anything on span start
"""
pass

def on_span_finish(self, span):
if not self.enabled:
return

if span.get_tag(self._config.tag_name): # If the tag already exists, assume it is user generated
span.set_tag(self._config.source_tag_name, self._config.tag_name)
return

if span.get_tag(SPAN_KIND) not in self._config.enabled_span_kinds:
return

for data_source in self._config.prioritized_data_sources:
peer_service_definition = span.get_tag(data_source)
if peer_service_definition:
span.set_tag(self._config.tag_name, peer_service_definition)
span.set_tag(self._config.source_tag_name, data_source)
return
22 changes: 13 additions & 9 deletions ddtrace/internal/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# Span attribute schema
def _validate_schema(version):
error_message = (
"You have specified an invalid span attribute schema version: '{}'.".format(__schema_version),
"You have specified an invalid span attribute schema version: '{}'.".format(version),
"Valid options are: {}. You can change the specified value by updating".format(
_SPAN_ATTRIBUTE_TO_FUNCTION.keys()
),
Expand All @@ -19,21 +19,25 @@ def _validate_schema(version):
assert version in _SPAN_ATTRIBUTE_TO_FUNCTION.keys(), error_message


__schema_version = os.getenv("DD_TRACE_SPAN_ATTRIBUTE_SCHEMA", default="v0")
_remove_client_service_names = asbool(os.getenv("DD_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED", default=False))
_validate_schema(__schema_version)
def _get_schema_version():
return os.getenv("DD_TRACE_SPAN_ATTRIBUTE_SCHEMA", default="v0")


_service_name_schema_version = "v0" if __schema_version == "v0" and not _remove_client_service_names else "v1"
SCHEMA_VERSION = _get_schema_version()
_validate_schema(SCHEMA_VERSION)
_remove_client_service_names = asbool(os.getenv("DD_TRACE_REMOVE_INTEGRATION_SERVICE_NAMES_ENABLED", default=False))
_service_name_schema_version = "v0" if SCHEMA_VERSION == "v0" and not _remove_client_service_names else "v1"

DEFAULT_SPAN_SERVICE_NAME = _DEFAULT_SPAN_SERVICE_NAMES[_service_name_schema_version]
schematize_service_name = _SPAN_ATTRIBUTE_TO_FUNCTION[_service_name_schema_version]["service_name"]
schematize_database_operation = _SPAN_ATTRIBUTE_TO_FUNCTION[__schema_version]["database_operation"]
schematize_cache_operation = _SPAN_ATTRIBUTE_TO_FUNCTION[__schema_version]["cache_operation"]
schematize_cloud_api_operation = _SPAN_ATTRIBUTE_TO_FUNCTION[__schema_version]["cloud_api_operation"]
schematize_url_operation = _SPAN_ATTRIBUTE_TO_FUNCTION[__schema_version]["url_operation"]
schematize_database_operation = _SPAN_ATTRIBUTE_TO_FUNCTION[SCHEMA_VERSION]["database_operation"]
schematize_cache_operation = _SPAN_ATTRIBUTE_TO_FUNCTION[SCHEMA_VERSION]["cache_operation"]
schematize_cloud_api_operation = _SPAN_ATTRIBUTE_TO_FUNCTION[SCHEMA_VERSION]["cloud_api_operation"]
schematize_url_operation = _SPAN_ATTRIBUTE_TO_FUNCTION[SCHEMA_VERSION]["url_operation"]

__all__ = [
"DEFAULT_SPAN_SERVICE_NAME",
"SCHEMA_VERSION",
"schematize_service_name",
"schematize_database_operation",
"schematize_cache_operation",
Expand Down
18 changes: 18 additions & 0 deletions ddtrace/settings/peer_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import os

from ddtrace.ext import SpanKind
from ddtrace.internal.schema import SCHEMA_VERSION
from ddtrace.internal.utils.formats import asbool


class PeerServiceConfig(object):
source_tag_name = "_dd.peer.service.source"
tag_name = "peer.service"
enabled_span_kinds = {SpanKind.CLIENT, SpanKind.PRODUCER}
prioritized_data_sources = ["messaging.kafka.bootstrap.servers", "db.name", "rpc.service", "out.host"]

@property
def enabled(self):
env_enabled = asbool(os.getenv("DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED", default=False))

return SCHEMA_VERSION == "v1" or (SCHEMA_VERSION == "v0" and env_enabled)
10 changes: 10 additions & 0 deletions ddtrace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from ddtrace.internal.processor.endpoint_call_counter import EndpointCallCounterProcessor
from ddtrace.internal.sampling import SpanSamplingRule
from ddtrace.internal.sampling import get_span_sampling_rules
from ddtrace.settings.peer_service import PeerServiceConfig
from ddtrace.vendor import debtcollector

from . import _hooks
Expand All @@ -36,6 +37,7 @@
from .internal.logger import get_logger
from .internal.logger import hasHandlers
from .internal.processor import SpanProcessor
from .internal.processor.trace import PeerServiceProcessor
from .internal.processor.trace import SpanAggregator
from .internal.processor.trace import SpanSamplingProcessor
from .internal.processor.trace import TopLevelSpanProcessor
Expand Down Expand Up @@ -136,6 +138,7 @@ def _default_span_processors_factory(
single_span_sampling_rules, # type: List[SpanSamplingRule]
agent_url, # type: str
profiling_span_processor, # type: EndpointCallCounterProcessor
peer_service_processor, # type: PeerServiceProcessor
):
# type: (...) -> Tuple[List[SpanProcessor], Optional[Any], List[SpanProcessor]]
# FIXME: type should be AppsecSpanProcessor but we have a cyclic import here
Expand Down Expand Up @@ -176,6 +179,9 @@ def _default_span_processors_factory(
if single_span_sampling_rules:
span_processors.append(SpanSamplingProcessor(single_span_sampling_rules))

if peer_service_processor.enabled:
span_processors.append(peer_service_processor)

# These need to run after all the other processors
deferred_processors = [
SpanAggregator(
Expand Down Expand Up @@ -262,6 +268,7 @@ def __init__(
self._appsec_processor = None
self._iast_enabled = config._iast_enabled
self._endpoint_call_counter_span_processor = EndpointCallCounterProcessor()
self._peer_service_processor = PeerServiceProcessor(PeerServiceConfig())
self._span_processors, self._appsec_processor, self._deferred_processors = _default_span_processors_factory(
self._filters,
self._writer,
Expand All @@ -273,6 +280,7 @@ def __init__(
self._single_span_sampling_rules,
self._agent_url,
self._endpoint_call_counter_span_processor,
self._peer_service_processor,
)
if config._data_streams_enabled:
# Inline the import to avoid pulling in ddsketch or protobuf
Expand Down Expand Up @@ -515,6 +523,7 @@ def configure(
self._single_span_sampling_rules,
self._agent_url,
self._endpoint_call_counter_span_processor,
self._peer_service_processor,
)

if context_provider is not None:
Expand Down Expand Up @@ -564,6 +573,7 @@ def _child_after_fork(self):
self._single_span_sampling_rules,
self._agent_url,
self._endpoint_call_counter_span_processor,
self._peer_service_processor,
)

self._new_process = True
Expand Down
Empty file.
115 changes: 115 additions & 0 deletions tests/internal/peer_service/test_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import os

import mock
import pytest

from ddtrace.constants import SPAN_KIND
from ddtrace.ext import SpanKind
from ddtrace.internal.processor.trace import PeerServiceProcessor
from ddtrace.settings.peer_service import PeerServiceConfig
from ddtrace.span import Span


@pytest.fixture
def peer_service_config():
return PeerServiceConfig()


@pytest.fixture
def processor(peer_service_config):
return PeerServiceProcessor(peer_service_config)


@pytest.fixture
def test_span():
return Span(
"test_messaging_span",
service="test_service",
resource="test_resource",
span_type="test_span_type",
)


@pytest.mark.parametrize("span_kind", [SpanKind.CLIENT, SpanKind.PRODUCER])
def test_processing_peer_service_exists(processor, test_span, span_kind, peer_service_config):
processor.enabled = True
test_span.set_tag(SPAN_KIND, span_kind)
test_span.set_tag(peer_service_config.tag_name, "fake_peer_service")
test_span.set_tag("out.host", "fake_falue") # Should not show up
processor.on_span_finish(test_span)

assert test_span.get_tag(peer_service_config.tag_name) == "fake_peer_service"
assert test_span.get_tag(peer_service_config.source_tag_name) == "peer.service"


@pytest.mark.parametrize("span_kind", [SpanKind.SERVER, SpanKind.CONSUMER])
def test_nothing_happens_for_server_and_consumer(processor, test_span, span_kind, peer_service_config):
processor.enabled = True
test_span.set_tag(SPAN_KIND, span_kind)
test_span.set_tag("out.host", "fake_host")
processor.on_span_finish(test_span)

assert test_span.get_tag(peer_service_config.source_tag_name) is None


@pytest.mark.parametrize("data_source", PeerServiceConfig.prioritized_data_sources)
def test_existing_data_sources(processor, test_span, data_source, peer_service_config):
processor.enabled = True
test_span.set_tag(SPAN_KIND, SpanKind.CLIENT)
test_span.set_tag(data_source, "test_value")

processor.on_span_finish(test_span)

assert test_span.get_tag(peer_service_config.tag_name) == "test_value"
assert test_span.get_tag(peer_service_config.source_tag_name) == data_source


@pytest.mark.parametrize("data_source", PeerServiceConfig.prioritized_data_sources)
def test_disabled_peer_service(processor, test_span, data_source, peer_service_config):
processor.enabled = False
test_span.set_tag(data_source, "test_value")
processor.on_span_finish(test_span)

assert test_span.get_tag(peer_service_config.tag_name) is None
assert test_span.get_tag(peer_service_config.source_tag_name) is None


@pytest.mark.parametrize(
"schema_peer_enabled",
[
("v0", False, False),
("v0", True, True),
("v1", False, True),
("v1", True, True),
],
)
def fake_peer_service_enablement(span, schema_peer_enabled):
schema_version, env_enabled, expected = schema_peer_enabled

with mock.patch.dict(os.environ, {"DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED": env_enabled}):
with mock.patch("ddtrace.internal.schema.SCHEMA_VERSION", schema_version):
assert PeerServiceConfig().enabled == expected


@pytest.mark.subprocess(env=dict(DD_TRACE_PEER_SERVICE_DEFAULTS_ENABLED="True"), ddtrace_run=True)
def test_tracer_hooks():
from ddtrace.constants import SPAN_KIND
from ddtrace.ext import SpanKind
from ddtrace.settings.peer_service import PeerServiceConfig
from tests.utils import DummyTracer

peer_service_config = PeerServiceConfig()
tracer = DummyTracer()
span = tracer.trace(
"test",
service="test_service",
resource="test_resource",
span_type="test_span_type",
)
span.set_tag(SPAN_KIND, SpanKind.CLIENT)
span.set_tag("out.host", "test_value")

span.finish()

assert span.get_tag(peer_service_config.tag_name) == "test_value"
assert span.get_tag(peer_service_config.source_tag_name) == "out.host"
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
"error": 0,
"meta": {
"_dd.p.dm": "-0",
"_dd.peer.service.source": "out.host",
"component": "aiohttp_client",
"http.method": "GET",
"http.status_code": "200",
"http.status_msg": "OK",
"http.url": "http://localhost:8001/status/200",
"language": "python",
"out.host": "localhost",
"peer.service": "localhost",
"runtime-id": "b3d83691a6a8417b8ef35d1924f5fb15",
"span.kind": "client"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
"error": 0,
"meta": {
"_dd.p.dm": "-0",
"_dd.peer.service.source": "out.host",
"component": "aiohttp_client",
"http.method": "GET",
"http.status_code": "200",
"http.status_msg": "OK",
"http.url": "http://localhost:8001/status/200",
"language": "python",
"out.host": "localhost",
"peer.service": "localhost",
"runtime-id": "da917e48b6e4471bb4f574683c218cb8",
"span.kind": "client"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
"error": 0,
"meta": {
"_dd.p.dm": "-0",
"_dd.peer.service.source": "db.name",
"component": "aiomysql",
"db.name": "test",
"db.system": "mysql",
"db.user": "test",
"language": "python",
"out.host": "127.0.0.1",
"peer.service": "test",
"runtime-id": "9f466d3fd1ef4e59ab6d72707e720ecf",
"span.kind": "client",
"sql.query": "select 'dba4x4'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
"error": 0,
"meta": {
"_dd.p.dm": "-0",
"_dd.peer.service.source": "db.name",
"component": "aiomysql",
"db.name": "test",
"db.system": "mysql",
"db.user": "test",
"language": "python",
"out.host": "127.0.0.1",
"peer.service": "test",
"runtime-id": "f446781b5d244b95abc64fb246064c81",
"span.kind": "client",
"sql.query": "select 'dba4x4'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
"error": 0,
"meta": {
"_dd.p.dm": "-0",
"_dd.peer.service.source": "db.name",
"component": "aiomysql",
"db.name": "test",
"db.system": "mysql",
"db.user": "test",
"language": "python",
"out.host": "127.0.0.1",
"peer.service": "test",
"runtime-id": "ae2b5fb6b2b849ed9a299852cb4df1d5",
"span.kind": "client",
"sql.query": "select 'dba4x4'"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
"error": 0,
"meta": {
"_dd.p.dm": "-0",
"_dd.peer.service.source": "db.name",
"component": "asyncpg",
"db.name": "postgres",
"db.system": "postgresql",
"db.user": "postgres",
"language": "python",
"out.host": "127.0.0.1",
"peer.service": "postgres",
"runtime-id": "f4cf631eaf9b4be0b728f535891258cb",
"span.kind": "client"
},
Expand All @@ -42,12 +44,14 @@
"error": 0,
"meta": {
"_dd.p.dm": "-0",
"_dd.peer.service.source": "db.name",
"component": "asyncpg",
"db.name": "postgres",
"db.system": "postgresql",
"db.user": "postgres",
"language": "python",
"out.host": "127.0.0.1",
"peer.service": "postgres",
"runtime-id": "f4cf631eaf9b4be0b728f535891258cb",
"span.kind": "client"
},
Expand Down
Loading

0 comments on commit 3270900

Please sign in to comment.