This nice logo was made by @l3r8yJ.

Project architect: @h1alexbel

EO Kafka: producers and consumers for working with the Apache Kafka message broker.

Read Kafka Producers and Consumers for Elegant Microservices, the blog post about eo-kafka.

Motivation. We are not happy with Spring Kafka because it is very procedural and not object-oriented. eo-kafka suggests doing almost exactly the same, but through objects.

Principles. These are the design principles behind eo-kafka.

How to use. All you need is this (get the latest version here):

Maven:

<dependency>
  <groupId>io.github.eo-cqrs</groupId>
  <artifactId>eo-kafka</artifactId>
</dependency>

Gradle:

dependencies {
    implementation 'io.github.eo-cqrs:eo-kafka:<version>'
}

Messages API

To create a Kafka message:

Data<String> string =
  new KfData<>(
    "string-data",          //data
    "strings",              //topic
    1                       //partition
  );

Producer API

To create a Kafka Producer, you can wrap the original KafkaProducer:

KafkaProducer origin = ...;
Producer<String, String> producer = new KfProducer<>(origin);
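
Here origin is just a plain Apache Kafka producer. A minimal sketch of building one with Properties and wrapping it; the variable names and property values are illustrative, so adjust them to your cluster:

// org.apache.kafka.clients.producer.KafkaProducer configured via java.util.Properties
final Properties settings = new Properties();
settings.put("bootstrap.servers", "localhost:9092");
settings.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
settings.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
final KafkaProducer<String, String> origin = new KafkaProducer<>(settings);
// wrap it into eo-kafka
final Producer<String, String> producer = new KfProducer<>(origin);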

Or construct it with KfFlexible:

final Producer<String, String> producer =
  new KfProducer<>(
    new KfFlexible<>(
      new KfProducerParams(
        new KfParams(
          new BootstrapServers("localhost:9092"),
          new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
          new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
        )
      )
    )
  );

Or create it from an XML file:

final Producer<String, String> producer =
  new KfProducer<>(
    new KfXmlFlexible<String, String>("producer.xml") // file with producer config
       .producer()
  );

By the way, your XML file should be placed in the resources directory and look like this:

<producer>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <keySerializer>org.apache.kafka.common.serialization.StringSerializer</keySerializer>
  <valueSerializer>org.apache.kafka.common.serialization.StringSerializer</valueSerializer>
</producer>

To send a message:

try (final Producer<String, String> producer = ...) {
  producer.send(
    "key2012",
    new KfData<>(
      "newRest28",
      "orders",
      1
    )
  );
} catch (Exception e) {
  throw new IllegalStateException(e);
}
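
For completeness, a sketch that combines the KfFlexible construction shown earlier with the send call in a single try-with-resources block (the broker address and payload values are the illustrative ones from above):

try (
  final Producer<String, String> producer =
    new KfProducer<>(
      new KfFlexible<>(
        new KfProducerParams(
          new KfParams(
            new BootstrapServers("localhost:9092"),
            new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
            new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
          )
        )
      )
    )
) {
  // key, then the data, topic and partition wrapped into KfData
  producer.send("key2012", new KfData<>("newRest28", "orders", 1));
} catch (Exception e) {
  throw new IllegalStateException(e);
}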

Consumer API

To create a Kafka Consumer, you can wrap the original KafkaConsumer:

KafkaConsumer origin = ...;
Consumer<String, String> consumer = new KfConsumer<>(origin);
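
As with the producer, origin here is an ordinary Apache Kafka consumer. A minimal sketch of building one with Properties (the values are illustrative):

// org.apache.kafka.clients.consumer.KafkaConsumer configured via java.util.Properties
final Properties settings = new Properties();
settings.put("bootstrap.servers", "localhost:9092");
settings.put("group.id", "1");
settings.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
settings.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
final KafkaConsumer<String, String> origin = new KafkaConsumer<>(settings);
// wrap it into eo-kafka
final Consumer<String, String> consumer = new KfConsumer<>(origin);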

Using KfFlexible:

final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfFlexible<>(
      new KfConsumerParams(
        new KfParams(
          new BootstrapServers("localhost:9092"),
          new GroupId("1"),
          new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
          new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
        )
      )
    )
  );

And the XML file approach:

final Consumer<String, String> consumer =
  new KfConsumer<>(
    new KfXmlFlexible<String, String>("consumer.xml")
      .consumer()
  );

Again, the XML file should be placed in the resources directory and look like this:

<consumer>
  <bootstrapServers>localhost:9092</bootstrapServers>
  <groupId>1</groupId>
  <keyDeserializer>org.apache.kafka.common.serialization.StringDeserializer</keyDeserializer>
  <valueDeserializer>org.apache.kafka.common.serialization.StringDeserializer</valueDeserializer>
</consumer>

Consuming messages:

try (
  final Consumer<String, String> consumer =
    new KfConsumer<>(
      new KfFlexible<>(
        new KfConsumerParams(
          new KfParams(
            new BootstrapServers(this.servers),
            new GroupId("1"),
            new AutoOffsetReset("earliest"),
            new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
            new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
          )
        )
      )
    )
) {
  // you need to be subscribed to a topic before iterating over its data
  consumer.subscribe(new ListOf<>("orders-saga-init"));
  final List<Dataized<String>> result =
    consumer.iterate("orders-saga-init", Duration.ofSeconds(5L));
}

Config API

Kafka Property | eo-kafka API | XML tag
bootstrap.servers | BootstrapServers | bootstrapServers
key.serializer | KeySerializer | keySerializer
value.serializer | ValueSerializer | valueSerializer
key.deserializer | KeyDeserializer | keyDeserializer
value.deserializer | ValueDeserializer | valueDeserializer
group.id | GroupId | groupId
auto.offset.reset | AutoOffsetReset | autoOffsetReset
client.id | ClientId | clientId
acks | Acks | acks
security.protocol | SecurityProtocol | securityProtocol
sasl.jaas.config | SaslJaasConfig | saslJaasConfig
sasl.mechanism | SaslMechanism | saslMechanism
batch.size | BatchSize | batchSize
buffer.memory | BufferMemory | bufferMemory
linger.ms | LingerMs | lingerMs
retries | Retries | retries
retry.backoff.ms | RetryBackoffMs | retryBackoffMs
compression.type | CompressionType | compressionType
partition.assignment.strategy | PartitionAssignmentStrategy | partitionAssignmentStrategy
max.poll.records | MaxPollRecords | maxPollRecords
heartbeat.interval.ms | HeartbeatIntervalMs | heartbeatIntervalMs
enable.auto.commit | EnableAutoCommit | enableAutoCommit
session.timeout.ms | SessionTimeoutMs | sessionTimeoutMs
max.partition.fetch.bytes | MaxPartitionFetchBytes | maxPartitionFetchBytes
fetch.max.wait.ms | FetchMaxWaitMs | fetchMaxWaitMs
fetch.min.bytes | FetchMinBytes | fetchMinBytes
send.buffer.bytes | SendBufferBytes | sendBufferBytes
receive.buffer.bytes | ReceiveBufferBytes | receiveBufferBytes
max.block.ms | MaxBlockMs | maxBlockMs
max.request.size | MaxRqSize | maxRequestSize
group.instance.id | GroupInstanceId | groupInstanceId
max.in.flight.requests.per.connection | MaxInFlightRq | maxInFlightRequestsPerConnection
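
Each class in the eo-kafka API column plugs into KfParams the same way as in the examples above. A sketch of a producer with a couple of extra settings from the table; ClientId and Acks are taken from the table, while their String-style constructor arguments and the concrete values are assumptions made here for illustration:

final Producer<String, String> tuned =
  new KfProducer<>(
    new KfFlexible<>(
      new KfProducerParams(
        new KfParams(
          new BootstrapServers("localhost:9092"),
          new ClientId("orders-service"),  // client.id, illustrative value
          new Acks("all"),                 // acks, constructor argument assumed to be a String
          new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
          new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
        )
      )
    )
  );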

How to Contribute

Fork the repository, make changes, and send us a pull request. We will review your changes and apply them to the master branch shortly, provided they don't violate our quality standards. To avoid frustration, please run a full Maven build before sending us your pull request:

$ mvn clean install

You will need Maven 3.8.7+ and Java 17+.

If you want to contribute to the next release version of eo-kafka, please check the project board.

Our rultor image for CI/CD.
