Skip to content

Commit

Permalink
Fix case of missing properties
Browse files Browse the repository at this point in the history
  • Loading branch information
Surabhi Pandit committed Jun 27, 2018
1 parent c341897 commit 55c2ab8
Showing 1 changed file with 13 additions and 4 deletions.
17 changes: 13 additions & 4 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.common.serialization.Serdes

/**
* @author hiral
Expand Down Expand Up @@ -1390,11 +1391,10 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
return
}

var tpList = broker2TopicPartitionMap(broker)
val tpList = broker2TopicPartitionMap(broker)
val port: Int = broker.endpoints(PLAINTEXT)
require(kaConfig.consumerProperties.isDefined, "Cannot instantiate KafkaConsumer, consumer properties missing")
kaConfig.consumerProperties.get.put(BOOTSTRAP_SERVERS_CONFIG, s"${broker.host}:$port")
val kafkaConsumer = new KafkaConsumer(kaConfig.consumerProperties.get)
val consumerProperties = kaConfig.consumerProperties.getOrElse(getDefaultConsumerProperties(s"${broker.host}:$port"))
val kafkaConsumer = new KafkaConsumer(consumerProperties)
try {
val request = tpList.toList.map( f => new TopicPartition(f._1.topic, f._1.partition)).toList
var tpOffsetMap = kafkaConsumer.endOffsets(request)
Expand Down Expand Up @@ -1435,6 +1435,15 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
def close(): Unit = {
this.shutdown = true
}

def getDefaultConsumerProperties(bootstrapServers: String): Properties = {
val properties = new Properties()
properties.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
properties.put(GROUP_ID_CONFIG, getClass.getCanonicalName)
properties.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
properties
}
}
}

0 comments on commit 55c2ab8

Please sign in to comment.