Skip to content

Commit

Permalink
KAFKA-1086 Improve GetOffsetShell to find metadata automatically; rev…
Browse files Browse the repository at this point in the history
…iewed by Jun Rao
  • Loading branch information
nehanarkhede committed Oct 15, 2013
1 parent e351039 commit a160f10
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 17 deletions.
63 changes: 47 additions & 16 deletions core/src/main/scala/kafka/tools/GetOffsetShell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@ import joptsimple._
import java.net.URI
import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
import kafka.common.TopicAndPartition
import kafka.client.ClientUtils


object GetOffsetShell {

def main(args: Array[String]): Unit = {
val parser = new OptionParser
val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
.withRequiredArg
.describedAs("kafka://hostname:port")
.describedAs("hostname:port,...,hostname:port")
.ofType(classOf[String])
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val partitionOpt = parser.accepts("partition", "partition id")
val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
.withRequiredArg
.describedAs("partition id")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
.describedAs("partition ids")
.ofType(classOf[String])
.defaultsTo("")
val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
.withRequiredArg
.describedAs("timestamp/-1(latest)/-2(earliest)")
Expand All @@ -51,28 +52,58 @@ object GetOffsetShell {
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
.withRequiredArg
.describedAs("ms")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1000)

val options = parser.parse(args : _*)

for(arg <- List(urlOpt, topicOpt, timeOpt)) {
for(arg <- List(brokerListOpt, topicOpt, timeOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
}

val url = new URI(options.valueOf(urlOpt))
val clientId = "GetOffsetShell"
val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
val topic = options.valueOf(topicOpt)
val partition = options.valueOf(partitionOpt).intValue
var partitionList = options.valueOf(partitionOpt)
var time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
println("get " + offsets.length + " results")
for (offset <- offsets)
println(offset)
val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()

val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
"kafka-list-topic.sh to verify")
System.exit(1)
}
val partitions =
if(partitionList == "") {
topicsMetadata.head.partitionsMetadata.map(_.partitionId)
} else {
partitionList.mkString(",").map(_.toInt)
}
partitions.foreach { partitionId =>
val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
partitionMetadataOpt match {
case Some(metadata) =>
metadata.leader match {
case Some(leader) =>
val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
val topicAndPartition = TopicAndPartition(topic, partitionId)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets

println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
}
case None => System.err.println("Error: partition %d does not exist".format(partitionId))
}
}
}
}
2 changes: 1 addition & 1 deletion kafka-patch-review.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def main():

comment="Created reviewboard "
if not opt.reviewboard:
print 'Created a new reviewboard ',rb_url
print 'Created a new reviewboard ',rb_url,' against branch ',opt.branch
else:
print 'Updated reviewboard',opt.reviewboard
comment="Updated reviewboard "
Expand Down

0 comments on commit a160f10

Please sign in to comment.