This nice logo made by @l3r8yJ
Project architect: @h1alexbel
EO Kafka Producers and consumers for working with Apache Kafka message broker.
Read Kafka Producers and Consumers for Elegant Microservices, the blog post about eo-kafka
.
Motivation. We are not happy with Spring Kafka, because it is very procedural and not object-oriented. eo-kafka is suggesting to do almost exactly the same, but through objects.
Principles. These are the design principles behind eo-kafka.
How to use. All you need is this (get the latest version here):
Maven:
<dependency>
<groupId>io.github.eo-cqrs</groupId>
<artifactId>eo-kafka</artifactId>
</dependency>
Gradle:
dependencies {
compile 'io.github.eo-cqrs:eo-kafka:<version>'
}
To create Kafka Message:
Data<String> string =
new KfData<>(
"string-data", //data
"strings", //topic
1 //partition
);
To create Kafka Producer you can wrap original KafkaProducer:
KafkaProducer origin = ...;
Producer<String, String> producer = new KfProducer<>(origin);
Or construct it with KfFlexible:
final Producer<String, String> producer =
new KfProducer<>(
new KfFlexible<>(
new KfProducerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new KeySerializer("org.apache.kafka.common.serialization.StringSerializer"),
new ValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
)
)
)
);
Or create it with XML file:
final Producer<String, String> producer =
new KfProducer<>(
new KfXmlFlexible<String, String>("producer.xml") // file with producer config
.producer()
);
btw, your XML file should be in the resources
look like:
<producer>
<bootstrapServers>localhost:9092</bootstrapServers>
<keySerializer>org.apache.kafka.common.serialization.StringSerializer</keySerializer>
<valueSerializer>org.apache.kafka.common.serialization.StringSerializer</valueSerializer>
</producer>
To send a message:
try (final Producer<String, String> producer = ...) {
producer.send(
"key2012",
new KfData<>(
"newRest28",
"orders",
1
)
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
To create Kafka Consumer you can wrap original KafkaConsumer:
KafkaConsumer origin = ...;
Consumer<String, String> producer = new KfConsumer<>(origin);
Using KfFlexible:
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers("localhost:9092"),
new GroupId("1"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
)
);
And XML File approach:
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfXmlFlexible<String, String>("consumer.xml")
.consumer()
);
Again, XML file should be in the resources
look like:
<consumer>
<bootstrapServers>localhost:9092</bootstrapServers>
<groupId>1</groupId>
<keyDeserializer>org.apache.kafka.common.serialization.StringDeserializer</keyDeserializer>
<valueDeserializer>org.apache.kafka.common.serialization.StringDeserializer</valueDeserializer>
</consumer>
Consuming messages:
try (
final Consumer<String, String> consumer =
new KfConsumer<>(
new KfFlexible<>(
new KfConsumerParams(
new KfParams(
new BootstrapServers(this.severs),
new GroupId("1"),
new AutoOffsetReset("earliest"),
new KeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer"),
new ValueDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
)
)
)
)
) {
// you need to be subscribed on a topic to iterate over data in the topic
consumer.subscribe(new ListOf<>("orders-saga-init")));
List<Dataized<String>> result = consumer.iterate("orders-saga-init", Duration.ofSeconds(5L));
}
}
Kafka Property | eo-kafka API | XML tag |
---|---|---|
bootstrap.servers |
BootstrapServers | bootstrapServers |
key.serializer |
KeySerializer | keySerializer |
value.serializer |
ValueSerializer | valueSerializer |
key.deserializer |
KeyDeserializer | keyDeserializer |
value.deserializer |
ValueDeserializer | valueDeserializer |
group.id |
GroupId | groupId |
auto.offset.reset |
AutoOffsetReset | autoOffsetReset |
client.id |
ClientId | clientId |
acks |
Acks | acks |
security.protocol |
SecurityProtocol | securityProtocol |
sasl.jaas.config |
SaslJaasConfig | saslJaasConfig |
sasl.mechanism |
SaslMechanism | saslMechanism |
batch.size |
BatchSize | batchSize |
buffer.memory |
BufferMemory | bufferMemory |
linger.ms |
LingerMs | lingerMs |
retries |
Retries | retries |
retry.backoff.ms |
RetryBackoffMs | retryBackoffMs |
compression.type |
CompressionType | compressionType |
partition.assignment.strategy |
PartitionAssignmentStrategy | partitionAssignmentStrategy |
max.poll.records |
MaxPollRecords | maxPollRecords |
heartbeat.interval.ms |
HeartbeatIntervalMs | heartbeatIntervalMs |
enable.auto.commit |
EnableAutoCommit | enableAutoCommit |
session.timeout.ms |
SessionTimeoutMs | sessionTimeoutMs |
max.partition.fetch.bytes |
MaxPartitionFetchBytes | maxPartitionFetchBytes |
fetch.max.wait.ms |
FetchMaxWaitMs | fetchMaxWaitMs |
fetch.min.bytes |
FetchMinBytes | fetchMinBytes |
send.buffer.bytes |
SendBufferBytes | sendBufferBytes |
receive.buffer.bytes |
ReceiveBufferBytes | receiveBufferBytes |
max.block.ms |
MaxBlockMs | maxBlockMs |
max.request.size |
MaxRqSize | maxRequestSize |
group.instance.id |
GroupInstanceId | groupInstanceId |
max.in.flight.requests.per.connection |
MaxInFlightRq | maxInFlightRequestsPerConnection |
Fork repository, make changes, send us a pull request.
We will review your changes and apply them to the master
branch shortly,
provided they don't violate our quality standards. To avoid frustration,
before sending us your pull request please run full Maven build:
$ mvn clean install
You will need Maven 3.8.7+ and Java 17+.
If you want to contribute to the next release version of eo-kafka, please check the project board.
Our rultor image for CI/CD.