Skip to content

Commit

Permalink
chore(di): trigger probes (#10942)
Browse files Browse the repository at this point in the history
We implement trigger probes. These allows triggering the capture of
debug information along a trace, ensuring all the relevant probes are
also triggered.

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
  • Loading branch information
P403n1x87 authored Feb 12, 2025
1 parent 6b5f13c commit 84949f3
Show file tree
Hide file tree
Showing 29 changed files with 751 additions and 98 deletions.
16 changes: 14 additions & 2 deletions ddtrace/_trace/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,17 @@ class Context(object):
boundaries.
"""

__slots__ = ["trace_id", "span_id", "_lock", "_meta", "_metrics", "_span_links", "_baggage", "_is_remote"]
__slots__ = [
"trace_id",
"span_id",
"_lock",
"_meta",
"_metrics",
"_span_links",
"_baggage",
"_is_remote",
"__weakref__",
]

def __init__(
self,
Expand Down Expand Up @@ -259,7 +269,6 @@ def __eq__(self, other: Any) -> bool:
with self._lock:
return (
self.trace_id == other.trace_id
and self.span_id == other.span_id
and self._meta == other._meta
and self._metrics == other._metrics
and self._span_links == other._span_links
Expand All @@ -279,4 +288,7 @@ def __repr__(self) -> str:
self._is_remote,
)

def __hash__(self) -> int:
return hash(self.trace_id)

__str__ = __repr__
3 changes: 3 additions & 0 deletions ddtrace/contrib/internal/trace_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.compat import ip_is_global
from ddtrace.internal.compat import parse
from ddtrace.internal.core.event_hub import dispatch
from ddtrace.internal.logger import get_logger
from ddtrace.internal.utils.cache import cached
from ddtrace.internal.utils.http import normalize_header_name
Expand Down Expand Up @@ -596,6 +597,8 @@ def activate_distributed_headers(tracer, int_config=None, request_headers=None,
# have a context with the same trace id active
tracer.context_provider.activate(context)

dispatch("distributed_context.activated", (context,))


def _flatten(
obj, # type: Any
Expand Down
4 changes: 4 additions & 0 deletions ddtrace/debugging/_exception/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from ddtrace.debugging._probe.model import LiteralTemplateSegment
from ddtrace.debugging._probe.model import LogLineProbe
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.snapshot import DEFAULT_CAPTURE_LIMITS
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
Expand Down Expand Up @@ -194,6 +195,9 @@ def can_capture(span: Span) -> bool:
return True

if info_captured is None:
if Session.from_trace():
# If we are in a debug session we always capture
return True
result = GLOBAL_RATE_LIMITER.limit() is not RateLimitExceeded
root.set_tag_str(CAPTURE_TRACE_TAG, str(result).lower())
return result
Expand Down
10 changes: 10 additions & 0 deletions ddtrace/debugging/_live.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from ddtrace.debugging._session import Session
from ddtrace.internal import core


def enable() -> None:
core.on("distributed_context.activated", Session.activate_distributed, "live_debugger")


def disable() -> None:
core.reset_listeners("distributed_context.activated", Session.activate_distributed)
133 changes: 77 additions & 56 deletions ddtrace/debugging/_origin/span.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
from dataclasses import dataclass
from functools import partial
from itertools import count
from pathlib import Path
import sys
import time

# from threading import current_thread
from threading import current_thread
from time import monotonic_ns
from types import FrameType
from types import FunctionType
import typing as t
import uuid

import ddtrace

# from ddtrace import config
from ddtrace._trace.processor import SpanProcessor

# from ddtrace.debugging._debugger import Debugger
from ddtrace.debugging._probe.model import DEFAULT_CAPTURE_LIMITS
from ddtrace.debugging._probe.model import LiteralTemplateSegment
from ddtrace.debugging._probe.model import LogFunctionProbe
from ddtrace.debugging._probe.model import LogLineProbe
from ddtrace.debugging._probe.model import ProbeEvalTiming

# from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._signal.model import Signal
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import UploaderProduct
from ddtrace.ext import EXIT_SPAN_TYPES
from ddtrace.internal import core
from ddtrace.internal.packages import is_user_code
Expand All @@ -41,13 +39,13 @@ def frame_stack(frame: FrameType) -> t.Iterator[FrameType]:
_frame = _frame.f_back


def wrap_entrypoint(f: t.Callable) -> None:
def wrap_entrypoint(collector: SignalCollector, f: t.Callable) -> None:
if not _isinstance(f, FunctionType):
return

_f = t.cast(FunctionType, f)
if not EntrySpanWrappingContext.is_wrapped(_f):
EntrySpanWrappingContext(_f).wrap()
EntrySpanWrappingContext(collector, _f).wrap()


@dataclass
Expand Down Expand Up @@ -121,9 +119,13 @@ class EntrySpanLocation:


class EntrySpanWrappingContext(WrappingContext):
def __init__(self, f):
__priority__ = 199

def __init__(self, collector: SignalCollector, f: FunctionType) -> None:
super().__init__(f)

self.collector = collector

filename = str(Path(f.__code__.co_filename).resolve())
name = f.__qualname__
module = f.__module__
Expand Down Expand Up @@ -153,36 +155,37 @@ def __enter__(self):
s.set_tag_str("_dd.code_origin.frames.0.type", location.module)
s.set_tag_str("_dd.code_origin.frames.0.method", location.name)

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=location.probe,
# frame=self.__frame__,
# thread=current_thread(),
# trace_context=root,
# )

# # Capture on entry
# context = Debugger.get_collector().attach(snapshot)

# # Correlate the snapshot with the span
# root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
# span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)

# self.set("context", context)
# self.set("start_time", time.monotonic_ns())
self.set("start_time", monotonic_ns())

return self

def _close_signal(self, retval=None, exc_info=(None, None, None)):
try:
signal: Signal = t.cast(Signal, self.get("signal"))
except KeyError:
# No snapshot was created
root = ddtrace.tracer.current_root_span()
span = ddtrace.tracer.current_span()
if root is None or span is None:
return

signal.do_exit(retval, exc_info, time.monotonic_ns() - self.get("start_time"))
# Check if we have any level 2 debugging sessions running for the
# current trace
if any(s.level >= 2 for s in Session.from_trace()):
# Create a snapshot
snapshot = Snapshot(
probe=self.location.probe,
frame=self.__frame__,
thread=current_thread(),
trace_context=root,
)

# Capture on entry
snapshot.do_enter()

# Correlate the snapshot with the span
root.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)
span.set_tag_str("_dd.code_origin.frames.0.snapshot_id", snapshot.uuid)

snapshot.do_exit(retval, exc_info, monotonic_ns() - self.get("start_time"))

self.collector.push(snapshot)

def __return__(self, retval):
self._close_signal(retval=retval)
Expand All @@ -195,7 +198,10 @@ def __exit__(self, exc_type, exc_value, traceback):

@dataclass
class SpanCodeOriginProcessor(SpanProcessor):
__uploader__ = LogsIntakeUploaderV1

_instance: t.Optional["SpanCodeOriginProcessor"] = None
_handler: t.Optional[t.Callable] = None

def on_span_start(self, span: Span) -> None:
if span.span_type not in EXIT_SPAN_TYPES:
Expand Down Expand Up @@ -224,24 +230,25 @@ def on_span_start(self, span: Span) -> None:
except ValueError:
continue

# TODO[gab]: This will be enabled as part of the live debugger/distributed debugging
# if ld_config.enabled:
# # Create a snapshot
# snapshot = Snapshot(
# probe=ExitSpanProbe.from_frame(frame),
# frame=frame,
# thread=current_thread(),
# trace_context=span,
# )
# Check if we have any level 2 debugging sessions running for
# the current trace
if any(s.level >= 2 for s in Session.from_trace()):
# Create a snapshot
snapshot = Snapshot(
probe=ExitSpanProbe.from_frame(frame),
frame=frame,
thread=current_thread(),
trace_context=span,
)

# # Capture on entry
# snapshot.line()
# Capture on entry
snapshot.do_line()

# # Collect
# Debugger.get_collector().push(snapshot)
# Collect
self.__uploader__.get_collector().push(snapshot)

# # Correlate the snapshot with the span
# span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)
# Correlate the snapshot with the span
span.set_tag_str(f"_dd.code_origin.frames.{n}.snapshot_id", snapshot.uuid)

def on_span_finish(self, span: Span) -> None:
pass
Expand All @@ -251,17 +258,31 @@ def enable(cls):
if cls._instance is not None:
return

core.on("service_entrypoint.patch", wrap_entrypoint)

instance = cls._instance = cls()

# Register code origin for span with the snapshot uploader
cls.__uploader__.register(UploaderProduct.CODE_ORIGIN_SPAN)

# Register the processor for exit spans
instance.register()

# Register the entrypoint wrapping for entry spans
cls._handler = handler = partial(wrap_entrypoint, cls.__uploader__.get_collector())
core.on("service_entrypoint.patch", handler)

@classmethod
def disable(cls):
if cls._instance is None:
return

# Unregister the entrypoint wrapping for entry spans
core.reset_listeners("service_entrypoint.patch", cls._handler)
cls._handler = None

# Unregister the processor for exit spans
cls._instance.unregister()
cls._instance = None

core.reset_listeners("service_entrypoint.patch", wrap_entrypoint)
# Unregister code origin for span with the snapshot uploader
cls.__uploader__.unregister(UploaderProduct.CODE_ORIGIN_SPAN)

cls._instance = None
28 changes: 26 additions & 2 deletions ddtrace/debugging/_probe/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

DEFAULT_PROBE_RATE = 5000.0
DEFAULT_SNAPSHOT_PROBE_RATE = 1.0
DEFAULT_TRIGGER_PROBE_RATE = 1.0 / 60.0 # 1 per minute
DEFAULT_PROBE_CONDITION_ERROR_RATE = 1.0 / 60 / 5


Expand Down Expand Up @@ -245,6 +246,10 @@ class LogProbeMixin(AbstractProbeMixIn):
take_snapshot: bool
limits: CaptureLimits = field(compare=False)

@property
def __budget__(self) -> int:
return 10 if self.take_snapshot else 100


@dataclass
class LogLineProbe(Probe, LineLocationMixin, LogProbeMixin, ProbeConditionMixin, RateLimitMixin):
Expand Down Expand Up @@ -301,12 +306,31 @@ class SpanDecorationFunctionProbe(Probe, FunctionLocationMixin, TimingMixin, Spa
pass


LineProbe = Union[LogLineProbe, MetricLineProbe, SpanDecorationLineProbe]
FunctionProbe = Union[LogFunctionProbe, MetricFunctionProbe, SpanFunctionProbe, SpanDecorationFunctionProbe]
@dataclass
class SessionMixin:
session_id: str
level: int


@dataclass
class TriggerLineProbe(Probe, LineLocationMixin, SessionMixin, ProbeConditionMixin, RateLimitMixin):
pass


@dataclass
class TriggerFunctionProbe(Probe, FunctionLocationMixin, SessionMixin, ProbeConditionMixin, RateLimitMixin):
pass


LineProbe = Union[LogLineProbe, MetricLineProbe, SpanDecorationLineProbe, TriggerLineProbe]
FunctionProbe = Union[
LogFunctionProbe, MetricFunctionProbe, SpanFunctionProbe, SpanDecorationFunctionProbe, TriggerFunctionProbe
]


class ProbeType(str, Enum):
LOG_PROBE = "LOG_PROBE"
METRIC_PROBE = "METRIC_PROBE"
SPAN_PROBE = "SPAN_PROBE"
SPAN_DECORATION_PROBE = "SPAN_DECORATION_PROBE"
TRIGGER_PROBE = "TRIGGER_PROBE"
Loading

0 comments on commit 84949f3

Please sign in to comment.