-
Notifications
You must be signed in to change notification settings - Fork 9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support distributed tracing #38
Comments
Btw: When implementing it I stumbled over this spec stating that the traceparent must be some binary thing. All I can say is that I used strings and it works just fine with the opentelemetry package for amqplib. So that might be a bit sticky situation to consider. |
I would like to second this suggestion. Having distributed tracing integrated would be very nice. |
@tweidema it sounds like you have it all setup then, I am not quite sure for which part are you looking for pointers. I guess the only piece missing are the actual spans, I use the following for now (not sure how otel spec-compliant it is though): extension AMQPChannel {
func publish(_ buffer: ByteBuffer, to exchange: String, with routingKey: String) async throws {
try await withSpan("\(exchange) > \(routingKey) publish", ofKind: .producer) { span in
span.attributes = [
"messaging.system": "rabbitmq",
"messaging.destination": .string(exchange),
"messaging.rabbitmq.routing_key": .string(routingKey),
]
try await basicPublish(
from: buffer, exchange: exchange, routingKey: routingKey,
properties: .init(
headers: ServiceContext.current?.asAqmpHeaders,
deliveryMode: .deliveryModePersistent,
expiration: "\(SharelogBrokerDefaults.DEFAULT_ACKNOWLEDGE_EXPIRATION)",
type: SharelogBrokerDefaults.MESSAGE_TYPE_PUBSUB
)
)
}
}
extension BrokerMessageReponder { // <- my type that handles messages
@discardableResult
func handleDelivery(_ delivery: AMQPResponse.Channel.Message.Delivery, from channel: AMQPChannel, logger: Logger) async -> HandleDeliveryResult {
await withSpan("\(delivery.exchange) \(delivery.routingKey) process", context: delivery.serviceContext, ofKind: .consumer) { span in
span.attributes = [
"messaging.system": "rabbitmq",
"messaging.destination": .string(delivery.exchange),
"messaging.rabbitmq.routing_key": .string(delivery.routingKey),
]
span.attributes["messaging.conversation_id"] = delivery.properties.correlationID
// ... process message ...
}
}
} |
@sliemeobn Thank you very much. That helped me get the span-headers passed back and forth. Still missing a piece reg. how to get the span-context on the receiving side set from those headers. So I get two distinct traces - one from the client and one from the server - they are not being seen as one trace. |
in the snippet I posted above you see where the service context stored in the headers: ServiceContext.current?.asAqmpHeaders, and on the receiving side, the span is initialized with the context extracted from the message here await withSpan("\(delivery.exchange) \(delivery.routingKey) process", context: delivery.serviceContext, ofKind: .consumer) { span in this should forward the traceId as part of the |
@sliemeobn Finally got around to looking at this again, and that was the piece I was missing. Have it working now! Thanks so much for your help. |
I added a tiny wrapper around this library to participate in distributed tracing, works like a charm with nodejs services (communicating with swift services through rabbitmq.)
Ideally, this can come out of the box. We'd either need some sort of config for it or maybe a middleware system?
Here is what I use:
The text was updated successfully, but these errors were encountered: