Support distributed tracing #38

Open
sliemeobn opened this issue Oct 26, 2023 · 6 comments
Labels
enhancement New feature or request

Comments

@sliemeobn
Contributor

sliemeobn commented Oct 26, 2023

I added a tiny wrapper around this library to participate in distributed tracing. It works like a charm with Node.js services (communicating with Swift services through RabbitMQ).

Ideally, this would come out of the box. We'd either need some sort of config for it or maybe a middleware system?

Here is what I use:

struct AMQPHeaderPropagation: Extractor, Injector {
    typealias Carrier = Table?

    func extract(key: String, from carrier: Carrier) -> String? { carrier?[key]?.asString }

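    func inject(_ value: String, forKey key: String, into carrier: inout Carrier) {
        // Create the header table on first use, then write the value back without force-unwrapping.
        var table = carrier ?? Table()
        table[key] = .longString(value)
        carrier = table
    }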
}

private extension Field {
    var asString: String? {
        switch self {
        case let .longString(s): return s
        // NOTE: maybe support more data types
        default: return nil
        }
    }
}

extension ServiceContext {
    var asAqmpHeaders: Table? {
        var headers: Table?
        InstrumentationSystem.instrument.inject(self, into: &headers, using: AMQPHeaderPropagation())
        return headers
    }
}

extension AMQPResponse.Channel.Message.Delivery {
    var serviceContext: ServiceContext {
        var context = ServiceContext.topLevel
        InstrumentationSystem.instrument.extract(properties.headers, into: &context, using: AMQPHeaderPropagation())
        return context
    }
}
@sliemeobn
Contributor Author

Btw: When implementing it I stumbled over this spec stating that the traceparent must be some binary thing.

All I can say is that I used strings and it works just fine with the OpenTelemetry package for amqplib. So that might be a bit of a sticky situation to consider.
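
For reference, here is a minimal sketch of what the string form looks like, assuming the value carried in the header is the W3C Trace Context text format for version `00` (`version-traceid-parentid-flags`, all lowercase hex). `TraceParent` is a hypothetical helper written only for illustration; it is not part of this library, swift-distributed-tracing, or swift-otel.

```swift
// Hypothetical helper, for illustration only.
// Models the text form of a version-00 W3C `traceparent` value.
struct TraceParent: Equatable {
    let version: String   // 2 hex characters, e.g. "00"
    let traceID: String   // 32 hex characters
    let parentID: String  // 16 hex characters
    let flags: String     // 2 hex characters, e.g. "01" (sampled)

    /// Returns nil unless the value has exactly four lowercase-hex fields
    /// of the expected lengths and non-zero trace and parent IDs.
    init?(header: String) {
        let parts = header
            .split(separator: "-", omittingEmptySubsequences: false)
            .map(String.init)
        guard parts.count == 4 else { return nil }

        let expectedLengths = [2, 32, 16, 2]
        let hexDigits = "0123456789abcdef"
        for (part, length) in zip(parts, expectedLengths) {
            guard part.count == length,
                  part.allSatisfy({ hexDigits.contains($0) })
            else { return nil }
        }

        // All-zero trace or parent IDs are treated as invalid.
        guard parts[1].contains(where: { $0 != "0" }),
              parts[2].contains(where: { $0 != "0" })
        else { return nil }

        version = parts[0]
        traceID = parts[1]
        parentID = parts[2]
        flags = parts[3]
    }

    /// Re-serializes the fields into the header string.
    var header: String { "\(version)-\(traceID)-\(parentID)-\(flags)" }
}

let sample = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
let parsed = TraceParent(header: sample)
```

Whether a given peer expects this string form or a binary encoding depends on the instrumentation on the other side, so treat the sketch as a description of the string variant only.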

@funcmike added the enhancement label Aug 20, 2024
@tweidema

tweidema commented Aug 28, 2024

I would like to second this suggestion. Having distributed tracing integrated would be very nice.
@sliemeobn I would be grateful for a few pointers on how to integrate your code above with Swift tracing/swift-otel. I am new to both Swift and distributed tracing, but I have a working example over HTTP from a Swift client (using AsyncHTTPClient with retrofitted distributed tracing built from https://github.com/bok-/devworld24-tracing-demo.git) to a Hummingbird server (which supports distributed tracing out of the box). Traces are sent to Tempo and visualized with Grafana.

@sliemeobn
Contributor Author

@tweidema it sounds like you have it all set up then. I am not quite sure which part you are looking for pointers on.

I guess the only pieces missing are the actual spans. I use the following for now (not sure how OTel spec-compliant it is, though):

extension AMQPChannel {
    func publish(_ buffer: ByteBuffer, to exchange: String, with routingKey: String) async throws {
        try await withSpan("\(exchange) > \(routingKey) publish", ofKind: .producer) { span in
            span.attributes = [
                "messaging.system": "rabbitmq",
                "messaging.destination": .string(exchange),
                "messaging.rabbitmq.routing_key": .string(routingKey),
            ]

            try await basicPublish(
                from: buffer, exchange: exchange, routingKey: routingKey,
                properties: .init(
                    headers: ServiceContext.current?.asAqmpHeaders,
                    deliveryMode: .deliveryModePersistent,
                    expiration: "\(SharelogBrokerDefaults.DEFAULT_ACKNOWLEDGE_EXPIRATION)",
                    type: SharelogBrokerDefaults.MESSAGE_TYPE_PUBSUB
                )
            )
        }
    }
}

extension BrokerMessageReponder { // <- my type that handles messages
    @discardableResult
    func handleDelivery(_ delivery: AMQPResponse.Channel.Message.Delivery, from channel: AMQPChannel, logger: Logger) async -> HandleDeliveryResult {
        await withSpan("\(delivery.exchange) \(delivery.routingKey) process", context: delivery.serviceContext, ofKind: .consumer) { span in
            span.attributes = [
                "messaging.system": "rabbitmq",
                "messaging.destination": .string(delivery.exchange),
                "messaging.rabbitmq.routing_key": .string(delivery.routingKey),
            ]
            span.attributes["messaging.conversation_id"] = delivery.properties.correlationID

            // ... process message ...
        }
    }
}

@tweidema

tweidema commented Sep 1, 2024

@sliemeobn Thank you very much. That helped me get the span headers passed back and forth. I am still missing a piece regarding how to get the span context on the receiving side set from those headers. As a result I get two distinct traces, one from the client and one from the server, and they are not seen as one trace.
I guess it has something to do with the extension you have on AMQPResponse.Channel.Message.Delivery; I just can't figure out how to plug that in.

@sliemeobn
Contributor Author

In the snippet I posted above, you can see where the service context is stored in the headers on publish:

headers: ServiceContext.current?.asAqmpHeaders,

On the receiving side, the span is initialized with the context extracted from the message:

await withSpan("\(delivery.exchange) \(delivery.routingKey) process", context: delivery.serviceContext, ofKind: .consumer) { span in

This should forward the trace ID as part of the service context, so the consumer span joins the producer's trace.
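
To make the mechanism concrete, here is a stdlib-only sketch of the round trip. Everything in it is a hypothetical stand-in: `Headers` plays the role of the AMQP `Table?`, `TraceContext` plays the role of `ServiceContext`, and `inject`, `extract`, and `startConsumerSpan` only mimic what the instrument and `withSpan(..., context:)` do in the snippets above. It is not how swift-distributed-tracing or swift-otel are implemented.

```swift
// Stand-in for the AMQP headers table (assumption: string keys and values).
typealias Headers = [String: String]

// Stand-in for the tracing part of a ServiceContext.
struct TraceContext: Equatable {
    var traceID: String
    var spanID: String
}

// Formats a random 64-bit value as 16 lowercase hex characters.
func randomHex64() -> String {
    let hex = String(UInt64.random(in: 1...UInt64.max), radix: 16)
    return String(repeating: "0", count: 16 - hex.count) + hex
}

// Publish side: write the current context into the message headers.
func inject(_ context: TraceContext, into headers: inout Headers) {
    headers["traceparent"] = "00-\(context.traceID)-\(context.spanID)-01"
}

// Consume side: read the context back out of the message headers.
func extract(from headers: Headers) -> TraceContext? {
    guard let value = headers["traceparent"] else { return nil }
    let parts = value.split(separator: "-").map(String.init)
    guard parts.count == 4 else { return nil }
    return TraceContext(traceID: parts[1], spanID: parts[2])
}

// A new span keeps the parent's trace ID if a parent is given;
// without a parent it starts a brand-new trace.
func startConsumerSpan(parent: TraceContext?) -> TraceContext {
    TraceContext(
        traceID: parent?.traceID ?? (randomHex64() + randomHex64()),
        spanID: randomHex64()
    )
}

// Producer publishes with its context in the headers.
let producer = TraceContext(
    traceID: "4bf92f3577b34da6a3ce929d0e0e4736",
    spanID: randomHex64()
)
var headers = Headers()
inject(producer, into: &headers)

// Consumer that passes the extracted context: same trace as the producer.
let linked = startConsumerSpan(parent: extract(from: headers))

// Consumer that ignores the headers: a second, unrelated trace.
let unlinked = startConsumerSpan(parent: nil)

print(linked.traceID == producer.traceID)
print(unlinked.traceID == producer.traceID)
```

The `unlinked` case corresponds to the "two distinct traces" symptom: the headers arrive, but the consumer span is never started with the extracted context as its parent.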

@tweidema

@sliemeobn Finally got around to looking at this again, and that was the piece I was missing. Have it working now! Thanks so much for your help.
