Skip to content

Commit

Permalink
Enable consumer drilldown without polling
Browse files Browse the repository at this point in the history
  • Loading branch information
patelh committed Dec 17, 2015
1 parent 5a0f78f commit df772e7
Showing 1 changed file with 8 additions and 11 deletions.
19 changes: 8 additions & 11 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import kafka.consumer.SimpleConsumer
import kafka.manager._
import kafka.manager.base.cluster.BaseClusterQueryCommandActor
import kafka.manager.base.{LongRunningPoolActor, LongRunningPoolConfig}
import kafka.manager.features.KMDeleteTopicFeature
import kafka.manager.features.{KMPollConsumersFeature, KMDeleteTopicFeature}
import kafka.manager.model.ActorModel._
import kafka.manager.model.{ClusterContext, KafkaVersion, Kafka_0_8_1_1}
import kafka.manager.utils.ZkUtils
Expand Down Expand Up @@ -252,7 +252,7 @@ case class OffsetCacheActive(curator: CuratorFramework,

def getSimpleConsumerSocketTimeoutMillis: Int = socketTimeoutMillis

val loadOffsets = clusterContext.config.pollConsumers
val loadOffsets = featureGateFold(KMPollConsumersFeature)(false, true)

private[this] val consumersTreeCacheListener = new TreeCacheListener {
override def childEvent(client: CuratorFramework, event: TreeCacheEvent): Unit = {
Expand Down Expand Up @@ -380,7 +380,7 @@ case class OffsetCachePassive(curator: CuratorFramework,

def getSimpleConsumerSocketTimeoutMillis: Int = socketTimeoutMillis

val loadOffsets = clusterContext.config.pollConsumers
val loadOffsets = featureGateFold(KMPollConsumersFeature)(false, true)

private[this] val consumersPathChildrenCacheListener = new PathChildrenCacheListener {
override def childEvent(client: CuratorFramework, event: PathChildrenCacheEvent): Unit = {
Expand Down Expand Up @@ -616,10 +616,9 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
log.info("Adding admin path cache listener...")
adminPathCache.getListenable.addListener(adminPathCacheListener)

if (config.clusterContext.config.pollConsumers) {
log.info("Starting offset cache...")
offsetCache.start()
}
//the offset cache does not poll on its own so it can be started safely
log.info("Starting offset cache...")
offsetCache.start()
}

@scala.throws[Exception](classOf[Exception])
Expand All @@ -634,10 +633,8 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
override def postStop(): Unit = {
log.info("Stopped actor %s".format(self.path))

if (config.clusterContext.config.pollConsumers) {
log.info("Stopping offset cache...")
Try(offsetCache.stop())
}
log.info("Stopping offset cache...")
Try(offsetCache.stop())

log.info("Removing admin path cache listener...")
Try(adminPathCache.getListenable.removeListener(adminPathCacheListener))
Expand Down

0 comments on commit df772e7

Please sign in to comment.