Skip to content

Commit

Permalink
MINOR: Fix an uncompatible bug in GetOffsetShell (apache#11936)
Browse files Browse the repository at this point in the history
In KIP-815 we replaced KafkaConsumer with AdminClient in GetOffsetShell. In the previous implementation, partitions were just ignored if there is no offset for them, however, we will print -1 instead now, This PR fix this inconsistency.

Reviewers: David Jacot <[email protected]>, Luke Chen <[email protected]>
  • Loading branch information
dengziming authored Mar 31, 2022
1 parent 8965240 commit 669a490
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/tools/GetOffsetShell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import joptsimple._
import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ListTopicsOptions, OffsetSpec}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.requests.ListOffsetsRequest
import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.Utils

import java.util.Properties
Expand Down Expand Up @@ -135,7 +135,11 @@ object GetOffsetShell {
val partitionOffsets = partitionInfos.flatMap { tp =>
try {
val partitionInfo = listOffsetsResult.partitionResult(tp).get
Some((tp, partitionInfo.offset))
if (partitionInfo.offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
Some((tp, partitionInfo.offset))
} else {
None
}
} catch {
case e: ExecutionException =>
e.getCause match {
Expand Down
7 changes: 7 additions & 0 deletions core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
)
}

@Test
def testNoOffsetIfTimestampGreaterThanLatestRecord(): Unit = {
val time = (System.currentTimeMillis() * 2).toString
val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time))
assertEquals(List.empty, offsets)
}

@Test
def testTopicPartitionsArgWithInternalExcluded(): Unit = {
val offsets = executeAndParse(Array("--topic-partitions",
Expand Down

0 comments on commit 669a490

Please sign in to comment.