Skip to content

Commit

Permalink
Remove default value for user/pass
Browse files Browse the repository at this point in the history
  • Loading branch information
patelh committed Dec 9, 2015
1 parent ddde5bd commit f510b6f
Show file tree
Hide file tree
Showing 15 changed files with 45 additions and 45 deletions.
24 changes: 12 additions & 12 deletions app/kafka/manager/KafkaJMX.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@ import scala.math
object KafkaJMX {

private[this] lazy val logger = LoggerFactory.getLogger(this.getClass)


private[this] val defaultJmxConnectorProperties : java.util.Map[String, _] = {
import scala.collection.JavaConverters._
Map(
"jmx.remote.x.request.waiting.timeout" -> "3000",
"jmx.remote.x.notification.fetch.timeout" -> "3000",
"sun.rmi.transport.connectionTimeout" -> "3000",
"sun.rmi.transport.tcp.handshakeTimeout" -> "3000",
"sun.rmi.transport.tcp.responseTimeout" -> "3000"
).asJava
}

def doWithConnection[T](jmxHost: String, jmxPort: Int, jmxUser: Option[String], jmxPass: Option[String])(fn: MBeanServerConnection => T) : Try[T] = {
val urlString = s"service:jmx:rmi:///jndi/rmi://$jmxHost:$jmxPort/jmxrmi"
val url = new JMXServiceURL(urlString)
try {
require(jmxPort > 0, "No jmx port but jmx polling enabled!")
val defaultJmxConnectorProperties : java.util.Map[String, _] = {
import scala.collection.JavaConverters._
Map(
"jmx.remote.x.request.waiting.timeout" -> "3000",
"jmx.remote.x.notification.fetch.timeout" -> "3000",
"sun.rmi.transport.connectionTimeout" -> "3000",
"sun.rmi.transport.tcp.handshakeTimeout" -> "3000",
"sun.rmi.transport.tcp.responseTimeout" -> "3000"
//"com.sun.management.jmxremote.ssl" -> "false"
).asJava
}
val jmxConnectorProperties : java.util.Map[String, _] = {
val withCreds: Option[java.util.Map[String, _]] = for {
user <- jmxUser
Expand Down
8 changes: 4 additions & 4 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ class KafkaManager(akkaConfig: Config)
version: String,
zkHosts: String,
jmxEnabled: Boolean,
jmxUser: Option[String] = Some(""),
jmxPass: Option[String] = Some(""),
jmxUser: Option[String],
jmxPass: Option[String],
pollConsumers: Boolean,
filterConsumers: Boolean,
logkafkaEnabled: Boolean = false,
Expand Down Expand Up @@ -221,8 +221,8 @@ class KafkaManager(akkaConfig: Config)
version: String,
zkHosts: String,
jmxEnabled: Boolean,
jmxUser: Option[String] = Some(""),
jmxPass: Option[String] = Some(""),
jmxUser: Option[String],
jmxPass: Option[String],
pollConsumers: Boolean,
filterConsumers: Boolean,
logkafkaEnabled: Boolean = false,
Expand Down
8 changes: 4 additions & 4 deletions app/kafka/manager/KafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ object ClusterConfig {
zkHosts: String,
zkMaxRetry: Int = 100,
jmxEnabled: Boolean,
jmxUser: Option[String] = Some(""),
jmxPass: Option[String] = Some(""),
jmxUser: Option[String],
jmxPass: Option[String],
pollConsumers: Boolean,
filterConsumers: Boolean,
logkafkaEnabled: Boolean = false,
Expand Down Expand Up @@ -187,8 +187,8 @@ object ClusterConfig {
curatorConfig,
enabled,version,
jmxEnabled.getOrElse(false),
jmxUser.getOrElse(Some("")),
jmxPass.getOrElse(Some("")),
jmxUser.getOrElse(None),
jmxPass.getOrElse(None),
pollConsumers.getOrElse(false),
filterConsumers.getOrElse(true),
logkafkaEnabled.getOrElse(false),
Expand Down
2 changes: 1 addition & 1 deletion test/controller/api/TestKafkaStateCheck.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TestKafkaStateCheck extends CuratorAwareTest with KafkaServerInTest {
}

private[this] def createCluster() = {
val future = KafkaManagerContext.getKafkaManager.addCluster(testClusterName,"0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val future = KafkaManagerContext.getKafkaManager.addCluster(testClusterName,"0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
val result = Await.result(future,duration)
result.toEither.left.foreach(apiError => sys.error(apiError.msg))
Thread.sleep(3000)
Expand Down
4 changes: 2 additions & 2 deletions test/kafka/manager/TestBrokerViewCacheActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ class TestBrokerViewCacheActor extends KafkaServerInTest {
private[this] implicit val timeout: Timeout = 10.seconds

private[this] var brokerViewCacheActor : Option[ActorRef] = None
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig)

override protected def beforeAll(): Unit = {
super.beforeAll()
val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
val clusterContext = ClusterContext(ClusterFeatures.from(clusterConfig), clusterConfig)
val ksConfig = KafkaStateActorConfig(sharedCurator, clusterContext, LongRunningPoolConfig(2,100), 5, 10000)
val props = Props(classOf[KafkaStateActor],ksConfig)
Expand Down
2 changes: 1 addition & 1 deletion test/kafka/manager/TestClusterManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TestClusterManagerActor extends CuratorAwareTest {

override protected def beforeAll(): Unit = {
super.beforeAll()
val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true)
val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None)
val curatorConfig = CuratorConfig(testServer.getConnectString)
val config = ClusterManagerActorConfig("pinned-dispatcher","/kafka-manager/clusters/dev",curatorConfig,clusterConfig,FiniteDuration(1,SECONDS))
val props = Props(classOf[ClusterManagerActor],config)
Expand Down
8 changes: 4 additions & 4 deletions test/kafka/manager/TestKafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class TestKafkaManager extends CuratorAwareTest {
}

test("add cluster") {
val future = kafkaManager.addCluster("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val future = kafkaManager.addCluster("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
val result = Await.result(future,duration)
assert(result.isRight === true)
Thread.sleep(2000)
Expand Down Expand Up @@ -332,7 +332,7 @@ class TestKafkaManager extends CuratorAwareTest {
}

test("update cluster zkhost") {
val future = kafkaManager.updateCluster("dev","0.8.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val future = kafkaManager.updateCluster("dev","0.8.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
val result = Await.result(future,duration)
assert(result.isRight === true)

Expand Down Expand Up @@ -365,7 +365,7 @@ class TestKafkaManager extends CuratorAwareTest {
}

test("update cluster version") {
val future = kafkaManager.updateCluster("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val future = kafkaManager.updateCluster("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
val result = Await.result(future,duration)
assert(result.isRight === true)

Expand All @@ -386,7 +386,7 @@ class TestKafkaManager extends CuratorAwareTest {
}

test("update cluster logkafka enabled and activeOffsetCache enabled") {
val future = kafkaManager.updateCluster("dev","0.8.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true)
val future = kafkaManager.updateCluster("dev","0.8.2.0",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None)
val result = Await.result(future,duration)
assert(result.isRight === true)

Expand Down
10 changes: 5 additions & 5 deletions test/kafka/manager/TestKafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class TestKafkaManagerActor extends CuratorAwareTest {
}

test("add cluster") {
val cc = ClusterConfig("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val cc = ClusterConfig("dev","0.8.1.1",testServer.getConnectString, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
withKafkaManagerActor(KMAddCluster(cc)) { result: KMCommandResult =>
result.result.get
Thread.sleep(1000)
Expand All @@ -74,7 +74,7 @@ class TestKafkaManagerActor extends CuratorAwareTest {
}

test("update cluster zkhost") {
val cc2 = ClusterConfig("dev","0.8.1.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val cc2 = ClusterConfig("dev","0.8.1.1",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand Down Expand Up @@ -106,7 +106,7 @@ class TestKafkaManagerActor extends CuratorAwareTest {
}

test("update cluster version") {
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand All @@ -133,7 +133,7 @@ class TestKafkaManagerActor extends CuratorAwareTest {
println(result)
result.msg.contains("dev")
}
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
withKafkaManagerActor(KMAddCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(1000)
Expand All @@ -150,7 +150,7 @@ class TestKafkaManagerActor extends CuratorAwareTest {
}

test("update cluster logkafka enabled") {
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true)
val cc2 = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None)
withKafkaManagerActor(KMUpdateCluster(cc2)) { result: KMCommandResult =>
result.result.get
Thread.sleep(3000)
Expand Down
2 changes: 1 addition & 1 deletion test/kafka/manager/TestKafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TestKafkaStateActor extends KafkaServerInTest {
override val kafkaServerZkPath = broker.getZookeeperConnectionString
private[this] var kafkaStateActor : Option[ActorRef] = None
private[this] implicit val timeout: Timeout = 10.seconds
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig)

override protected def beforeAll(): Unit = {
Expand Down
2 changes: 1 addition & 1 deletion test/kafka/manager/TestLogkafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TestLogkafkaStateActor extends KafkaServerInTest {
override val kafkaServerZkPath = broker.getZookeeperConnectionString
private[this] var logkafkaStateActor : Option[ActorRef] = None
private[this] implicit val timeout: Timeout = 10.seconds
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig)

override protected def beforeAll(): Unit = {
Expand Down
4 changes: 2 additions & 2 deletions test/kafka/manager/TestLogkafkaViewCacheActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ class TestLogkafkaViewCacheActor extends KafkaServerInTest {
private[this] implicit val timeout: Timeout = 10.seconds

private[this] var logkafkaViewCacheActor : Option[ActorRef] = None
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig)

override protected def beforeAll(): Unit = {
super.beforeAll()
val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true)
val clusterConfig = ClusterConfig("dev","0.8.2.0",kafkaServerZkPath, jmxEnabled = false, pollConsumers = true, filterConsumers = true, logkafkaEnabled = true, jmxUser = None, jmxPass = None)
val clusterContext = ClusterContext(ClusterFeatures.from(clusterConfig), clusterConfig)
val props = Props(classOf[KafkaStateActor],sharedCurator, defaultClusterContext)

Expand Down
10 changes: 5 additions & 5 deletions test/kafka/manager/utils/TestClusterConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ class TestClusterConfig extends FunSuite with Matchers {

test("invalid name") {
intercept[IllegalArgumentException] {
ClusterConfig("qa!","0.8.1.1","localhost",jmxEnabled = false, pollConsumers = true, filterConsumers = true)
ClusterConfig("qa!","0.8.1.1","localhost",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
}
}

test("invalid kafka version") {
intercept[IllegalArgumentException] {
ClusterConfig("qa","0.8.1","localhost:2181",jmxEnabled = false, pollConsumers = true, filterConsumers = true)
ClusterConfig("qa","0.8.1","localhost:2181",jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
}
}

test("serialize and deserialize") {
val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true)
val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = true, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
val serialize: String = ClusterConfig.serialize(cc)
val deserialize = ClusterConfig.deserialize(serialize)
assert(deserialize.isSuccess === true)
Expand All @@ -41,7 +41,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}

test("deserialize without version and jmxEnabled") {
val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true)
val cc = ClusterConfig("qa","0.8.2.0","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
val serialize: String = ClusterConfig.serialize(cc)
val noverison = serialize.replace(""","kafkaVersion":"0.8.2.0"""","")
assert(!noverison.contains("kafkaVersion"))
Expand All @@ -51,7 +51,7 @@ class TestClusterConfig extends FunSuite with Matchers {
}

test("deserialize from 0.8.2-beta as 0.8.2.0") {
val cc = ClusterConfig("qa","0.8.2-beta","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true)
val cc = ClusterConfig("qa","0.8.2-beta","localhost:2181", jmxEnabled = false, pollConsumers = true, filterConsumers = true, activeOffsetCacheEnabled = true, jmxUser = None, jmxPass = None)
val serialize: String = ClusterConfig.serialize(cc)
val noverison = serialize.replace(""","kafkaVersion":"0.8.2.0"""",""","kafkaVersion":"0.8.2-beta"""")
val deserialize = ClusterConfig.deserialize(noverison)
Expand Down
2 changes: 1 addition & 1 deletion test/kafka/manager/utils/TestCreateLogkafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class TestCreateLogkafka extends CuratorAwareTest {
import logkafka82.LogkafkaConfigErrors._

private[this] val adminUtils = new LogkafkaAdminUtils(Kafka_0_8_2_0)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig)
private[this] val createLogkafkaLogkafkaId = "km-unit-test-logkafka-logkafka_id"
private[this] val createLogkafkaLogPath = "/km-unit-test-logkafka-logpath"
Expand Down
2 changes: 1 addition & 1 deletion test/kafka/manager/utils/TestCreateTopic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import scala.concurrent.Future
class TestCreateTopic extends CuratorAwareTest {

private[this] val adminUtils = new AdminUtils(Kafka_0_8_2_0)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig)

test("create topic with empty name") {
Expand Down
2 changes: 1 addition & 1 deletion test/kafka/manager/utils/TestReassignPartitions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TestReassignPartitions extends CuratorAwareTest {

private[this] val brokerList = IndexedSeq(1,2,3)

private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true)
private[this] val defaultClusterConfig = ClusterConfig("test","0.8.2.0","localhost:2818",100,false, pollConsumers = true, filterConsumers = true, jmxUser = None, jmxPass = None)
private[this] val defaultClusterContext = ClusterContext(ClusterFeatures.from(defaultClusterConfig), defaultClusterConfig)

private[this] def mytopic1 : TopicIdentity = getTopicIdentity("mytopic1")
Expand Down

0 comments on commit f510b6f

Please sign in to comment.