Skip to content

Commit

Permalink
Add consumer properties in test
Browse files Browse the repository at this point in the history
  • Loading branch information
Surabhi Pandit committed Jun 25, 2018
1 parent 107f881 commit c341897
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 17 deletions.
18 changes: 10 additions & 8 deletions test/controller/api/TestKafkaStateCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,24 @@

package controller.api

import com.typesafe.config.{ ConfigFactory, Config }
import com.typesafe.config.{Config, ConfigFactory}
import controllers.KafkaManagerContext
import controllers.api.KafkaStateCheck
import features.ApplicationFeatures
import kafka.manager.utils.{ CuratorAwareTest, KafkaServerInTest }
import kafka.manager.KafkaManager
import kafka.manager.utils.{CuratorAwareTest, KafkaServerInTest}
import kafka.test.SeededBroker
import models.navigation.Menus
import org.scalatest.mock.MockitoSugar
import play.api.i18n.MessagesApi
import play.api.{ Configuration, Play }
import play.api.{Configuration, Play}
import play.api.inject.ApplicationLifecycle
import play.api.libs.json.{JsDefined, Json}
import play.api.test.Helpers._
import play.api.test.{ FakeApplication, FakeRequest }
import play.mvc.Http.Status.{ BAD_REQUEST, OK }

import play.api.test.{FakeApplication, FakeRequest}
import play.mvc.Http.Status.{BAD_REQUEST, OK}
import org.scalatest.Matchers._

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try
Expand All @@ -47,7 +48,8 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M
Map(
"pinned-dispatcher.type" -> "PinnedDispatcher",
"pinned-dispatcher.executor" -> "thread-pool-executor",
"kafka-manager.zkhosts" -> kafkaServerZkPath
"kafka-manager.zkhosts" -> kafkaServerZkPath,
KafkaManager.ConsumerPropertiesFile -> "conf/consumer.properties"
).asJava
)
val conf = new Configuration(config)
Expand All @@ -74,7 +76,7 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest with M

private[this] def createCluster() = {
val future = kafkaManagerContext.get.getKafkaManager.addCluster(
testClusterName,"0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol="PLAINTEXT"
testClusterName,"1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(kafkaManagerContext.get.getKafkaManager.defaultTuning), securityProtocol="PLAINTEXT"
)
val result = Await.result(future,duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
Expand Down
14 changes: 7 additions & 7 deletions test/kafka/manager/TestKafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("add cluster") {
val cc = ClusterConfig("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
val cc = ClusterConfig("dev","1.1.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
withKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult =>
result.result.get
Thread.sleep(1000)
Expand All @@ -79,7 +79,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("update cluster zkhost") {
val cc2 = ClusterConfig("dev","0.8.1.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand Down Expand Up @@ -111,7 +111,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("update cluster version") {
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand All @@ -138,7 +138,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
println(result)
result.msg.contains("dev")
}
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
withKafkaManagerActor(KMAddCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(1000)
Expand All @@ -155,7 +155,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("update cluster logkafka enabled") {
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="PLAINTEXT")
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand All @@ -167,7 +167,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {

test("update cluster tuning") {
val newTuning = getClusterTuning(3, 101, 11)
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false,
val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false,
tuning = Option(newTuning), securityProtocol="PLAINTEXT"
)
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
Expand All @@ -184,7 +184,7 @@ class TestKafkaManagerActor extends CuratorAwareTest with BaseTest {
}

test("update cluster security protocol") {
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT")
val cc2 = ClusterConfig("dev","1.1.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None, jmxSsl = false, tuning = Option(defaultTuning), securityProtocol="SASL_PLAINTEXT")
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand Down
6 changes: 4 additions & 2 deletions test/kafka/manager/model/KafkaVersionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ class KafkaVersionTest extends FunSuite {
"0.10.2.1" -> Kafka_0_10_2_1,
"0.11.0.0" -> Kafka_0_11_0_0,
"0.11.0.2" -> Kafka_0_11_0_2,
"1.0.0" -> Kafka_1_0_0
"1.0.0" -> Kafka_1_0_0,
"1.1.0" -> Kafka_1_1_0
)

test("apply method: supported version.") {
Expand Down Expand Up @@ -61,7 +62,8 @@ class KafkaVersionTest extends FunSuite {
("0.10.2.1","0.10.2.1"),
("0.11.0.0","0.11.0.0"),
("0.11.0.2","0.11.0.2"),
("1.0.0","1.0.0")
("1.0.0","1.0.0"),
("1.1.0","1.1.0")
)
assertResult(expected)(KafkaVersion.formSelectList)
}
Expand Down

0 comments on commit c341897

Please sign in to comment.