-
Notifications
You must be signed in to change notification settings - Fork 0
/
nicer_consumer.py
73 lines (55 loc) · 2.39 KB
/
nicer_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio
import sys
from elasticapm.base import DummyClient as APMClient
from elasticapm.traces import Transaction, execution_context
from elasticapm.utils.disttracing import TraceParent
from redispatcher import BaseConsumer
"""
When we use redispatcher, we create a new BaseConsumer all of our
consumers inherit from. This will implement base behavior across
our monorepo that is specific to that project. Things like
* tracing
* logging
* retry logic
"""
class OurBaseConsumer(BaseConsumer):
apm_client: APMClient
# This is sent along with every message. We use it for APM tracing
class Headers(BaseConsumer.Headers):
traceparent: str
def __init__(self):
super().__init__()
self.apm_client = APMClient({"SERVICE_NAME": self.QUEUE})
# Implement custom headers, we use elastic-apm to pass on the traceparent
@classmethod
def headers(cls) -> Headers:
transaction: Transaction = execution_context.get_transaction()
trace_parent: TraceParent = transaction.trace_parent
return cls.Headers(traceparent=trace_parent.to_string())
# We stub this out and where the individual consumer logic will live
async def run(self, message: BaseConsumer.Message):
raise NotImplementedError
# Define logic that will run for all our consumers
async def process_message(self, message: BaseConsumer.Message, headers: Headers):
# Start a trace
self.apm_client.begin_transaction("redispatcher", TraceParent.from_string(headers.traceparent))
result = "OK"
# Try to run, have some custom exception logic too like finishing trace result
try:
await self.run(message)
except Exception:
self.apm_client.capture_exception(exc_info=sys.exc_info())
result = "ERROR"
finally:
self.apm_client.end_transaction(self.QUEUE, result=result)
# Subclasses to use the custom shared loging in OurBaseConsumer
class NicerConsumer(OurBaseConsumer):
# This defines the key on which our redis messager queue will live on
QUEUE = "redispatcher-nice-consumer"
class Message(OurBaseConsumer.Message):
yeet: str
async def run(self, message: Message):
print(f"nice consumer processing message {message}")
# lets imitate some IO blocking operation
await asyncio.sleep(2)
print(f"nice consumer done processing message")