Skip to content

Latest commit

 

History

History
138 lines (97 loc) · 3.19 KB

README.md

File metadata and controls

138 lines (97 loc) · 3.19 KB

Build Status Released Version

OpenTracing Apache Kafka Client Instrumentation

OpenTracing instrumentation for Apache Kafka Client

Requirements

  • Java 8
  • Scala 2.12
  • Kafka_2.12 0.11.0.1

Installation

Kafka Client

pom.xml

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-client</artifactId>
    <version>0.0.6</version>
</dependency>

Kafka Streams

pom.xml

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-streams</artifactId>
    <version>0.0.6</version>
</dependency>

Spring Kafka

pom.xml

<dependency>
    <groupId>io.opentracing.contrib</groupId>
    <artifactId>opentracing-kafka-spring</artifactId>
    <version>0.0.6</version>
</dependency>

Usage

// Instantiate tracer
Tracer tracer = ...

Kafka Client

// Instantiate KafkaProducer
KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(senderProps);

//Decorate KafkaProducer with TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingKafkaProducer = new TracingKafkaProducer<>(kafkaProducer, 
        tracer);

// Send
tracingKafkaProducer.send(...);

// Instantiate KafkaConsumer
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);

// Decorate KafkaConsumer with TracingKafkaConsumer
TracingKafkaConsumer<Integer, String> tracingKafkaConsumer = new TracingKafkaConsumer<>(kafkaConsumer, 
        tracer);

//Subscribe
tracingKafkaConsumer.subscribe(Collections.singletonList("messages"));

// Get records
ConsumerRecords<Integer, String> records = tracingKafkaConsumer.poll(1000);

// To retrieve SpanContext from polled record (Consumer side)
ConsumerRecord<Integer, String> record = ...
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);

Kafka Streams

// Instantiate TracingKafkaClientSupplier
KafkaClientSupplier supplier = TracingKafkaClientSupplier(tracer);

// Provide supplier to KafkaStreams
KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(config), supplier);
streams.start();

Spring Kafka

// Declare Tracer bean
@Bean
public Tracer tracer() {
  return ...
}


// Decorate ConsumerFactory with TracingConsumerFactory
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
  return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps()), tracer());
}

// Decorate ProducerFactory with TracingProducerFactory
@Bean
public ProducerFactory<Integer, String> producerFactory() {
  return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer());
}

// Use decorated ProducerFactory in KafkaTemplate 
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
  return new KafkaTemplate<>(producerFactory());
}