Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Surabhi Pandit committed Jun 21, 2018
1 parent 658f384 commit b7ce02d
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 14 deletions.
4 changes: 3 additions & 1 deletion app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class KafkaAdminClient(context: => ActorContext, adminClientActorPath: ActorPath


object KafkaManagedOffsetCache {
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0)
val supportedVersions: Set[KafkaVersion] = Set(Kafka_0_8_2_0, Kafka_0_8_2_1, Kafka_0_8_2_2, Kafka_0_9_0_0, Kafka_0_9_0_1, Kafka_0_10_0_0, Kafka_0_10_0_1, Kafka_0_10_1_0, Kafka_0_10_1_1, Kafka_0_10_2_0, Kafka_0_10_2_1, Kafka_0_11_0_0, Kafka_0_11_0_2, Kafka_1_0_0, Kafka_1_1_0)
val ConsumerOffsetTopic = "__consumer_offsets"

def isSupported(version: KafkaVersion) : Boolean = {
Expand Down Expand Up @@ -504,6 +504,8 @@ trait OffsetCache extends Logging {
t.start()
}
}
} else {
throw new IllegalArgumentException(s"Unsupported Kafka Version: ${clusterContext.config.version}")
}
}

Expand Down
5 changes: 3 additions & 2 deletions app/kafka/manager/utils/LogkafkaNewConfigs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package kafka.manager.utils

import java.util.Properties

import kafka.manager.model._
import kafka.manager.model.{Kafka_1_0_0, _}

trait LogkafkaNewConfigs {
def configNames : Set[String]
Expand All @@ -32,7 +32,8 @@ object LogkafkaNewConfigs {
Kafka_0_10_2_1 -> logkafka82.LogConfig,
Kafka_0_11_0_0 -> logkafka82.LogConfig,
Kafka_0_11_0_2 -> logkafka82.LogConfig,
Kafka_1_0_0 -> logkafka82.LogConfig
Kafka_1_0_0 -> logkafka82.LogConfig,
Kafka_1_1_0 -> logkafka82.LogConfig
)

def configNames(version: KafkaVersion) : Set[String] = {
Expand Down
16 changes: 12 additions & 4 deletions test/kafka/manager/TestKafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
hlConsumerThread = Option(new Thread() {
override def run(): Unit = {
while(!hlShutdown.get()) {
hlConsumer.map(_.read { ba =>
hlConsumer.map(_.read { ba => {
println(s"Read ba: $ba")
Option(ba).map(asString).foreach( s => println(s"hl consumer read message : $s"))
}
})
Thread.sleep(500)
}
Expand Down Expand Up @@ -94,6 +96,11 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
})
simpleProducerThread.foreach(_.start())
Thread.sleep(1000)

//val future = kafkaManager.addCluster("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManager.defaultTuning), securityProtocol="PLAINTEXT")
//val result = Await.result(future,duration)
//assert(result.isRight === true)
//Thread.sleep(2000)
}

override protected def afterAll(): Unit = {
Expand Down Expand Up @@ -206,21 +213,22 @@ class TestKafkaManager extends CuratorAwareTest with BaseTest {
}

test("get consumer list passive mode") {
//Thread.sleep(2000)
val future = kafkaManager.getConsumerListExtended("dev")
val result = Await.result(future,duration)
assert(result.isRight === true, s"Failed : ${result}")
assert(result.toOption.get.clusterContext.config.activeOffsetCacheEnabled === false, s"Failed : ${result}")
assert(result.toOption.get.list.map(_._1).contains((newConsumer.get.groupId, KafkaManagedConsumer)), s"Failed : ${result}")
assert(result.toOption.get.list.map(_._1).contains((hlConsumer.get.groupId, ZKManagedConsumer)), s"Failed : ${result}")
assert(result.toOption.get.list.map(_._1).contains((hlConsumer.get.groupId, KafkaManagedConsumer)), s"Failed : ${result}")
}

test("get consumer identity passive mode for old consumer") {
/*test("get consumer identity passive mode for old consumer") {
val future = kafkaManager.getConsumerIdentity("dev", hlConsumer.get.groupId, "ZK")
val result = Await.result(future,duration)
assert(result.isRight === true, s"Failed : ${result}")
assert(result.toOption.get.clusterContext.config.activeOffsetCacheEnabled === false, s"Failed : ${result}")
assert(result.toOption.get.topicMap.head._1 === seededTopic, s"Failed : ${result}")
}
}*/

test("get consumer identity passive mode for new consumer") {
val future = kafkaManager.getConsumerIdentity("dev", newConsumer.get.groupId, "KF")
Expand Down
1 change: 1 addition & 0 deletions test/kafka/test/KafkaTestBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class KafkaTestBroker(zookeeper: CuratorFramework, zookeeperConnectionString: St
p.setProperty("log.dirs", getLogDir)
p.setProperty("log.retention.hours", "1")
p.setProperty("offsets.topic.replication.factor", "1")
p.setProperty("delete.topic.enable", "false") // Deletion disabled: delete requests only mark the topic for deletion in ZooKeeper; the broker never removes it
new KafkaConfig(p)
}

Expand Down
24 changes: 17 additions & 7 deletions test/kafka/test/SeededBroker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.test.TestingServer
import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.streams.kstream.{ForeachAction, KStream}
import org.apache.kafka.streams.kstream.{ForeachAction, KStream, Printed}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder}
import org.apache.kafka.clients.consumer.ConsumerConfig._
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Serdes}
import org.apache.kafka.streams.StreamsConfig

import scala.util.Try

Expand All @@ -47,7 +49,7 @@ class SeededBroker(seedTopic: String, partitions: Int) {
}

private[this] val commonConsumerConfig = new Properties()
commonConsumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, s"localhost:${broker.getPort}")
commonConsumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, getBrokerConnectionString)
commonConsumerConfig.put(REQUEST_TIMEOUT_MS_CONFIG, "11000")
commonConsumerConfig.put(SESSION_TIMEOUT_MS_CONFIG, "10000")
commonConsumerConfig.put(RECEIVE_BUFFER_CONFIG, s"${64 * 1024}")
Expand Down Expand Up @@ -119,16 +121,24 @@ case class HighLevelConsumer(topic: String,
commonConsumerConfig: Properties,
readFromStartOfStream: Boolean = true) extends Logging {

commonConsumerConfig.put("application.id", "test-app")
commonConsumerConfig.put(GROUP_ID_CONFIG, groupId)
commonConsumerConfig.put(AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
commonConsumerConfig.put(AUTO_OFFSET_RESET_CONFIG, if(readFromStartOfStream) "earliest" else "latest")

commonConsumerConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, groupId)
commonConsumerConfig.put(StreamsConfig.CLIENT_ID_CONFIG, groupId)
commonConsumerConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass.getName)
commonConsumerConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass.getName)
commonConsumerConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100" )
commonConsumerConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")

info("setup:start topic=%s for bk=%s and groupId=%s".format(topic,commonConsumerConfig.getProperty(BOOTSTRAP_SERVERS_CONFIG),groupId))
val streamsBuilder = new StreamsBuilder
val kstream : KStream[Array[Byte], Array[Byte]] = streamsBuilder.stream(topic)

val kafkaStreams = new KafkaStreams(streamsBuilder.build(), commonConsumerConfig)
kafkaStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
override def uncaughtException(t: Thread, e: Throwable): Unit = {
error("Failed to initialize KafkaStreams", e)
}
})

kafkaStreams.start()
info("setup:complete topic=%s for bk=%s and groupId=%s".format(topic,commonConsumerConfig.getProperty(BOOTSTRAP_SERVERS_CONFIG),groupId))

Expand Down

0 comments on commit b7ce02d

Please sign in to comment.