Skip to content

Commit

Permalink
Bootstrapping an empty topic for zookeeper producer
Browse files Browse the repository at this point in the history
  • Loading branch information
nehanarkhede committed May 18, 2011
1 parent 9facced commit 9014610
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
2 changes: 2 additions & 0 deletions core/src/main/scala/kafka/producer/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ class Producer[K,V](config: ProducerConfig,
def send(producerData: ProducerData[K,V]*) {
val producerPoolRequests = producerData.map { pd =>
// find the number of broker partitions registered for this topic
logger.debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
val numBrokerPartitions = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
logger.debug("Broker partitions registered for topic: " + pd.getTopic + " = " + numBrokerPartitions)
val totalNumPartitions = numBrokerPartitions.length
var brokerIdPartition: Partition = null
var partition: Int = 0
Expand Down
35 changes: 25 additions & 10 deletions core/src/main/scala/kafka/producer/ZKBrokerPartitionInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,20 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
* @param topic the topic for which this information is to be returned
* @return a sequence of (brokerId, numPartitions)
*/
def getBrokerPartitionInfo(topic: String): SortedSet[Partition] = {
def getBrokerPartitionInfo(topic: String): scala.collection.immutable.SortedSet[Partition] = {
val brokerPartitions = topicBrokerPartitions.get(topic)
var numBrokerPartitions = SortedSet.empty[Partition]
brokerPartitions match {
case Some(bp) => numBrokerPartitions = TreeSet[Partition]() ++ brokerPartitions.get
case Some(bp) =>
bp.size match {
case 0 => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
numBrokerPartitions = bootstrapWithExistingBrokers(topic)
topicBrokerPartitions += (topic -> numBrokerPartitions)
case _ => numBrokerPartitions = TreeSet[Partition]() ++ bp
}
case None => // no brokers currently registered for this topic. Find the list of all brokers in the cluster.
val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.brokerIdsPath)
// since we do not have the in formation about number of partitions on these brokers, just assume single partition
// i.e. pick partition 0 from each broker as a candidate
numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
// add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
// participate in hosting this topic.
numBrokerPartitions = bootstrapWithExistingBrokers(topic)
topicBrokerPartitions += (topic -> numBrokerPartitions)
logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
}
numBrokerPartitions
}
Expand All @@ -120,7 +120,22 @@ private[producer] class ZKBrokerPartitionInfo(config: ZKConfig, producerCbk: (In
def getAllBrokerInfo: Map[Int, Broker] = allBrokers

def close = zkClient.close


private def bootstrapWithExistingBrokers(topic: String): scala.collection.immutable.SortedSet[Partition] = {
logger.debug("Currently, no brokers are registered under topic: " + topic)
logger.debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default " +
"number of partitions = 1")
val allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.brokerIdsPath)
logger.trace("List of all brokers currently registered in zookeeper = " + allBrokersIds.toString)
// since we do not have the in formation about number of partitions on these brokers, just assume single partition
// i.e. pick partition 0 from each broker as a candidate
val numBrokerPartitions = TreeSet[Partition]() ++ allBrokersIds.map(b => new Partition(b.toInt, 0))
// add the rest of the available brokers with default 1 partition for this topic, so all of the brokers
// participate in hosting this topic.
logger.debug("Adding following broker id, partition id for NEW topic: " + topic + "=" + numBrokerPartitions.toString)
numBrokerPartitions
}

/**
* Generate a sequence of (brokerId, numPartitions) for all topics
* registered in zookeeper
Expand Down

0 comments on commit 9014610

Please sign in to comment.