Skip to content

Commit

Permalink
Make SASL configs optional, fix update cluster bug
Browse files Browse the repository at this point in the history
  • Loading branch information
patelh committed Jul 28, 2018
1 parent 1e5f3c7 commit f779d1f
Show file tree
Hide file tree
Showing 21 changed files with 143 additions and 92 deletions.
16 changes: 8 additions & 8 deletions app/controllers/Cluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
case Success(_) => Valid
}
}
val validateSASLmechanism: Constraint[String] = Constraint("validate SASL mechanism") { string =>
val validateSASLmechanism: Constraint[Option[String]] = Constraint("validate SASL mechanism") { stringOption =>
Try {
SASLmechanism(string)
stringOption.foreach(SASLmechanism.from)
} match {
case Failure(t) => Invalid(t.getMessage)
case Success(_) => Valid
Expand Down Expand Up @@ -122,7 +122,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
, "saslMechanism" -> nonEmptyText.verifying(validateSASLmechanism)
, "saslMechanism" -> optional(text).verifying(validateSASLmechanism)
, "jaasConfig" -> optional(text)
)(ClusterConfig.apply)(ClusterConfig.customUnapply)
)
Expand Down Expand Up @@ -166,7 +166,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
)(ClusterTuning.apply)(ClusterTuning.unapply)
)
, "securityProtocol" -> nonEmptyText.verifying(validateSecurityProtocol)
, "saslMechanism" -> nonEmptyText.verifying(validateSASLmechanism)
, "saslMechanism" -> optional(text).verifying(validateSASLmechanism)
, "jaasConfig" -> optional(text)
)(ClusterOperation.apply)(ClusterOperation.customUnapply)
)
Expand All @@ -188,7 +188,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
,false
,Option(defaultTuning)
,PLAINTEXT
,PLAIN
,None
,None
)
}
Expand Down Expand Up @@ -238,7 +238,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
cc.displaySizeEnabled,
cc.tuning,
cc.securityProtocol.stringId,
cc.saslMechanism.stringId,
cc.saslMechanism.map(_.stringId),
cc.jaasConfig
))
}))
Expand All @@ -263,7 +263,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
clusterConfig.filterConsumers,
clusterConfig.tuning,
clusterConfig.securityProtocol.stringId,
clusterConfig.saslMechanism.stringId,
clusterConfig.saslMechanism.map(_.stringId),
clusterConfig.jaasConfig,
clusterConfig.logkafkaEnabled,
clusterConfig.activeOffsetCacheEnabled,
Expand Down Expand Up @@ -334,7 +334,7 @@ class Cluster (val messagesApi: MessagesApi, val kafkaManagerContext: KafkaManag
clusterOperation.clusterConfig.filterConsumers,
clusterOperation.clusterConfig.tuning,
clusterOperation.clusterConfig.securityProtocol.stringId,
clusterOperation.clusterConfig.saslMechanism.stringId,
clusterOperation.clusterConfig.saslMechanism.map(_.stringId),
clusterOperation.clusterConfig.jaasConfig,
clusterOperation.clusterConfig.logkafkaEnabled,
clusterOperation.clusterConfig.activeOffsetCacheEnabled,
Expand Down
4 changes: 2 additions & 2 deletions app/kafka/manager/KafkaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
securityProtocol: String,
saslMechanism: String,
saslMechanism: Option[String],
jaasConfig: Option[String],
logkafkaEnabled: Boolean = false,
activeOffsetCacheEnabled: Boolean = false,
Expand Down Expand Up @@ -302,7 +302,7 @@ class KafkaManager(akkaConfig: Config) extends Logging {
filterConsumers: Boolean,
tuning: Option[ClusterTuning],
securityProtocol: String,
saslMechanism: String,
saslMechanism: Option[String],
jaasConfig: Option[String],
logkafkaEnabled: Boolean = false,
activeOffsetCacheEnabled: Boolean = false,
Expand Down
3 changes: 3 additions & 0 deletions app/kafka/manager/actor/KafkaManagerActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,16 @@ class KafkaManagerActor(kafkaManagerConfig: KafkaManagerActorConfig)
&& newConfig.jmxEnabled == currentConfig.jmxEnabled
&& newConfig.jmxUser == currentConfig.jmxUser
&& newConfig.jmxPass == currentConfig.jmxPass
&& newConfig.jmxSsl == currentConfig.jmxSsl
&& newConfig.logkafkaEnabled == currentConfig.logkafkaEnabled
&& newConfig.pollConsumers == currentConfig.pollConsumers
&& newConfig.filterConsumers == currentConfig.filterConsumers
&& newConfig.activeOffsetCacheEnabled == currentConfig.activeOffsetCacheEnabled
&& newConfig.displaySizeEnabled == currentConfig.displaySizeEnabled
&& newConfig.tuning == currentConfig.tuning
&& newConfig.securityProtocol == currentConfig.securityProtocol
&& newConfig.saslMechanism == currentConfig.saslMechanism
&& newConfig.jaasConfig == currentConfig.jaasConfig
) {
//nothing changed
false
Expand Down
31 changes: 21 additions & 10 deletions app/kafka/manager/actor/cluster/KafkaStateActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,15 @@ case class KafkaAdminClientActor(config: KafkaAdminClientActorConfig) extends Ba
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, config.clusterContext.config.securityProtocol.stringId)
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerListStr)
props.put(SaslConfigs.SASL_MECHANISM, config.clusterContext.config.saslMechanism.stringId)
if(!config.clusterContext.config.jaasConfig.isEmpty){
if(config.clusterContext.config.saslMechanism.nonEmpty){
props.put(SaslConfigs.SASL_MECHANISM, config.clusterContext.config.saslMechanism.get.stringId)
log.info(s"SASL Mechanism =${config.clusterContext.config.saslMechanism.get}")
}
if(config.clusterContext.config.jaasConfig.nonEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG, config.clusterContext.config.jaasConfig.get)
log.info(s"SASL JAAS config=${config.clusterContext.config.jaasConfig}")
log.info(s"SASL JAAS config=${config.clusterContext.config.jaasConfig.get}")
}
log.info(s"Creating admin client with security protocol=${config.clusterContext.config.securityProtocol.stringId} , mechanism=${config.clusterContext.config.saslMechanism.stringId}, broker list : $brokerListStr")
log.info(s"Creating admin client with security protocol=${config.clusterContext.config.securityProtocol.stringId} , broker list : $brokerListStr")
AdminClient.create(props)
}

Expand Down Expand Up @@ -263,9 +266,13 @@ case class KafkaManagedOffsetCache(clusterContext: ClusterContext
cp => props.putAll(cp)
}
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, clusterContext.config.securityProtocol.stringId)
props.put(SaslConfigs.SASL_MECHANISM, clusterContext.config.saslMechanism.stringId)
if(!clusterContext.config.jaasConfig.isEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG, clusterContext.config.jaasConfig.get)
if(clusterContext.config.saslMechanism.nonEmpty){
props.put(SaslConfigs.SASL_MECHANISM, clusterContext.config.saslMechanism.get.stringId)
info(s"SASL Mechanism =${clusterContext.config.saslMechanism.get}")
if(clusterContext.config.jaasConfig.nonEmpty){
props.put(SaslConfigs.SASL_JAAS_CONFIG, clusterContext.config.jaasConfig.get)
info(s"SASL JAAS config=${clusterContext.config.jaasConfig.get}")
}
}
Try {
info("Constructing new kafka consumer client using these properties: ")
Expand Down Expand Up @@ -1470,9 +1477,13 @@ class KafkaStateActor(config: KafkaStateActorConfig) extends BaseClusterQueryCom
consumerProperties.put(BOOTSTRAP_SERVERS_CONFIG, s"${broker.host}:$port")
consumerProperties.put(SECURITY_PROTOCOL_CONFIG, securityProtocol.stringId)
// Use secure endpoint if available
if(!kaConfig.clusterContext.config.jaasConfig.isEmpty){
consumerProperties.put(SaslConfigs.SASL_MECHANISM, kaConfig.clusterContext.config.saslMechanism.stringId)
consumerProperties.put(SaslConfigs.SASL_JAAS_CONFIG, kaConfig.clusterContext.config.jaasConfig.get);
if(kaConfig.clusterContext.config.saslMechanism.nonEmpty){
consumerProperties.put(SaslConfigs.SASL_MECHANISM, kaConfig.clusterContext.config.saslMechanism.get.stringId)
log.info(s"SASL Mechanism =${kaConfig.clusterContext.config.saslMechanism.get}")
}
if(kaConfig.clusterContext.config.jaasConfig.nonEmpty){
consumerProperties.put(SaslConfigs.SASL_JAAS_CONFIG, kaConfig.clusterContext.config.jaasConfig.get)
log.info(s"SASL JAAS config=${kaConfig.clusterContext.config.jaasConfig.get}")
}
var kafkaConsumer: Option[KafkaConsumer[Any, Any]] = None
try {
Expand Down
42 changes: 23 additions & 19 deletions app/kafka/manager/model/model.scala
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ object ClusterConfig {
, displaySizeEnabled: Boolean = false
, tuning: Option[ClusterTuning]
, securityProtocol: String
, saslMechanism: String
, saslMechanism: Option[String]
, jaasConfig: Option[String]
) : ClusterConfig = {
val kafkaVersion = KafkaVersion(version)
Expand All @@ -192,17 +192,17 @@ object ClusterConfig {
, displaySizeEnabled
, tuning
, SecurityProtocol(securityProtocol)
, SASLmechanism(saslMechanism)
, saslMechanism.flatMap(SASLmechanism.from)
, jaasConfig
)
}

def customUnapply(cc: ClusterConfig) : Option[(
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, String, Option[String])] = {
String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Some((
cc.name, cc.version.toString, cc.curatorConfig.zkConnect, cc.curatorConfig.zkMaxRetry,
cc.jmxEnabled, cc.jmxUser, cc.jmxPass, cc.jmxSsl, cc.pollConsumers, cc.filterConsumers,
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId, cc.saslMechanism.stringId, cc.jaasConfig
cc.logkafkaEnabled, cc.activeOffsetCacheEnabled, cc.displaySizeEnabled, cc.tuning, cc.securityProtocol.stringId, cc.saslMechanism.map(_.stringId), cc.jaasConfig
)
)
}
Expand Down Expand Up @@ -246,7 +246,7 @@ object ClusterConfig {
:: ("displaySizeEnabled" -> toJSON(config.displaySizeEnabled))
:: ("tuning" -> toJSON(config.tuning))
:: ("securityProtocol" -> toJSON(config.securityProtocol.stringId))
:: ("saslMechanism" -> toJSON(config.saslMechanism.stringId))
:: ("saslMechanism" -> toJSON(config.saslMechanism.map(_.stringId)))
:: ("jaasConfig" -> toJSON(config.jaasConfig))
:: Nil)
compact(render(json)).getBytes(StandardCharsets.UTF_8)
Expand All @@ -273,8 +273,8 @@ object ClusterConfig {
val clusterTuning = fieldExtended[Option[ClusterTuning]]("tuning")(json)
val securityProtocolString = fieldExtended[String]("securityProtocol")(json)
val securityProtocol = securityProtocolString.map(SecurityProtocol.apply).getOrElse(PLAINTEXT)
val saslMechanismString = fieldExtended[String]("saslMechanism")(json)
val saslMechanism = saslMechanismString.map(SASLmechanism.apply).getOrElse(PLAIN)
val saslMechanismString = fieldExtended[Option[String]]("saslMechanism")(json)
val saslMechanism = saslMechanismString.map(_.flatMap(SASLmechanism.from))
val jaasConfig = fieldExtended[Option[String]]("jaasConfig")(json)

ClusterConfig.apply(
Expand All @@ -292,7 +292,7 @@ object ClusterConfig {
displaySizeEnabled.getOrElse(false),
clusterTuning.getOrElse(None),
securityProtocol,
saslMechanism,
saslMechanism.getOrElse(None),
jaasConfig.getOrElse(None)
)
}
Expand Down Expand Up @@ -424,7 +424,7 @@ case class ClusterConfig (name: String
, displaySizeEnabled: Boolean
, tuning: Option[ClusterTuning]
, securityProtocol: SecurityProtocol
, saslMechanism: SASLmechanism
, saslMechanism: Option[SASLmechanism]
, jaasConfig: Option[String]
)

Expand Down Expand Up @@ -463,29 +463,33 @@ object SecurityProtocol {
sealed trait SASLmechanism {
def stringId: String
}
case object PLAIN extends SASLmechanism {
case object SASL_MECHANISM_PLAIN extends SASLmechanism {
val stringId = "PLAIN"
}

case object GSSAPI extends SASLmechanism {
case object SASL_MECHANISM_GSSAPI extends SASLmechanism {
val stringId = "GSSAPI"
}

case object SCRAM256 extends SASLmechanism {
case object SASL_MECHANISM_SCRAM256 extends SASLmechanism {
val stringId = "SCRAM-SHA-256"
}
case object SCRAM512 extends SASLmechanism {
case object SASL_MECHANISM_SCRAM512 extends SASLmechanism {
val stringId = "SCRAM-SHA-512"
}

object SASLmechanism {
private[this] val typesMap: Map[String, SASLmechanism] = Map(
PLAIN.stringId -> PLAIN
, GSSAPI.stringId -> GSSAPI
, SCRAM256.stringId -> SCRAM256
, SCRAM512.stringId -> SCRAM512
SASL_MECHANISM_PLAIN.stringId -> SASL_MECHANISM_PLAIN
, SASL_MECHANISM_GSSAPI.stringId -> SASL_MECHANISM_GSSAPI
, SASL_MECHANISM_SCRAM256.stringId -> SASL_MECHANISM_SCRAM256
, SASL_MECHANISM_SCRAM512.stringId -> SASL_MECHANISM_SCRAM512
)

val formSelectList : IndexedSeq[(String,String)] = typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId))
def apply(s: String) : SASLmechanism = typesMap(s.toUpperCase)
val formSelectList : IndexedSeq[(String,String)] = IndexedSeq(("DEFAULT", "DEFAULT")) ++ typesMap.toIndexedSeq.map(t => (t._1,t._2.stringId))
private def apply(s: String) : SASLmechanism = typesMap(s.toUpperCase)
def from(s: String) : Option[SASLmechanism] = s.toUpperCase match {
case "DEFAULT" => None
case other => Option(apply(other))
}
}
6 changes: 3 additions & 3 deletions app/models/form/ClusterOperation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,20 @@ object ClusterOperation {
, displaySizeEnabled: Boolean
, tuning: Option[ClusterTuning]
, securityProtocol: String
, saslMechanism: String
, saslMechanism: Option[String]
, jaasConfig: Option[String]
): ClusterOperation = {
ClusterOperation(operation,ClusterConfig(name, version, zkHosts, zkMaxRetry, jmxEnabled, jmxUser, jmxPass, jmxSsl,
pollConsumers, filterConsumers, logkafkaEnabled, activeOffsetCacheEnabled, displaySizeEnabled, tuning, securityProtocol, saslMechanism, jaasConfig))
}

def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, String, Option[String])] = {
def customUnapply(co: ClusterOperation) : Option[(String, String, String, String, Int, Boolean, Option[String], Option[String], Boolean, Boolean, Boolean, Boolean, Boolean, Boolean, Option[ClusterTuning], String, Option[String], Option[String])] = {
Option((co.op.toString, co.clusterConfig.name, co.clusterConfig.version.toString,
co.clusterConfig.curatorConfig.zkConnect, co.clusterConfig.curatorConfig.zkMaxRetry,
co.clusterConfig.jmxEnabled, co.clusterConfig.jmxUser, co.clusterConfig.jmxPass, co.clusterConfig.jmxSsl,
co.clusterConfig.pollConsumers, co.clusterConfig.filterConsumers, co.clusterConfig.logkafkaEnabled,
co.clusterConfig.activeOffsetCacheEnabled, co.clusterConfig.displaySizeEnabled, co.clusterConfig.tuning, co.clusterConfig.securityProtocol.stringId,
co.clusterConfig.saslMechanism.stringId,
co.clusterConfig.saslMechanism.map(_.stringId),
co.clusterConfig.jaasConfig))
}
}
Expand Down
4 changes: 2 additions & 2 deletions app/views/cluster/addCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@
@b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize")
@b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays")
@b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" )
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config")
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism (only applies to SASL based security)" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config (only applies to SASL based security)")
@b3.submit('class -> "submit-button btn btn-primary"){ Save }
<a href="@routes.Application.index()" class="cancel-button btn btn-default" role="button">Cancel</a>
</fieldset>
Expand Down
3 changes: 0 additions & 3 deletions app/views/cluster/clusterList.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
<input type="hidden" name="kafkaVersion" value="@cluster.version.toString">
<input type="hidden" name="zkHosts" value="@cluster.curatorConfig.zkConnect">
<input type="hidden" name="securityProtocol" value="@cluster.securityProtocol.stringId">
<input type="hidden" name="saslMechanism" value="@cluster.saslMechanism.stringId">
<input type="hidden" name="operation" value="Disable">
@b3.submit('class -> "btn btn-warning ops-button"){ Disable }
}
Expand All @@ -40,7 +39,6 @@
<input type="hidden" name="kafkaVersion" value="@cluster.version.toString">
<input type="hidden" name="zkHosts" value="@cluster.curatorConfig.zkConnect">
<input type="hidden" name="securityProtocol" value="@cluster.securityProtocol.stringId">
<input type="hidden" name="saslMechanism" value="@cluster.saslMechanism.stringId">
<input type="hidden" name="operation" value="Enable">
@b3.submit('class -> "btn btn-success ops-button"){ Enable }
}
Expand All @@ -49,7 +47,6 @@
<input type="hidden" name="kafkaVersion" value="@cluster.version.toString">
<input type="hidden" name="zkHosts" value="@cluster.curatorConfig.zkConnect">
<input type="hidden" name="securityProtocol" value="@cluster.securityProtocol.stringId">
<input type="hidden" name="saslMechanism" value="@cluster.saslMechanism.stringId">
<input type="hidden" name="operation" value="Delete">
@b3.submit('class -> "btn btn-danger ops-button"){ Delete }
}
Expand Down
4 changes: 2 additions & 2 deletions app/views/cluster/updateCluster.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
@b3.number(form("tuning.kafkaManagedOffsetGroupCacheSize"), '_label -> "kafkaManagedOffsetGroupCacheSize")
@b3.number(form("tuning.kafkaManagedOffsetGroupExpireDays"), '_label -> "kafkaManagedOffsetGroupExpireDays")
@b3.select( form("securityProtocol"), options = kafka.manager.model.SecurityProtocol.formSelectList, '_label -> "Security Protocol" )
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config")
@b3.select( form("saslMechanism"), options = kafka.manager.model.SASLmechanism.formSelectList, '_label -> "SASL Mechanism (only applies to SASL based security)" )
@b3.text(form("jaasConfig"), '_label -> "SASL JAAS Config (only applies to SASL based security)")
@b3.submit('class -> "submit-button btn btn-primary btn"){ Save }
<a href="@routes.Application.index()" class="cancel-button btn btn-default" role="button">Cancel</a>
</fieldset>
Expand Down
Loading

0 comments on commit f779d1f

Please sign in to comment.