Skip to content

Commit

Permalink
Merge pull request eugenp#17799 from balasr21/master
Browse files Browse the repository at this point in the history
BAEL-5972: added implementation for performing seek in Java
  • Loading branch information
davidmartinezbarua authored Oct 18, 2024
2 parents 99f4d52 + 84386f3 commit 07e7651
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 0 deletions.
13 changes: 13 additions & 0 deletions spring-kafka-4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,24 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${spring-kafka.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
Expand Down
13 changes: 13 additions & 0 deletions spring-kafka-4/src/main/java/com/baeldung/seek/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.baeldung.seek;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.baeldung.seek;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
class ConsumerListener extends AbstractConsumerSeekAware {

public static final Map<String, String> MESSAGES = new HashMap<>();

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet()
.forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, false));
}

@KafkaListener(id = "test-seek", topics = "test-seek-topic")
public void listen(ConsumerRecord<String, String> in) {
MESSAGES.put(in.key(), in.value());
}
}
5 changes: 5 additions & 0 deletions spring-kafka-4/src/main/java/com/baeldung/seek/Response.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.baeldung.seek;

public record Response(int partition, long offset, String value) {

}
75 changes: 75 additions & 0 deletions spring-kafka-4/src/main/java/com/baeldung/seek/SeekController.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.baeldung.seek;

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/seek/api/v1/")
public class SeekController {

public static final String TOPIC_NAME = "test-topic";

private final DefaultKafkaConsumerFactory<String, String> consumerFactory;

public SeekController(DefaultKafkaConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}

@GetMapping("partition/{partition}/offset/{offset}")
public ResponseEntity<Response> getOneByPartitionAndOffset(@PathVariable("partition") int partition, @PathVariable("offset") int offset) {
try (KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset);
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
if (recordIterator.hasNext()) {
ConsumerRecord<String, String> consumerRecord = recordIterator.next();
Response response = new Response(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.value());
return new ResponseEntity<>(response, HttpStatus.OK);
}
}
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}

@GetMapping("partition/{partition}/beginning")
public ResponseEntity<Response> getOneByPartitionToBeginningOffset(@PathVariable("partition") int partition) {
try (KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seekToBeginning(Collections.singleton(topicPartition));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator();
if (recordIterator.hasNext()) {
ConsumerRecord<String, String> consumerRecord = recordIterator.next();
Response response = new Response(consumerRecord.partition(), consumerRecord.offset(), consumerRecord.value());
return new ResponseEntity<>(response, HttpStatus.OK);
}
}
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}

@GetMapping("partition/{partition}/end")
public ResponseEntity<Long> getOneByPartitionToEndOffset(@PathVariable("partition") int partition) {
try (KafkaConsumer<String, String> consumer = (KafkaConsumer<String, String>) consumerFactory.createConsumer()) {
TopicPartition topicPartition = new TopicPartition(TOPIC_NAME, partition);
consumer.assign(Collections.singletonList(topicPartition));
consumer.seekToEnd(Collections.singleton(topicPartition));
return new ResponseEntity<>(consumer.position(topicPartition), HttpStatus.OK);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.baeldung.seek;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class ConsumerListenerSeekLiveTest {

protected static ListAppender<ILoggingEvent> listAppender;

@Autowired
ConsumerListener consumerListener;

@Container
private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
private static KafkaProducer<String, String> testKafkaProducer;

@DynamicPropertySource
static void setProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
}

@BeforeAll
static void beforeAll() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
testKafkaProducer = new KafkaProducer<>(props);
IntStream.range(0, 5)
.forEach(m -> {
ProducerRecord<String, String> record = new ProducerRecord<>("test-seek-topic", 0, String.valueOf(m), "Message no : %s".formatted(m));
try {
testKafkaProducer.send(record)
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
testKafkaProducer.flush();
}

@Test
void givenKafkaBrokerExists_whenMessagesAreSent_thenLastMessageShouldBeRetrieved() {
Map<String, String> messages = consumerListener.MESSAGES;
Assertions.assertEquals(1, messages.size());
Assertions.assertEquals("Message no : 4", messages.get("4"));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.baeldung.seek;

import static com.baeldung.seek.SeekController.TOPIC_NAME;

import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.springframework.test.web.reactive.server.WebTestClient;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@AutoConfigureWebTestClient
class SeekControllerLiveTest {

@Autowired
private WebTestClient webClient;

@Container
private static KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
private static KafkaProducer<String, String> testKafkaProducer;

@DynamicPropertySource
static void setProps(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", KAFKA::getBootstrapServers);
}

@BeforeAll
static void beforeAll() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.getBootstrapServers());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
testKafkaProducer = new KafkaProducer<>(props);
int partition = 0;
IntStream.range(0, 5)
.forEach(m -> {
String key = String.valueOf(new Random().nextInt());
String value = "Message no : %s".formatted(m);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partition, key, value);
try {
testKafkaProducer.send(record)
.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
}

@Test
void givenKafkaBrokerExists_whenSeekByPartitionAndOffset_thenMessageShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/offset/2")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"partition\":0,\"offset\":2,\"value\":\"Message no : 2\"}");
}

@Test
void givenKafkaBrokerExists_whenSeekByBeginning_thenFirstMessageShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/beginning")
.exchange()
.expectStatus()
.isOk()
.expectBody(String.class)
.isEqualTo("{\"partition\":0,\"offset\":0,\"value\":\"Message no : 0\"}");
}

@Test
void givenKafkaBrokerExists_whenSeekByEnd_thenLatestOffsetShouldBeRetrieved() {
this.webClient.get()
.uri("/seek/api/v1/partition/0/end")
.exchange()
.expectStatus()
.isOk()
.expectBody(Long.class)
.isEqualTo(5L);
}

}

0 comments on commit 07e7651

Please sign in to comment.