Skip to content

Commit

Permalink
chore(ci_visibility): add threading to internal coverage collector (D…
Browse files Browse the repository at this point in the history
…ataDog#9426)

Adds the ability to collect context-level coverage when using the
`threading` module.

The `Thread` class is patched with a custom `_bootstrap_inner` method
that enables context-level coverage in the thread before it executes
and submits the collected data to a queue shared with the parent thread
after execution finishes. The `join()` method is patched to consume the
shared queue and add its contents to the active context in the parent's
execution.

No release notes as this continues to be experimental/unreleased code.

## Checklist

- [x] Change(s) are motivated and described in the PR description
- [x] Testing strategy is described if automated tests are not included
in the PR
- [x] Risks are described (performance impact, potential for breakage,
maintainability)
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
are followed or label `changelog/no-changelog` is set
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/))
- [x] Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))
- [x] If this PR changes the public interface, I've notified
`@DataDog/apm-tees`.

## Reviewer Checklist

- [x] Title is accurate
- [x] All changes are related to the pull request's stated goal
- [x] Description motivates each change
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- [x] Testing strategy adequately addresses listed risks
- [x] Change is maintainable (easy to change, telemetry, documentation)
- [x] Release note makes sense to a user of the library
- [x] Author has acknowledged and discussed the performance implications
of this PR as reported in the benchmarks PR comment
- [x] Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Gabriele N. Tornetta <[email protected]>
Co-authored-by: Gabriele N. Tornetta <[email protected]>
  • Loading branch information
3 people authored Jun 5, 2024
1 parent 015ef4b commit c035b91
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 4 deletions.
17 changes: 17 additions & 0 deletions ddtrace/internal/coverage/code.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ def get_data_json(cls) -> str:

return json.dumps({"lines": executable_lines, "covered": covered_lines})

@classmethod
def get_context_data_json(cls) -> str:
    """Serialize the lines covered in the current context as a JSON string.

    The resulting document has two keys: ``"lines"``, which is always an empty mapping here, and ``"covered"``,
    which maps each path returned by ``get_context_covered_lines()`` to a list of its covered lines.
    """
    covered_by_path = {}
    for path, lines in cls.get_context_covered_lines().items():
        # The per-path collections are converted to lists before being handed to json.dumps.
        covered_by_path[path] = list(lines)

    return json.dumps({"lines": {}, "covered": covered_by_path})

@classmethod
def get_context_covered_lines(cls):
    """Return the covered lines stored in the current context.

    An empty dict is returned when no collector instance exists or when ``ctx_coverage_enabled`` is falsy in the
    current context; otherwise the current value of ``ctx_covered`` is returned as-is.
    """
    collecting = cls._instance is not None and ctx_coverage_enabled.get()
    return ctx_covered.get() if collecting else {}

@classmethod
def write_json_report_to_file(cls, filename: str, workspace_path: Path, ignore_nocover: bool = False):
if cls._instance is None:
Expand Down Expand Up @@ -182,6 +195,10 @@ def coverage_enabled(cls):
return False
return cls._instance._coverage_enabled

@classmethod
def coverage_enabled_in_context(cls):
    """Tell whether context-level coverage is enabled in the current context.

    Returns ``False`` when no collector instance exists; otherwise returns the current value of
    ``ctx_coverage_enabled``.
    """
    if cls._instance is None:
        return False

    return ctx_coverage_enabled.get()

@classmethod
def report_seen_lines(cls):
"""Generate the same data as expected by ddtrace.ci_visibility.coverage.build_payload:
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/internal/coverage/installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@

from ddtrace.internal.coverage.code import ModuleCodeCollector
from ddtrace.internal.coverage.multiprocessing_coverage import _patch_multiprocessing
from ddtrace.internal.coverage.threading_coverage import _patch_threading


def install(include_paths: t.Optional[t.List[Path]] = None) -> None:
    """Install the module code collector and patch multiprocessing and threading.

    :param include_paths: forwarded unchanged to ``ModuleCodeCollector.install``.
    """
    ModuleCodeCollector.install(include_paths=include_paths)

    # The patchers run after the collector is installed, multiprocessing first and threading second.
    for apply_patch in (_patch_multiprocessing, _patch_threading):
        apply_patch()
8 changes: 4 additions & 4 deletions ddtrace/internal/coverage/multiprocessing_coverage.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
# Name of the marker attribute that is set on (and looked up from) the ``multiprocessing`` module to record that
# it has been patched.
DD_PATCH_ATTR = "_datadog_patch"


def _is_patched():
    """Return True when the ``multiprocessing`` module carries the ``DD_PATCH_ATTR`` marker attribute."""
    try:
        getattr(multiprocessing, DD_PATCH_ATTR)
    except AttributeError:
        return False
    return True


class CoverageCollectingMultiprocess(BaseProcess):
def _absorb_child_coverage(self):
if ModuleCodeCollector._instance is None:
Expand Down Expand Up @@ -162,7 +166,3 @@ def get_preparation_data_with_stowaway(name: str) -> t.Dict[str, t.Any]:
spawn.get_preparation_data = get_preparation_data_with_stowaway

setattr(multiprocessing, DD_PATCH_ATTR, True)


def _is_patched():
    """Report whether the ``DD_PATCH_ATTR`` marker attribute is present on the ``multiprocessing`` module."""
    absent = object()
    # Only the presence of the attribute matters, not its value.
    return getattr(multiprocessing, DD_PATCH_ATTR, absent) is not absent
70 changes: 70 additions & 0 deletions ddtrace/internal/coverage/threading_coverage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Provides functionality for context-based coverage to work across threads
Without this, context-based collection in the parent thread would not capture code executed by child threads (due to
the parent's context variables not being shared with the threads it starts).
The collection of coverage is done when the thread's join() method is called, so context-level coverage will not be
captured if join() is not called.
Since the ModuleCodeCollector is already installed at the process level, there is no need to reinstall it or ensure that
its include_paths are set.
Session-level coverage does not need special-casing since the ModuleCodeCollector behavior is process-wide and
thread-safe.
"""
from queue import Queue
import threading

from ddtrace.internal.coverage.code import ModuleCodeCollector


# Keep references to the original Thread implementation so that the coverage-collecting overrides defined in this
# module can delegate to it.
Thread = threading.Thread
thread_init = Thread.__init__
# NOTE(review): "boostrap" looks like a typo for "bootstrap"; the name is referenced elsewhere in this module, so it
# is left unchanged here.
thread_boostrap_inner = Thread._bootstrap_inner # type: ignore[attr-defined]
thread_join = Thread.join

# Name of the marker attribute that _is_patched() looks up on the ``threading`` module.
DD_PATCH_ATTR = "_datadog_patch"


def _is_patched():
    """Return True when the ``threading`` module carries the ``DD_PATCH_ATTR`` marker attribute."""
    try:
        getattr(threading, DD_PATCH_ATTR)
    except AttributeError:
        return False
    return True


class CoverageCollectingThread(threading.Thread):
    """Thread methods that carry context-level coverage from a thread back to whoever joins it.

    When coverage applies, the thread runs inside a ``ModuleCodeCollector.CollectInContext()`` context and, once it
    finishes, puts the JSON-serialized context coverage on a per-thread queue. ``join()`` takes that payload off
    the queue and hands it to the collector instance via ``absorb_data_json``.
    """

    def __init__(self, *args, **kwargs):
        """Record whether this thread should collect coverage, then run the original initializer.

        Coverage is only collected when the collector is installed and context-level coverage is enabled in the
        context that creates the thread.
        """
        self._should_cover = ModuleCodeCollector.is_installed() and ModuleCodeCollector.coverage_enabled_in_context()

        if self._should_cover:
            # Hand-off channel: _bootstrap_inner() puts the thread's coverage here and join() picks it up.
            self._coverage_queue = Queue()

        thread_init(self, *args, **kwargs)

    def _bootstrap_inner(self):
        """Run the thread inside a coverage context and queue the collected data for join() to absorb."""
        # A thread object created before these methods were installed on threading.Thread never ran the __init__
        # above and has no _should_cover attribute; treat it as a thread that does not collect coverage.
        if not getattr(self, "_should_cover", False):
            thread_boostrap_inner(self)
            return

        self._coverage_context = ModuleCodeCollector.CollectInContext()
        self._coverage_context.__enter__()
        try:
            thread_boostrap_inner(self)
        finally:
            # Hand over whatever was collected and leave the context even if the wrapped bootstrap raised, so that
            # the data is not lost and the context is not left entered.
            covered_lines = ModuleCodeCollector.get_context_data_json()
            self._coverage_context.__exit__()
            self._coverage_queue.put(covered_lines)

    def join(self, *args, **kwargs):
        """Join the thread and, once it has finished, absorb its coverage data exactly once.

        Arguments are forwarded to the original ``Thread.join``. Unlike a blocking ``Queue.get()``, this never
        waits beyond the original join: if the join timed out the data is left for a later ``join()`` call, and
        if the data was already absorbed by a previous call there is nothing left to do.
        """
        from queue import Empty

        thread_join(self, *args, **kwargs)

        # See _bootstrap_inner(): threads created before patching have no coverage state at all.
        if not getattr(self, "_should_cover", False):
            return

        if self.is_alive():
            # join() returned because of its timeout; the thread has not queued its coverage yet.
            # NOTE(review): relies on join() only reporting a thread as finished after _bootstrap_inner() has
            # returned, which is how CPython implements it -- confirm for other interpreters.
            return

        try:
            thread_coverage = self._coverage_queue.get_nowait()
        except Empty:
            # The payload was already consumed by an earlier join() on this thread.
            return

        collector = ModuleCodeCollector._instance
        if collector is not None:
            collector.absorb_data_json(thread_coverage)


def _patch_threading():
    """Install the coverage-collecting ``__init__``, ``_bootstrap_inner`` and ``join`` on ``threading.Thread``.

    The ``DD_PATCH_ATTR`` marker is set on the ``threading`` module once the methods are installed, so that
    ``_is_patched()`` reports the patched state and repeated calls are no-ops.
    """
    if _is_patched():
        return

    threading.Thread.__init__ = CoverageCollectingThread.__init__
    threading.Thread._bootstrap_inner = CoverageCollectingThread._bootstrap_inner
    threading.Thread.join = CoverageCollectingThread.join

    # Without this marker _is_patched() could never return True.
    setattr(threading, DD_PATCH_ATTR, True)

0 comments on commit c035b91

Please sign in to comment.