Skip to content

Commit

Permalink
KAFKA-9507; AdminClient should check for missing committed offsets (a…
Browse files Browse the repository at this point in the history
…pache#8057)

Addresses exception being thrown by `AdminClient` when `listConsumerGroupOffsets` returns a negative offset. A negative offset indicates the absence of a committed offset for a requested partition, and should result in a null in the returned offset map.

Reviewers: Anna Povzner <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
splett2 authored Feb 8, 2020
1 parent be4a6dd commit 7a2a198
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3100,7 +3100,12 @@ void handleResponse(AbstractResponse abstractResponse) {
final Long offset = partitionData.offset;
final String metadata = partitionData.metadata;
final Optional<Integer> leaderEpoch = partitionData.leaderEpoch;
groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
// Negative offset indicates that the group has no committed offset for this partition
if (offset < 0) {
groupOffsetsListing.put(topicPartition, null);
} else {
groupOffsetsListing.put(topicPartition, new OffsetAndMetadata(offset, leaderEpoch, metadata));
}
} else {
log.warn("Skipping return offset for {} due to error {}.", topicPartition, error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class ListConsumerGroupOffsetsResult {

/**
* Return a future which yields a map of topic partitions to OffsetAndMetadata objects.
* If the group does not have a committed offset for this partition, the corresponding value in the returned map will be null.
*/
public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> partitionsToOffsetAndMetadata() {
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,7 @@ public void testDescribeConsumerGroupOffsets() throws Exception {
TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 0);
TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 1);
TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 2);
TopicPartition myTopicPartition3 = new TopicPartition("my_topic", 3);

final Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = new HashMap<>();
responseData.put(myTopicPartition0, new OffsetFetchResponse.PartitionData(10,
Expand All @@ -1502,15 +1503,19 @@ public void testDescribeConsumerGroupOffsets() throws Exception {
Optional.empty(), "", Errors.NONE));
responseData.put(myTopicPartition2, new OffsetFetchResponse.PartitionData(20,
Optional.empty(), "", Errors.NONE));
responseData.put(myTopicPartition3, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
Optional.empty(), "", Errors.NONE));
env.kafkaClient().prepareResponse(new OffsetFetchResponse(Errors.NONE, responseData));

final ListConsumerGroupOffsetsResult result = env.adminClient().listConsumerGroupOffsets("group-0");
final Map<TopicPartition, OffsetAndMetadata> partitionToOffsetAndMetadata = result.partitionsToOffsetAndMetadata().get();

assertEquals(3, partitionToOffsetAndMetadata.size());
assertEquals(4, partitionToOffsetAndMetadata.size());
assertEquals(10, partitionToOffsetAndMetadata.get(myTopicPartition0).offset());
assertEquals(0, partitionToOffsetAndMetadata.get(myTopicPartition1).offset());
assertEquals(20, partitionToOffsetAndMetadata.get(myTopicPartition2).offset());
assertTrue(partitionToOffsetAndMetadata.containsKey(myTopicPartition3));
assertNull(partitionToOffsetAndMetadata.get(myTopicPartition3));
}
}

Expand Down

0 comments on commit 7a2a198

Please sign in to comment.