OpenTracing instrumentation for Apache Kafka Client
- Java 8
- Scala 2.12
- Kafka_2.12 0.11.0.1
pom.xml (Kafka Client instrumentation)
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-client</artifactId>
<version>0.0.6</version>
</dependency>
pom.xml (Kafka Streams instrumentation)
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-streams</artifactId>
<version>0.0.6</version>
</dependency>
pom.xml (Spring Kafka instrumentation)
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-spring</artifactId>
<version>0.0.6</version>
</dependency>
// Instantiate tracer (the right-hand side is elided; supply the Tracer instance to use)
Tracer tracer = ...
// Instantiate KafkaProducer from senderProps
KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(senderProps);
// Decorate KafkaProducer with TracingKafkaProducer, handing it the tracer
TracingKafkaProducer<Integer, String> tracingKafkaProducer = new TracingKafkaProducer<>(kafkaProducer,
tracer);
// Send through the decorated producer rather than the raw one (arguments elided)
tracingKafkaProducer.send(...);
// Instantiate KafkaConsumer from consumerProps
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
// Decorate KafkaConsumer with TracingKafkaConsumer, handing it the tracer
TracingKafkaConsumer<Integer, String> tracingKafkaConsumer = new TracingKafkaConsumer<>(kafkaConsumer,
tracer);
// Subscribe the decorated consumer to "messages"
tracingKafkaConsumer.subscribe(Collections.singletonList("messages"));
// Get records through the decorated consumer rather than the raw one
ConsumerRecords<Integer, String> records = tracingKafkaConsumer.poll(1000);
// To retrieve the SpanContext from a polled record (consumer side):
// take a ConsumerRecord (how it is obtained is elided here) ...
ConsumerRecord<Integer, String> record = ...
// ... and extract the context from the record's headers using the tracer
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
// Instantiate TracingKafkaClientSupplier with the tracer
// (the constructor call needs `new`; without it the line is parsed as a method call and does not compile)
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
// Provide supplier to KafkaStreams as the third constructor argument
KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(config), supplier);
// Start the streams instance built with the tracing supplier
streams.start();
// Declare Tracer bean
// (the returned expression is elided; return the Tracer instance the application should use)
@Bean
public Tracer tracer() {
return ...
}
// Build the default consumer factory from consumerProps(), then decorate it
// with TracingConsumerFactory using the tracer bean
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
  ConsumerFactory<Integer, String> delegate = new DefaultKafkaConsumerFactory<>(consumerProps());
  return new TracingConsumerFactory<>(delegate, tracer());
}
// Build the default producer factory from producerProps(), then decorate it
// with TracingProducerFactory using the tracer bean
@Bean
public ProducerFactory<Integer, String> producerFactory() {
  ProducerFactory<Integer, String> delegate = new DefaultKafkaProducerFactory<>(producerProps());
  return new TracingProducerFactory<>(delegate, tracer());
}
// Create the KafkaTemplate on top of the decorated ProducerFactory
// returned by producerFactory()
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
  ProducerFactory<Integer, String> tracingFactory = producerFactory();
  return new KafkaTemplate<>(tracingFactory);
}