KAFKA-15853: Move PasswordEncoder to server-common (apache#15246)
Reviewers: Luke Chen <[email protected]>, Omnia Ibrahim <[email protected]>
mimaison authored Jan 30, 2024
1 parent 4c6f975 commit 3e9ef70
Showing 27 changed files with 660 additions and 417 deletions.
8 changes: 8 additions & 0 deletions checkstyle/import-control-server-common.xml
@@ -58,6 +58,14 @@
<allow pkg="org.apache.kafka.common.protocol" />
</subpackage>

<subpackage name="security">
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.config.types" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="javax.crypto" />
<allow pkg="javax.crypto.spec" />
</subpackage>

<subpackage name="server">
<allow pkg="org.apache.kafka.common" />
<allow pkg="joptsimple" />
1 change: 1 addition & 0 deletions checkstyle/import-control-server.xml
@@ -57,6 +57,7 @@

<!-- utilities and reusable classes from server-common -->
<allow pkg="org.apache.kafka.queue" />
<allow pkg="org.apache.kafka.security" />
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
25 changes: 13 additions & 12 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -21,10 +21,10 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit
import java.util.{Collections, Properties}
import joptsimple._
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.server.{DynamicBrokerConfig, DynamicConfig, KafkaConfig}
import kafka.utils.{Exit, Logging, PasswordEncoder}
import kafka.server.DynamicConfig.QuotaConfigs
import kafka.utils.Implicits._
import kafka.utils.{Exit, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
@@ -35,7 +35,8 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism}
import org.apache.kafka.common.utils.{Sanitizer, Time, Utils}
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, Defaults}
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig
import org.apache.zookeeper.client.ZKClientConfig
@@ -211,19 +212,19 @@ object ConfigCommand extends Logging {
}

private[admin] def createPasswordEncoder(encoderConfigs: Map[String, String]): PasswordEncoder = {
encoderConfigs.get(KafkaConfig.PasswordEncoderSecretProp)
val encoderSecret = encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderSecretProp,
encoderConfigs.get(PasswordEncoderConfigs.SECRET)
val encoderSecret = encoderConfigs.getOrElse(PasswordEncoderConfigs.SECRET,
throw new IllegalArgumentException("Password encoder secret not specified"))
PasswordEncoder.encrypting(new Password(encoderSecret),
None,
encoderConfigs.getOrElse(KafkaConfig.PasswordEncoderCipherAlgorithmProp, Defaults.PASSWORD_ENCODER_CIPHER_ALGORITHM),
encoderConfigs.get(KafkaConfig.PasswordEncoderKeyLengthProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_KEY_LENGTH),
encoderConfigs.get(KafkaConfig.PasswordEncoderIterationsProp).map(_.toInt).getOrElse(Defaults.PASSWORD_ENCODER_ITERATIONS))
null,
encoderConfigs.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM),
encoderConfigs.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH),
encoderConfigs.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS))
}

/**
* Pre-process broker configs provided to convert them to persistent format.
* Password configs are encrypted using the secret `KafkaConfig.PasswordEncoderSecretProp`.
* Password configs are encrypted using the secret `PasswordEncoderConfigs.SECRET`.
* The secret is removed from `configsToBeAdded` and will not be persisted in ZooKeeper.
*/
private def preProcessBrokerConfigs(configsToBeAdded: Properties, perBrokerConfig: Boolean): Unit = {
@@ -238,8 +239,8 @@ object ConfigCommand extends Logging {
DynamicBrokerConfig.validateConfigs(configsToBeAdded, perBrokerConfig)
val passwordConfigs = configsToBeAdded.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
if (passwordConfigs.nonEmpty) {
require(passwordEncoderConfigs.containsKey(KafkaConfig.PasswordEncoderSecretProp),
s"${KafkaConfig.PasswordEncoderSecretProp} must be specified to update $passwordConfigs." +
require(passwordEncoderConfigs.containsKey(PasswordEncoderConfigs.SECRET),
s"${PasswordEncoderConfigs.SECRET} must be specified to update $passwordConfigs." +
" Other password encoder configs like cipher algorithm and iterations may also be specified" +
" to override the default encoding parameters. Password encoder configs will not be persisted" +
" in ZooKeeper."
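The net effect of the ConfigCommand change is that the encoder is now built entirely from the org.apache.kafka.security classes. Below is a minimal illustrative sketch (not code from this commit) mirroring the updated createPasswordEncoder; it assumes the DEFAULT_* constants carry the values previously held in kafka.server.Defaults, and the trailing usage comment assumes the moved PasswordEncoder keeps the encode method of the old kafka.utils.PasswordEncoder.

import org.apache.kafka.common.config.types.Password
import org.apache.kafka.security.{PasswordEncoder, PasswordEncoderConfigs}

object PasswordEncoderSketch {
  // Build an encrypting encoder from "password.encoder.*" overrides,
  // falling back to the library defaults when a setting is absent.
  def encoderFor(overrides: Map[String, String]): PasswordEncoder = {
    val secret = overrides.getOrElse(PasswordEncoderConfigs.SECRET,
      throw new IllegalArgumentException("Password encoder secret not specified"))
    PasswordEncoder.encrypting(
      new Password(secret),
      null, // key factory algorithm: null lets the encoder pick its default
      overrides.getOrElse(PasswordEncoderConfigs.CIPHER_ALGORITHM, PasswordEncoderConfigs.DEFAULT_CIPHER_ALGORITHM),
      overrides.get(PasswordEncoderConfigs.KEY_LENGTH).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_KEY_LENGTH),
      overrides.get(PasswordEncoderConfigs.ITERATIONS).map(_.toInt).getOrElse(PasswordEncoderConfigs.DEFAULT_ITERATIONS))
  }

  // Hypothetical usage (assumes encode(Password): String survives the move):
  //   encoderFor(Map(PasswordEncoderConfigs.SECRET -> "my-secret"))
  //     .encode(new Password("keystore-password"))
}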
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
@@ -27,7 +27,7 @@ import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{AclPublisher, ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{KafkaZkClient, ZkMigrationClient}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
@@ -44,6 +44,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
import org.apache.kafka.metadata.publisher.FeaturesPublisher
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.NodeToControllerChannelManager
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -285,7 +286,7 @@ class ControllerServer(
config.passwordEncoderCipherAlgorithm,
config.passwordEncoderKeyLength,
config.passwordEncoderIterations)
case None => PasswordEncoder.noop()
case None => PasswordEncoder.NOOP
}
val migrationClient = ZkMigrationClient(zkClient, zkConfigEncoder)
val propagator: LegacyPropagator = new MigrationPropagator(config.nodeId, config)
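The ControllerServer hunk replaces the old PasswordEncoder.noop() factory with the new static PasswordEncoder.NOOP when no password.encoder.secret is configured for the ZooKeeper migration path. A rough sketch of that selection follows, using the KafkaConfig accessors visible elsewhere in this diff; the helper name zkConfigEncoderFor is invented for illustration and the argument order of encrypting is taken from the ConfigCommand hunk above.

import kafka.server.KafkaConfig
import org.apache.kafka.security.PasswordEncoder

object ZkMigrationEncoderSketch {
  // Choose the encoder for password configs handled during ZK migration:
  // an encrypting encoder when a secret is set, otherwise the no-op encoder
  // that passes values through unchanged.
  def zkConfigEncoderFor(config: KafkaConfig): PasswordEncoder =
    config.passwordEncoderSecret match {
      case Some(secret) => PasswordEncoder.encrypting(secret,
        config.passwordEncoderKeyFactoryAlgorithm,
        config.passwordEncoderCipherAlgorithm,
        config.passwordEncoderKeyLength,
        config.passwordEncoderIterations)
      case None => PasswordEncoder.NOOP
    }
}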
5 changes: 3 additions & 2 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -25,7 +25,7 @@ import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
@@ -35,6 +35,7 @@ import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigEntityName, ConfigType, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -223,7 +224,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
} else {
Some(PasswordEncoder.noop())
Some(PasswordEncoder.NOOP)
}

private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
22 changes: 12 additions & 10 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -40,15 +40,17 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor
import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.security.PasswordEncoderConfigs
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{MetadataVersion, MetadataVersionValidator}
import org.apache.kafka.server.common.MetadataVersion._
import org.apache.kafka.server.config.{Defaults, ServerTopicConfigSynonyms}
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.Csv
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.log.LogConfig.MessageFormatVersion
import org.apache.zookeeper.client.ZKClientConfig

import scala.annotation.nowarn
@@ -435,12 +437,12 @@
val DelegationTokenExpiryCheckIntervalMsProp = "delegation.token.expiry.check.interval.ms"

/** ********* Password encryption configuration for dynamic configs *********/
val PasswordEncoderSecretProp = "password.encoder.secret"
val PasswordEncoderOldSecretProp = "password.encoder.old.secret"
val PasswordEncoderKeyFactoryAlgorithmProp = "password.encoder.keyfactory.algorithm"
val PasswordEncoderCipherAlgorithmProp = "password.encoder.cipher.algorithm"
val PasswordEncoderKeyLengthProp = "password.encoder.key.length"
val PasswordEncoderIterationsProp = "password.encoder.iterations"
val PasswordEncoderSecretProp = PasswordEncoderConfigs.SECRET
val PasswordEncoderOldSecretProp = PasswordEncoderConfigs.OLD_SECRET
val PasswordEncoderKeyFactoryAlgorithmProp = PasswordEncoderConfigs.KEYFACTORY_ALGORITHM
val PasswordEncoderCipherAlgorithmProp = PasswordEncoderConfigs.CIPHER_ALGORITHM
val PasswordEncoderKeyLengthProp = PasswordEncoderConfigs.KEY_LENGTH
val PasswordEncoderIterationsProp = PasswordEncoderConfigs.ITERATIONS

/** Internal Configurations **/
val UnstableApiVersionsEnableProp = "unstable.api.versions.enable"
@@ -1873,7 +1875,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def passwordEncoderSecret = Option(getPassword(KafkaConfig.PasswordEncoderSecretProp))
def passwordEncoderOldSecret = Option(getPassword(KafkaConfig.PasswordEncoderOldSecretProp))
def passwordEncoderCipherAlgorithm = getString(KafkaConfig.PasswordEncoderCipherAlgorithmProp)
def passwordEncoderKeyFactoryAlgorithm = Option(getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp))
def passwordEncoderKeyFactoryAlgorithm = getString(KafkaConfig.PasswordEncoderKeyFactoryAlgorithmProp)
def passwordEncoderKeyLength = getInt(KafkaConfig.PasswordEncoderKeyLengthProp)
def passwordEncoderIterations = getInt(KafkaConfig.PasswordEncoderIterationsProp)

@@ -1935,7 +1937,7 @@

private def getMap(propName: String, propValue: String): Map[String, String] = {
try {
CoreUtils.parseCsvMap(propValue)
Csv.parseCsvMap(propValue).asScala
} catch {
case e: Exception => throw new IllegalArgumentException("Error parsing configuration property '%s': %s".format(propName, e.getMessage))
}
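In KafkaConfig the old string literals become aliases for the server-common constants, so existing call sites keep compiling while the canonical definitions move to org.apache.kafka.security.PasswordEncoderConfigs. As a rough illustration only (the real class lives in server-common and is not shown in this excerpt), the constants are expected to resolve to the literals removed above:

// Not the actual server-common class; values inferred from the literals
// this hunk removes from KafkaConfig.
object PasswordEncoderConfigNames {
  val SECRET = "password.encoder.secret"
  val OLD_SECRET = "password.encoder.old.secret"
  val KEYFACTORY_ALGORITHM = "password.encoder.keyfactory.algorithm"
  val CIPHER_ALGORITHM = "password.encoder.cipher.algorithm"
  val KEY_LENGTH = "password.encoder.key.length"
  val ITERATIONS = "password.encoder.iterations"
}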
19 changes: 1 addition & 18 deletions core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -26,7 +26,7 @@ import com.typesafe.scalalogging.Logger

import javax.management._
import scala.collection._
import scala.collection.{Seq, mutable}
import scala.collection.Seq
import kafka.cluster.EndPoint
import org.apache.commons.validator.routines.InetAddressValidator
import org.apache.kafka.common.network.ListenerName
@@ -109,23 +109,6 @@
}
}

/**
* This method gets comma separated values which contains key,value pairs and returns a map of
* key value pairs. the format of allCSVal is key1:val1, key2:val2 ....
* Also supports strings with multiple ":" such as IpV6 addresses, taking the last occurrence
* of the ":" in the pair as the split, eg a:b:c:val1, d:e:f:val2 => a:b:c -> val1, d:e:f -> val2
*/
def parseCsvMap(str: String): Map[String, String] = {
val map = new mutable.HashMap[String, String]
if ("".equals(str))
return map
val keyVals = str.split("\\s*,\\s*").map(s => {
val lio = s.lastIndexOf(":")
(s.substring(0,lio).trim, s.substring(lio + 1).trim)
})
keyVals.toMap
}

/**
* Parse a comma separated string into a sequence of strings.
* Whitespace surrounding the comma will be removed.
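With CoreUtils.parseCsvMap removed, callers such as KafkaConfig.getMap (see the hunk above) switch to the server-common Csv.parseCsvMap and convert the result with .asScala, which implies the Java helper returns a java.util.Map. A small illustrative sketch of the replacement call; it assumes the Java port keeps the last-':' split behaviour documented in the removed scaladoc, so IPv6-style keys still work.

import org.apache.kafka.server.util.Csv
import scala.jdk.CollectionConverters._

object CsvMapSketch {
  // "a:b:c:val1, d:e:f:val2" -> Map("a:b:c" -> "val1", "d:e:f" -> "val2"),
  // assuming each pair is split at its last ':' as the old Scala helper did.
  val parsed: Map[String, String] =
    Csv.parseCsvMap("a:b:c:val1, d:e:f:val2").asScala.toMap
}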