Skip to content

Commit 14a098b

Browse files
authored
KAFKA-17600: Add nextOffsets to the ConsumerRecords (apache#17414)
This PR implements KIP-1094. Reviewers: Andrew Schofield <[email protected]>, Kirk True <[email protected]>, Lucas Brutschy <[email protected]>
1 parent 9d65ff8 commit 14a098b

File tree

22 files changed

+340
-51
lines changed

22 files changed

+340
-51
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,19 @@
3232
* partition returned by a {@link Consumer#poll(java.time.Duration)} operation.
3333
*/
3434
public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
35-
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Collections.emptyMap());
35+
public static final ConsumerRecords<Object, Object> EMPTY = new ConsumerRecords<>(Map.of(), Map.of());
3636

3737
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
38+
private final Map<TopicPartition, OffsetAndMetadata> nextOffsets;
3839

40+
@Deprecated
3941
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
42+
this(records, Map.of());
43+
}
44+
45+
public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records, final Map<TopicPartition, OffsetAndMetadata> nextOffsets) {
4046
this.records = records;
47+
this.nextOffsets = Map.copyOf(nextOffsets);
4148
}
4249

4350
/**
@@ -53,6 +60,14 @@ public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
5360
return Collections.unmodifiableList(recs);
5461
}
5562

63+
/**
64+
* Get the next offsets and metadata corresponding to all topic partitions for which the position has been advanced in this poll call.
65+
* @return the next offsets that the consumer will consume
66+
*/
67+
public Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
68+
return nextOffsets;
69+
}
70+
5671
/**
5772
* Get just the records for the given topic
5873
*/

clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
235235

236236
// update the consumed offset
237237
final Map<TopicPartition, List<ConsumerRecord<K, V>>> results = new HashMap<>();
238+
final Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata = new HashMap<>();
238239
final List<TopicPartition> toClear = new ArrayList<>();
239240

240241
for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : this.records.entrySet()) {
@@ -253,14 +254,15 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
253254
SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(
254255
rec.offset() + 1, rec.leaderEpoch(), leaderAndEpoch);
255256
subscriptions.position(entry.getKey(), newPosition);
257+
nextOffsetAndMetadata.put(entry.getKey(), new OffsetAndMetadata(rec.offset() + 1, rec.leaderEpoch(), ""));
256258
}
257259
}
258260
toClear.add(entry.getKey());
259261
}
260262
}
261263

262264
toClear.forEach(records::remove);
263-
return new ConsumerRecords<>(results);
265+
return new ConsumerRecords<>(results, nextOffsetAndMetadata);
264266
}
265267

266268
public synchronized void addRecord(ConsumerRecord<K, V> record) {

clients/src/main/java/org/apache/kafka/clients/consumer/MockShareConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public synchronized ConsumerRecords<K, V> poll(Duration timeout) {
9191
}
9292

9393
records.clear();
94-
return new ConsumerRecords<>(results);
94+
return new ConsumerRecords<>(results, Map.of());
9595
}
9696

9797
@Override

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,7 @@ public ConsumerRecords<K, V> poll(final Duration timeout) {
713713
+ "since the consumer's position has advanced for at least one topic partition");
714714
}
715715

716-
return interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
716+
return interceptors.onConsume(new ConsumerRecords<>(fetch.records(), fetch.nextOffsets()));
717717
}
718718
// We will wait for retryBackoffMs
719719
} while (timer.notExpired());

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ private ConsumerRecords<K, V> poll(final Timer timer) {
637637
+ "since the consumer's position has advanced for at least one topic partition");
638638
}
639639

640-
return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records()));
640+
return this.interceptors.onConsume(new ConsumerRecords<>(fetch.records(), fetch.nextOffsets()));
641641
}
642642
} while (timer.notExpired());
643643

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.apache.kafka.clients.consumer.internals;
1818

1919
import org.apache.kafka.clients.consumer.ConsumerRecord;
20+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2021
import org.apache.kafka.common.TopicPartition;
2122

2223
import java.util.ArrayList;
@@ -33,30 +34,35 @@ public class Fetch<K, V> {
3334
private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
3435
private boolean positionAdvanced;
3536
private int numRecords;
37+
private Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata;
3638

3739
public static <K, V> Fetch<K, V> empty() {
38-
return new Fetch<>(new HashMap<>(), false, 0);
40+
return new Fetch<>(new HashMap<>(), false, 0, new HashMap<>());
3941
}
4042

4143
public static <K, V> Fetch<K, V> forPartition(
4244
TopicPartition partition,
4345
List<ConsumerRecord<K, V>> records,
44-
boolean positionAdvanced
46+
boolean positionAdvanced,
47+
OffsetAndMetadata nextOffsetAndMetadata
4548
) {
4649
Map<TopicPartition, List<ConsumerRecord<K, V>>> recordsMap = records.isEmpty()
4750
? new HashMap<>()
4851
: mkMap(mkEntry(partition, records));
49-
return new Fetch<>(recordsMap, positionAdvanced, records.size());
52+
Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadataMap = mkMap(mkEntry(partition, nextOffsetAndMetadata));
53+
return new Fetch<>(recordsMap, positionAdvanced, records.size(), nextOffsetAndMetadataMap);
5054
}
5155

5256
private Fetch(
5357
Map<TopicPartition, List<ConsumerRecord<K, V>>> records,
5458
boolean positionAdvanced,
55-
int numRecords
59+
int numRecords,
60+
Map<TopicPartition, OffsetAndMetadata> nextOffsetAndMetadata
5661
) {
5762
this.records = records;
5863
this.positionAdvanced = positionAdvanced;
5964
this.numRecords = numRecords;
65+
this.nextOffsetAndMetadata = nextOffsetAndMetadata;
6066
}
6167

6268
/**
@@ -70,6 +76,7 @@ public void add(Fetch<K, V> fetch) {
7076
Objects.requireNonNull(fetch);
7177
addRecords(fetch.records);
7278
this.positionAdvanced |= fetch.positionAdvanced;
79+
this.nextOffsetAndMetadata.putAll(fetch.nextOffsetAndMetadata);
7380
}
7481

7582
/**
@@ -95,6 +102,13 @@ public int numRecords() {
95102
return numRecords;
96103
}
97104

105+
/**
106+
* @return the next offsets and metadata that the consumer will consume (last epoch is included)
107+
*/
108+
public Map<TopicPartition, OffsetAndMetadata> nextOffsets() {
109+
return Map.copyOf(nextOffsetAndMetadata);
110+
}
111+
98112
/**
99113
* @return {@code true} if and only if this fetch did not return any user-visible (i.e., non-control) records, and
100114
* did not cause the consumer position to advance for any topic partitions

clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.consumer.ConsumerConfig;
2020
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2122
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
2223
import org.apache.kafka.common.KafkaException;
2324
import org.apache.kafka.common.TopicPartition;
@@ -195,7 +196,7 @@ private Fetch<K, V> fetchRecords(final CompletedFetch nextInLineFetch, int maxRe
195196
metricsManager.recordPartitionLead(tp, lead);
196197
}
197198

198-
return Fetch.forPartition(tp, partRecords, positionAdvanced);
199+
return Fetch.forPartition(tp, partRecords, positionAdvanced, new OffsetAndMetadata(nextInLineFetch.nextFetchOffset(), nextInLineFetch.lastEpoch(), ""));
199200
} else {
200201
// these records aren't next in line based on the last consumed position, ignore them
201202
// they must be from an obsolete request

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -580,7 +580,7 @@ public synchronized ConsumerRecords<K, V> poll(final Duration timeout) {
580580
final ShareFetch<K, V> fetch = pollForFetches(timer);
581581
if (!fetch.isEmpty()) {
582582
currentFetch = fetch;
583-
return new ConsumerRecords<>(fetch.records());
583+
return new ConsumerRecords<>(fetch.records(), Map.of());
584584
}
585585

586586
metadata.maybeThrowAnyException();

clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerRecordsTest.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Arrays;
2727
import java.util.Collection;
2828
import java.util.Collections;
29+
import java.util.HashMap;
2930
import java.util.Iterator;
3031
import java.util.LinkedHashMap;
3132
import java.util.List;
@@ -81,6 +82,7 @@ public void testRecordsByPartition() {
8182

8283
ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
8384

85+
assertEquals(partitionSize * topics.size(), consumerRecords.nextOffsets().size());
8486
for (String topic : topics) {
8587
for (int partition = 0; partition < partitionSize; partition++) {
8688
TopicPartition topicPartition = new TopicPartition(topic, partition);
@@ -90,6 +92,8 @@ public void testRecordsByPartition() {
9092
assertTrue(records.isEmpty());
9193
} else {
9294
assertEquals(recordSize, records.size());
95+
final ConsumerRecord<Integer, String> lastRecord = records.get(recordSize - 1);
96+
assertEquals(new OffsetAndMetadata(lastRecord.offset() + 1, lastRecord.leaderEpoch(), ""), consumerRecords.nextOffsets().get(topicPartition));
9397
for (int i = 0; i < records.size(); i++) {
9498
ConsumerRecord<Integer, String> record = records.get(i);
9599
validateRecordPayload(topic, record, partition, i, recordSize);
@@ -117,6 +121,8 @@ public void testRecordsByTopic() {
117121

118122
ConsumerRecords<Integer, String> consumerRecords = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, topics);
119123

124+
assertEquals(partitionSize * topics.size(), consumerRecords.nextOffsets().size());
125+
120126
for (String topic : topics) {
121127
Iterable<ConsumerRecord<Integer, String>> records = consumerRecords.records(topic);
122128
int recordCount = 0;
@@ -156,6 +162,7 @@ public void testRecordsAreImmutable() {
156162
ConsumerRecords<Integer, String> records = buildTopicTestRecords(recordSize, partitionSize, emptyPartitionIndex, Collections.singleton(topic));
157163
ConsumerRecords<Integer, String> emptyRecords = ConsumerRecords.empty();
158164

165+
assertEquals(partitionSize, records.nextOffsets().size());
159166
// check records(TopicPartition) / partitions by add method
160167
// check iterator / records(String) by remove method
161168
// check data count after all operations
@@ -178,6 +185,7 @@ private ConsumerRecords<Integer, String> buildTopicTestRecords(int recordSize,
178185
int emptyPartitionIndex,
179186
Collection<String> topics) {
180187
Map<TopicPartition, List<ConsumerRecord<Integer, String>>> partitionToRecords = new LinkedHashMap<>();
188+
Map<TopicPartition, OffsetAndMetadata> nextOffsets = new HashMap<>();
181189
for (String topic : topics) {
182190
for (int i = 0; i < partitionSize; i++) {
183191
List<ConsumerRecord<Integer, String>> records = new ArrayList<>(recordSize);
@@ -189,11 +197,13 @@ private ConsumerRecords<Integer, String> buildTopicTestRecords(int recordSize,
189197
);
190198
}
191199
}
192-
partitionToRecords.put(new TopicPartition(topic, i), records);
200+
final TopicPartition tp = new TopicPartition(topic, i);
201+
partitionToRecords.put(tp, records);
202+
nextOffsets.put(tp, new OffsetAndMetadata(recordSize, Optional.empty(), ""));
193203
}
194204
}
195205

196-
return new ConsumerRecords<>(partitionToRecords);
206+
return new ConsumerRecords<>(partitionToRecords, nextOffsets);
197207
}
198208

199209
private void validateEmptyPartition(ConsumerRecord<Integer, String> record, int emptyPartitionIndex) {

0 commit comments

Comments
 (0)