Skip to content

Commit

Permalink
Add backfill of member metadata using admin client
Browse files Browse the repository at this point in the history
  • Loading branch information
patelh committed Feb 16, 2016
1 parent 1ba2323 commit f6d12c2
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 41 deletions.
1 change: 1 addition & 0 deletions app/kafka/manager/actor/cluster/ClusterManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class ClusterManagerActor(cmConfig: ClusterManagerActorConfig)

private[this] val ksConfig = KafkaStateActorConfig(
sharedClusterCurator,
cmConfig.pinnedDispatcherName,
clusterContext,
LongRunningPoolConfig(Runtime.getRuntime.availableProcessors(), 1000),
cmConfig.partitionOffsetCacheTimeoutSecs,
Expand Down
219 changes: 187 additions & 32 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ package kafka.manager.actor.cluster
import java.io.Closeable
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.{ConcurrentLinkedDeque, TimeUnit}

import akka.actor.{Props, ActorRef, ActorContext, ActorPath}
import akka.pattern._
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import grizzled.slf4j.Logging
import kafka.admin.AdminClient
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.{OffsetAndMetadata, TopicAndPartition}
import kafka.consumer._
import kafka.coordinator.{GroupMetadataKey, OffsetKey, GroupTopicPartition}
import kafka.coordinator.{GroupSummary, GroupMetadataKey, OffsetKey}
import kafka.manager._
import kafka.manager.base.cluster.BaseClusterQueryCommandActor
import kafka.manager.base.cluster.{BaseClusterQueryActor, BaseClusterQueryCommandActor}
import kafka.manager.base.{LongRunningPoolActor, LongRunningPoolConfig}
import kafka.manager.features.{ClusterFeatures, KMPollConsumersFeature, KMDeleteTopicFeature}
import kafka.manager.model.ActorModel._
Expand All @@ -44,34 +46,125 @@ import kafka.manager.utils._

import scala.collection.JavaConverters._

object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0)
val ConsumerOffsetTopic = "__consumer_offsets"
case class KafkaAdminClientActorConfig(clusterContext: ClusterContext, longRunningPoolConfig: LongRunningPoolConfig, kafkaStateActorPath: ActorPath)
case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends BaseClusterQueryActor with LongRunningPoolActor {

private[this] var adminClientOption : Option[AdminClient] = None

protected implicit val clusterContext: ClusterContext = config.clusterContext
override protected def longRunningPoolConfig: LongRunningPoolConfig = config.longRunningPoolConfig

override protected def longRunningQueueFull(): Unit = {
log.error("Long running pool queue full, skipping!")
}

@scala.throws[Exception](classOf[Exception])
override def preStart() = {
super.preStart()
log.info(config.toString)
}

@scala.throws[Exception](classOf[Exception])
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
log.error(reason, "Restarting due to [{}] when processing [{}]",
reason.getMessage, message.getOrElse(""))
super.preRestart(reason, message)
}

implicit class TryLogErrorHelperp[T](t: Try[T]) extends Logging {
def logError(s: => String) : Try[T] = {
t match {
case Failure(e) =>
error(s, e)
case _ => //do nothing
@scala.throws[Exception](classOf[Exception])
override def postStop(): Unit = {
log.info("Closing admin client...")
Try(adminClientOption.foreach(_.close()))
log.info("Stopped actor %s".format(self.path))
}

private def createAdminClient(bl: BrokerList): AdminClient = {
val targetBrokers : IndexedSeq[BrokerIdentity] = bl.list
var brokerListStr: String = targetBrokers.map(b => s"${b.host}:${b.port}").mkString(",")

log.info(s"Creating admin client with broker list : $brokerListStr")
AdminClient.createSimplePlaintext(brokerListStr)
}

override def processQueryRequest(request: QueryRequest): Unit = {
if(adminClientOption.isEmpty) {
context.actorSelection(config.kafkaStateActorPath).tell(KSGetBrokers, self)
log.error(s"AdminClient not initialized yet, cannot process request : $request")
} else {
implicit val ec = longRunningExecutionContext
request match {
case KAGetGroupSummary(groupList: Seq[String], enqueue: java.util.Queue[(String, kafka.coordinator.GroupSummary)]) =>
Future {
groupList.foreach {
group =>
try {
adminClientOption.foreach {
client =>
val summary = client.describeGroup(group)
if(summary != null) {
enqueue.offer(group -> summary)
}
}
} catch {
case e: Exception =>
log.error(e, s"Failed to get group summary with admin client : $group")
log.error(e, s"Forcing new admin client initialization...")
Try { adminClientOption.foreach(_.close()) }
adminClientOption = None
}
}
}
case any: Any => log.warning("kac : processQueryRequest : Received unknown message: {}", any.toString)
}
t
}

}

override def processActorResponse(response: ActorResponse): Unit = {
response match {
case bl: BrokerList =>
if(bl.list.nonEmpty) {
Try {
adminClientOption = Option(createAdminClient(bl))
}.logError(s"Failed to create admin client with brokerlist : $bl")
}
case any: Any => log.warning("kac : processActorResponse : Received unknown message: {}", any.toString)
}
}
}

class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath) {
def enqueueGroupMetadata(groupList: Seq[String], queue: java.util.Queue[(String, GroupSummary)]) : Unit = {
Try {
context.actorSelection(adminClientActorPath).tell(KAGetGroupSummary(groupList, queue), ActorRef.noSender)
}
}
}


object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0)
val ConsumerOffsetTopic = "__consumer_offsets"

def isSupported(version: KafkaVersion) : Boolean = {
supportedVersions(version)
}
}

case class KafkaManagedOffsetCache(clusterContext: ClusterContext) extends Runnable with Closeable with Logging {
case class KafkaManagedOffsetCache(clusterContext: ClusterContext
, adminClient: KafkaAdminClient
, groupMemberMetadataCheckMillis: Int = 30000) extends Runnable with Closeable with Logging {
val groupTopicPartitionOffsetMap = new TrieMap[(String, String, Int), OffsetAndMetadata]()
val topicConsumerSetMap = new TrieMap[String, mutable.Set[String]]()
val consumerTopicSetMap = new TrieMap[String, mutable.Set[String]]()
val groupTopicPartitionMemberMap = new TrieMap[(String, String, Int), MemberMetadata]()

private[this] val queue = new ConcurrentLinkedDeque[(String, GroupSummary)]()

@volatile
private[this] var lastUpdateTimeMillis : Long = Long.MaxValue
private[this] var lastUpdateTimeMillis : Long = 0

private[this] var lastGroupMemberMetadataCheckMillis : Long = System.currentTimeMillis()

import KafkaManagedOffsetCache._
import kafka.manager.utils.zero90.GroupMetadataManager._
Expand All @@ -91,12 +184,49 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext) extends Runna
Consumer.create(new ConsumerConfig(props))
}

private[this] def performGroupMetadataCheck() : Unit = {
val currentMillis = System.currentTimeMillis()
if((lastGroupMemberMetadataCheckMillis + groupMemberMetadataCheckMillis) < currentMillis) {
val diff = groupTopicPartitionOffsetMap.keySet.filterNot(groupTopicPartitionMemberMap.contains)
if(diff.nonEmpty) {
val groupsToBackfill = diff.map(_._1).toSeq
info(s"Backfilling group metadata for $groupsToBackfill")
adminClient.enqueueGroupMetadata(groupsToBackfill, queue)
}
lastGroupMemberMetadataCheckMillis = System.currentTimeMillis()
lastUpdateTimeMillis = System.currentTimeMillis()
}
}

private[this] def dequeueAndProcessBackFill(): Unit = {
while(!queue.isEmpty) {
val (groupId, summary) = queue.pop()
summary.members.foreach {
member =>
try {
val mm = MemberMetadata.from(groupId, summary, member)
mm.assignment.foreach {
case (topic, part) =>
val k = (groupId, topic, part)
//only add it if it hasn't already been added through a new update via the offset topic
if(!groupTopicPartitionMemberMap.contains(k)) {
groupTopicPartitionMemberMap += (groupId, topic, part) -> mm
}
}
} catch {
case e: Exception =>
error(s"Failed to get member metadata from group summary and member summary : $groupId : $member", e)
}
}
}
}

override def run(): Unit = {
for {
consumer <- Try(createKafkaConsumerConnector()).logError(s"Failed to create consumer for offset topic for cluster ${clusterContext.config.name}")
} {
try {
logger.info(s"Consumer created for kafka offset topic consumption for cluster ${clusterContext.config.name}")
info(s"Consumer created for kafka offset topic consumption for cluster ${clusterContext.config.name}")
for {
stream <- Try {
val offsetStream: KafkaStream[Array[Byte], Array[Byte]] = consumer
Expand All @@ -108,6 +238,14 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext) extends Runna
} {
while (!shutdown) {
try {
try {
dequeueAndProcessBackFill()
performGroupMetadataCheck()
} catch {
case e: Exception =>
error("Failed to backfill group metadata", e)
}

val messageAndMetadata: MessageAndMetadata[Array[Byte], Array[Byte]] = iterator.next()
readMessageKey(ByteBuffer.wrap(messageAndMetadata.key())) match {
case OffsetKey(version, key) =>
Expand Down Expand Up @@ -154,6 +292,7 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext) extends Runna
}
}
} finally {
info(s"Shutting down consumer for $ConsumerOffsetTopic on cluster ${clusterContext.config.name}")
Try(consumer.shutdown())
}
}
Expand Down Expand Up @@ -194,6 +333,8 @@ object ConsumerInstanceSubscriptions extends Logging {

trait OffsetCache extends Logging {

def kafkaAdminClient: KafkaAdminClient

def clusterContext: ClusterContext

def getKafkaVersion: KafkaVersion
Expand Down Expand Up @@ -309,7 +450,7 @@ trait OffsetCache extends Logging {
if(kafkaManagedOffsetCache.isEmpty) {
info("Starting kafka managed offset cache ...")
Try {
val of = new KafkaManagedOffsetCache(clusterContext)
val of = new KafkaManagedOffsetCache(clusterContext, kafkaAdminClient)
kafkaManagedOffsetCache = Option(of)
val t = new Thread(of, "KafkaManagedOffsetCache")
t.start()
Expand Down Expand Up @@ -442,13 +583,14 @@ trait OffsetCache extends Logging {
}

case class OffsetCacheActive(curator: CuratorFramework,
clusterContext: ClusterContext,
partitionLeaders: String => Option[List[(Int, Option[BrokerIdentity])]],
topicDescriptions: (String, Boolean) => Option[TopicDescription],
cacheTimeoutSecs: Int,
socketTimeoutMillis: Int,
kafkaVersion: KafkaVersion)
(implicit protected[this] val ec: ExecutionContext, val cf: ClusterFeatures) extends OffsetCache {
kafkaAdminClient: KafkaAdminClient,
clusterContext: ClusterContext,
partitionLeaders: String => Option[List[(Int, Option[BrokerIdentity])]],
topicDescriptions: (String, Boolean) => Option[TopicDescription],
cacheTimeoutSecs: Int,
socketTimeoutMillis: Int,
kafkaVersion: KafkaVersion)
(implicit protected[this] val ec: ExecutionContext, val cf: ClusterFeatures) extends OffsetCache {

def getKafkaVersion: KafkaVersion = kafkaVersion

Expand Down Expand Up @@ -555,13 +697,14 @@ case class OffsetCacheActive(curator: CuratorFramework,
}

case class OffsetCachePassive(curator: CuratorFramework,
clusterContext: ClusterContext,
partitionLeaders: String => Option[List[(Int, Option[BrokerIdentity])]],
topicDescriptions: (String, Boolean) => Option[TopicDescription],
cacheTimeoutSecs: Int,
socketTimeoutMillis: Int,
kafkaVersion: KafkaVersion)
(implicit protected[this] val ec: ExecutionContext, val cf: ClusterFeatures) extends OffsetCache {
kafkaAdminClient: KafkaAdminClient,
clusterContext: ClusterContext,
partitionLeaders: String => Option[List[(Int, Option[BrokerIdentity])]],
topicDescriptions: (String, Boolean) => Option[TopicDescription],
cacheTimeoutSecs: Int,
socketTimeoutMillis: Int,
kafkaVersion: KafkaVersion)
(implicit protected[this] val ec: ExecutionContext, val cf: ClusterFeatures) extends OffsetCache {

def getKafkaVersion: KafkaVersion = kafkaVersion

Expand Down Expand Up @@ -668,6 +811,7 @@ case class OffsetCachePassive(curator: CuratorFramework,
}

case class KafkaStateActorConfig(curator: CuratorFramework,
pinnedDispatcherName: String,
clusterContext: ClusterContext,
longRunningPoolConfig: LongRunningPoolConfig,
partitionOffsetCacheTimeoutSecs: Int, simpleConsumerSocketTimeoutMillis: Int)
Expand All @@ -683,6 +827,15 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
log.error("Long running pool queue full, skipping!")
}

private[this] val kaConfig = KafkaAdminClientActorConfig(
clusterContext,
LongRunningPoolConfig(Runtime.getRuntime.availableProcessors(), 1000),
self.path
)
private[this] val kaProps = Props(classOf[KafkaAdminClientActor],kaConfig)
private[this] val kafkaAdminClientActor : ActorPath = context.actorOf(kaProps.withDispatcher(config.pinnedDispatcherName),"kafka-admin-client").path
private[this] val kafkaAdminClient = new KafkaAdminClient(context, kafkaAdminClientActor)

// e.g. /brokers/topics/analytics_content/partitions/0/state
private[this] val topicsTreeCache = new TreeCache(config.curator,ZkUtils.BrokerTopicsPath)

Expand Down Expand Up @@ -772,6 +925,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
if(config.clusterContext.config.activeOffsetCacheEnabled)
new OffsetCacheActive(
config.curator,
kafkaAdminClient,
config.clusterContext,
getPartitionLeaders,
getTopicDescription,
Expand All @@ -781,6 +935,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
else
new OffsetCachePassive(
config.curator,
kafkaAdminClient,
config.clusterContext,
getPartitionLeaders,
getTopicDescription,
Expand Down Expand Up @@ -919,7 +1074,7 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
case scalaz.Success(bi) => bi
}.toIndexedSeq.sortBy(_.id)
}

private[this] def asyncPipeToSender[T](fn: => T): Unit = {
implicit val ec = longRunningExecutionContext
val result: Future[T] = Future {
Expand Down
14 changes: 14 additions & 0 deletions app/kafka/manager/actor/cluster/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,26 @@

package kafka.manager.actor

import grizzled.slf4j.Logging
import kafka.manager.features.{ClusterFeatures, ClusterFeature}

import scala.util.{Failure, Try}

/**
* Created by hiral on 12/1/15.
*/
package object cluster {
implicit class TryLogErrorHelper[T](t: Try[T]) extends Logging {
def logError(s: => String) : Try[T] = {
t match {
case Failure(e) =>
error(s, e)
case _ => //do nothing
}
t
}
}

def featureGate[T](af: ClusterFeature)(fn: => Unit)(implicit features: ClusterFeatures) : Unit = {
if(features.features(af)) {
fn
Expand Down
4 changes: 2 additions & 2 deletions app/kafka/manager/base/LongRunningPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ trait LongRunningPoolActor extends BaseActor {
super.postStop()
}

protected def longRunning[T](fn: => Future[T])(implicit ec: ExecutionContext, ct: ClassTag[T]) : Unit = {
protected def longRunning[T](fn: => Future[T])(implicit ec: ExecutionContext, ct: ClassTag[T]) : Unit = {
if(longRunningExecutor.getQueue.remainingCapacity() == 0) {
longRunningQueueFull()
} else {
fn match {
case _ if ct.runtimeClass == classOf[Unit] =>
//do nothing with unit
//do nothing with unit
case f =>
f pipeTo sender
}
Expand Down
Loading

0 comments on commit f6d12c2

Please sign in to comment.