forked from DataDog/dd-trace-py
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_writer.py
98 lines (77 loc) · 2.74 KB
/
test_writer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
from unittest import TestCase
from ddtrace.span import Span
from ddtrace.writer import AsyncWorker, Q
class RemoveAllFilter(object):
    """Trace filter that returns ``None`` for every trace it is given.

    ``filtered_traces`` counts the calls to ``process_trace`` so that tests
    can check how many traces reached this filter.
    """

    def __init__(self):
        self.filtered_traces = 0

    def process_trace(self, trace):
        """Record the call and return ``None`` instead of ``trace``."""
        self.filtered_traces += 1
        return None
class KeepAllFilter(object):
    """Trace filter that passes every trace through unchanged.

    ``filtered_traces`` counts the calls to ``process_trace`` so that tests
    can check how many traces reached this filter.
    """

    def __init__(self):
        self.filtered_traces = 0

    def process_trace(self, trace):
        """Record the call and return the very same ``trace`` object."""
        self.filtered_traces += 1
        return trace
class AddTagFilter(object):
    """Trace filter that tags every span of a trace and keeps the trace.

    :param tag_name: name of the tag set on each span; the value is always
        the string ``"A value"``.

    ``filtered_traces`` counts the calls to ``process_trace`` so that tests
    can check how many traces reached this filter.
    """

    def __init__(self, tag_name):
        self.tag_name = tag_name
        self.filtered_traces = 0

    def process_trace(self, trace):
        """Record the call, tag each span in ``trace`` and return ``trace``."""
        self.filtered_traces += 1
        for span in trace:
            span.set_tag(self.tag_name, "A value")
        return trace
class DummmyAPI(object):
    """In-memory API stand-in that records every trace it is asked to send."""

    def __init__(self):
        # Every trace passed to send_traces, in the order received.
        self.traces = []

    def send_traces(self, traces):
        """Append each trace in the iterable ``traces`` to ``self.traces``."""
        self.traces.extend(traces)
# Number of traces that AsyncWorkerTests.setUp puts on the trace queue.
N_TRACES = 11
class AsyncWorkerTests(TestCase):
    """Check how ``AsyncWorker`` applies its ``filters`` to queued traces."""

    def setUp(self):
        """Create a recording API and queue ``N_TRACES`` seven-span traces."""
        self.api = DummmyAPI()
        self.traces = Q()
        self.services = Q()
        for i in range(N_TRACES):
            self.traces.add([
                Span(
                    tracer=None,
                    name="name",
                    trace_id=i,
                    span_id=j,
                    # The first span is the root and has no parent; every
                    # later span is a child of the one before it.  Written as
                    # an explicit conditional because ``j - 1 or None``
                    # evaluates to -1 for j == 0 and to None for j == 1.
                    parent_id=j - 1 if j > 0 else None,
                )
                for j in range(7)
            ])

    def _run_worker(self, filters):
        """Create a worker over the queued traces, stop it and join it.

        The assertions in the tests rely on the worker having handled the
        whole queue by the time ``join()`` returns.
        """
        worker = AsyncWorker(self.api, self.traces, self.services, filters=filters)
        worker.stop()
        worker.join()

    def test_filters_keep_all(self):
        """A pass-through filter sees every trace and all of them are sent."""
        filtr = KeepAllFilter()
        self._run_worker([filtr])
        self.assertEqual(len(self.api.traces), N_TRACES)
        self.assertEqual(filtr.filtered_traces, N_TRACES)

    def test_filters_remove_all(self):
        """A filter returning ``None`` sees every trace and none are sent."""
        filtr = RemoveAllFilter()
        self._run_worker([filtr])
        self.assertEqual(len(self.api.traces), 0)
        self.assertEqual(filtr.filtered_traces, N_TRACES)

    def test_filters_add_tag(self):
        """Changes a filter makes to the spans are visible on the sent traces."""
        tag_name = "Tag"
        filtr = AddTagFilter(tag_name)
        self._run_worker([filtr])
        self.assertEqual(len(self.api.traces), N_TRACES)
        self.assertEqual(filtr.filtered_traces, N_TRACES)
        for trace in self.api.traces:
            for span in trace:
                self.assertIsNotNone(span.get_tag(tag_name))

    def test_filters_short_circuit(self):
        """Once a filter returns ``None`` the filters after it are not called."""
        filtr = KeepAllFilter()
        self._run_worker([RemoveAllFilter(), filtr])
        self.assertEqual(len(self.api.traces), 0)
        self.assertEqual(filtr.filtered_traces, 0)