calculate broker/topic sizes only once per update
jisookim0513 committed Dec 15, 2015
1 parent 2d43791 commit 3055ebb
Showing 3 changed files with 28 additions and 21 deletions.
32 changes: 25 additions & 7 deletions app/kafka/manager/BrokerViewCacheActor.scala
@@ -184,6 +184,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
})

case BVUpdateBrokerTopicPartitionSizes(id, logInfo) =>
// put topic sizes
for ((topic, partitions) <- logInfo) {
val tMap = brokerTopicPartitionSizes.getOrElse(topic, new mutable.HashMap[Int, mutable.Map[Int, Long]])
for ((partition, info) <- partitions; pMap = tMap.getOrElse(partition, new mutable.HashMap[Int, Long])) {
@@ -193,6 +194,23 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
brokerTopicPartitionSizes.put(topic, tMap)
}

// update broker metrics and the view to reflect the topic sizes this broker has
val metrics = brokerMetrics.get(id)
metrics foreach { case bm =>
val brokerSize = logInfo.map{case (t, p) => p.values.map(_.bytes).sum}.sum
val newBm = BrokerMetrics(
bm.bytesInPerSec,
bm.bytesOutPerSec,
bm.bytesRejectedPerSec,
bm.failedFetchRequestsPerSec,
bm.failedProduceRequestsPerSec,
bm.messagesInPerSec,
bm.oSystemMetrics,
SegmentsMetric(brokerSize)
)
brokerMetrics += (id -> newBm)
}

case any: Any => log.warning("bvca : processActorRequest : Received unknown message: {}", any)
}
}
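The handler above folds each broker's per-partition log sizes into the topic → partition → broker map and then sums everything the broker reported to refresh its SegmentsMetric. Below is a minimal, self-contained sketch of that aggregation; LogSegmentInfo and SizeAggregationSketch are illustrative stand-ins (only a bytes field is assumed), not types from this commit.

import scala.collection.mutable

// Illustrative sketch only: LogSegmentInfo stands in for the per-partition log info
// carried by BVUpdateBrokerTopicPartitionSizes; only its bytes field matters here.
final case class LogSegmentInfo(bytes: Long)

object SizeAggregationSketch {
  // topic -> partition -> broker id -> size in bytes, mirroring brokerTopicPartitionSizes
  private val brokerTopicPartitionSizes =
    new mutable.HashMap[String, mutable.Map[Int, mutable.Map[Int, Long]]]

  // records the sizes reported by one broker and returns that broker's total size
  def update(brokerId: Int, logInfo: Map[String, Map[Int, LogSegmentInfo]]): Long = {
    for ((topic, partitions) <- logInfo) {
      val tMap = brokerTopicPartitionSizes.getOrElse(topic, new mutable.HashMap[Int, mutable.Map[Int, Long]])
      for ((partition, info) <- partitions) {
        val pMap = tMap.getOrElse(partition, new mutable.HashMap[Int, Long])
        pMap.put(brokerId, info.bytes)
        tMap.put(partition, pMap)
      }
      brokerTopicPartitionSizes.put(topic, tMap)
    }
    // broker size = sum over all topics and partitions this broker reported
    logInfo.valuesIterator.map(_.values.map(_.bytes).sum).sum
  }
}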
@@ -237,8 +255,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
//check for 3*broker list size since we schedule 3 jmx calls for each broker
if (config.clusterContext.clusterFeatures.features(KMJMXMetricsFeature) && config.clusterContext.clusterFeatures.features(KMDisplaySizeFeature) && hasCapacityFor(3*brokerListOption.size)) {
implicit val ec = longRunningExecutionContext
updateTopicMetrics(brokerList, topicPartitionByBroker, shouldGetBrokerSize = true)
updateBrokerMetrics(brokerList, shouldGetBrokerSize = true)
updateTopicMetrics(brokerList, topicPartitionByBroker)
updateBrokerMetrics(brokerList)
updateBrokerTopicPartitionsSize(brokerList)
} else if (config.clusterContext.clusterFeatures.features(KMJMXMetricsFeature) && hasCapacityFor(2*brokerListOption.size)) {
implicit val ec = longRunningExecutionContext
@@ -275,8 +293,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}

private def updateTopicMetrics(brokerList: BrokerList,
topicPartitionByBroker: Map[Int, IndexedSeq[(TopicIdentity, Int, IndexedSeq[Int])]],
shouldGetBrokerSize: Boolean = false
topicPartitionByBroker: Map[Int, IndexedSeq[(TopicIdentity, Int, IndexedSeq[Int])]]
)(implicit ec: ExecutionContext): Unit = {
val brokerLookup = brokerList.list.map(bi => bi.id -> bi).toMap
topicPartitionByBroker.foreach {
@@ -293,7 +310,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
topicPartitions.map {
case (topic, id, partitions) =>
(topic.topic,
KafkaMetrics.getBrokerMetrics(config.clusterContext.config.version, mbsc, shouldGetBrokerSize, Option(topic.topic)))
KafkaMetrics.getBrokerMetrics(config.clusterContext.config.version, mbsc, None, Option(topic.topic)))
}
}
val result = tryResult match {
@@ -312,7 +329,8 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
}
}

private def updateBrokerMetrics(brokerList: BrokerList, shouldGetBrokerSize: Boolean = false)(implicit ec: ExecutionContext): Unit = {
// updates all broker metrics except for the broker size
private def updateBrokerMetrics(brokerList: BrokerList)(implicit ec: ExecutionContext): Unit = {
brokerList.list.foreach {
broker =>
longRunning {
@@ -321,7 +339,7 @@ class BrokerViewCacheActor(config: BrokerViewCacheActorConfig) extends LongRunni
config.clusterContext.config.jmxUser, config.clusterContext.config.jmxPass
) {
mbsc =>
KafkaMetrics.getBrokerMetrics(config.clusterContext.config.version, mbsc, shouldGetBrokerSize)
KafkaMetrics.getBrokerMetrics(config.clusterContext.config.version, mbsc, brokerMetrics.get(broker.id).map(_.size))
}

val result = tryResult match {
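With the shouldGetBrokerSize flag gone, updateBrokerMetrics no longer reads log-segment sizes over JMX; it keeps whatever SegmentsMetric was last cached for the broker, and the sizes are refreshed separately by updateBrokerTopicPartitionsSize. A hedged sketch of that carry-over follows, using simplified stand-in types and a fetchRateMetrics placeholder for the JMX round trip (none of these are the project's real definitions).

import scala.collection.mutable

// Simplified stand-ins for illustration; the real BrokerMetrics has many more fields.
final case class SegmentsMetric(bytes: Long)
final case class BrokerMetrics(messagesInPerSec: Double, size: SegmentsMetric)

object CarryOverSketch {
  private val brokerMetrics = mutable.Map.empty[Int, BrokerMetrics]

  // placeholder for the per-broker JMX query that gathers rate/OS metrics
  private def fetchRateMetrics(brokerId: Int): Double = 0.0

  def updateBrokerMetrics(brokerIds: Seq[Int]): Unit =
    brokerIds.foreach { id =>
      // reuse the size computed by the sizes update instead of re-reading log segments
      val cachedSize = brokerMetrics.get(id).map(_.size).getOrElse(SegmentsMetric(0L))
      brokerMetrics.update(id, BrokerMetrics(fetchRateMetrics(id), cachedSize))
    }
}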
2 changes: 0 additions & 2 deletions app/kafka/manager/ClusterManagerActor.scala
@@ -103,8 +103,6 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)
private[this] val ksProps = Props(classOf[KafkaStateActor],ksConfig)
private[this] val kafkaStateActor : ActorPath = context.actorOf(ksProps.withDispatcher(cmConfig.pinnedDispatcherName),"kafka-state").path

// TODO: make this config pluggable
println(cmConfig.brokerViewThreadPoolSize)
private[this] val bvConfig = BrokerViewCacheActorConfig(
kafkaStateActor,
clusterContext,
15 changes: 3 additions & 12 deletions app/kafka/manager/KafkaJMX.scala
@@ -289,7 +289,8 @@ object KafkaMetrics {
stats.groupBy(_._1).mapValues(_.map(_._2).toMap).toMap
}

def getBrokerMetrics(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, shouldGetBrokerSize: Boolean, topic: Option[String] = None) : BrokerMetrics = {
// returns broker metrics with the given segment metric when it is provided; otherwise the segment metric defaults to SegmentsMetric(0L)
def getBrokerMetrics(kafkaVersion: KafkaVersion, mbsc: MBeanServerConnection, segmentsMetric: Option[SegmentsMetric] = None, topic: Option[String] = None) : BrokerMetrics = {
BrokerMetrics(
KafkaMetrics.getBytesInPerSec(kafkaVersion, mbsc, topic),
KafkaMetrics.getBytesOutPerSec(kafkaVersion, mbsc, topic),
@@ -298,19 +299,9 @@ object KafkaMetrics {
KafkaMetrics.getFailedProduceRequestsPerSec(kafkaVersion, mbsc, topic),
KafkaMetrics.getMessagesInPerSec(kafkaVersion, mbsc, topic),
KafkaMetrics.getOSMetric(mbsc),
KafkaMetrics.getSegmentsMetric(mbsc, shouldGetBrokerSize)
segmentsMetric.getOrElse(SegmentsMetric(0L))
)
}

// always contains the total bytes size a broker has
def getSegmentsMetric(mbsc: MBeanServerConnection, shouldGetBrokerSize: Boolean) : SegmentsMetric = {
if (shouldGetBrokerSize) {
val segmentsInfo = getLogSegmentsInfo(mbsc)
SegmentsMetric(segmentsInfo.values.map(_.values.map(_.bytes).sum).sum)
} else {
SegmentsMetric(0L)
}
}
}

case class GaugeMetric(value: Double)
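After this change getBrokerMetrics has two call patterns: topic-level callers pass no segment metric (so it defaults to SegmentsMetric(0L)), while broker-level callers pass the size already cached in the view actor. The call-site sketch below assumes version, mbsc, and cachedSize: Option[SegmentsMetric] are in scope as they are inside BrokerViewCacheActor, and uses "my-topic" as a made-up topic name.

// Call-site sketch only; version, mbsc, and cachedSize are assumed to be in scope,
// so this fragment is not standalone.

// topic-level metrics: no segment size is fetched, the result carries SegmentsMetric(0L)
val topicMetrics = KafkaMetrics.getBrokerMetrics(version, mbsc, None, Option("my-topic"))

// broker-level metrics: reuse the SegmentsMetric cached by the partition-sizes update
val brokerLevelMetrics = KafkaMetrics.getBrokerMetrics(version, mbsc, cachedSize)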
