KAFKA-7920; Do not permit zstd produce requests until IBP is updated to 2.1 (apache#6256)

Fail produce requests that use zstd compression until the inter.broker.protocol.version is high enough to guarantee that all replicas support it. Otherwise, followers receive an `UNSUPPORTED_COMPRESSION_TYPE` error when fetching zstd-compressed data and ISRs shrink.

Reviewers: Jason Gustafson <[email protected]>
dongjinleekr authored and Jason Gustafson committed Feb 20, 2019
1 parent 4cb8f56 commit 71a7219
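
The heart of the change is a guard in LogValidator: reject zstd-compressed batches whenever the broker's inter.broker.protocol.version is older than 2.1 (KAFKA_2_1_IV0), since pre-2.1 followers cannot fetch zstd data. Below is a minimal, self-contained Scala sketch of that check, not the real implementation: it uses simplified stand-ins for kafka.api.ApiVersion and kafka.message.ZStdCompressionCodec, and throws IllegalArgumentException where the actual code throws UnsupportedCompressionTypeException.

// Minimal sketch of the zstd / inter-broker-protocol guard (simplified types, not Kafka's own).
object ZstdIbpGuardSketch {

  sealed trait CompressionCodec
  case object NoCompressionCodec extends CompressionCodec
  case object ZStdCompressionCodec extends CompressionCodec

  // Stand-in for kafka.api.ApiVersion: ordered by (major, minor).
  final case class ProtocolVersion(major: Int, minor: Int) extends Ordered[ProtocolVersion] {
    def compare(that: ProtocolVersion): Int =
      Ordering[(Int, Int)].compare((major, minor), (that.major, that.minor))
    override def toString: String = s"$major.$minor"
  }

  val KAFKA_2_1_IV0 = ProtocolVersion(2, 1)

  // The guard: zstd output is only accepted once inter.broker.protocol.version >= 2.1,
  // because only then is every replica guaranteed to understand zstd batches.
  def checkTargetCodec(targetCodec: CompressionCodec, interBrokerProtocolVersion: ProtocolVersion): Unit =
    if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
      throw new IllegalArgumentException(
        s"Produce requests to inter.broker.protocol.version $interBrokerProtocolVersion brokers " +
          "are not allowed to use ZStandard compression")

  def main(args: Array[String]): Unit = {
    checkTargetCodec(NoCompressionCodec, ProtocolVersion(2, 0))   // fine: no zstd involved
    checkTargetCodec(ZStdCompressionCodec, ProtocolVersion(2, 1)) // fine: IBP already at 2.1
    checkTargetCodec(ZStdCompressionCodec, ProtocolVersion(2, 0)) // throws: replicas may not support zstd
  }
}

The last call in main illustrates the failure mode this commit prevents: with an IBP below 2.1, zstd batches are rejected at validation time instead of being written and later breaking follower fetches.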
Showing 10 changed files with 142 additions and 64 deletions.
@@ -402,7 +402,7 @@ public static void validateRecords(short version, MemoryRecords records) {
throw new InvalidRecordException("Produce requests with version " + version + " are only allowed to " +
"contain record batches with magic version 2");
if (version < 7 && entry.compressionType() == CompressionType.ZSTD) {
- throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are note allowed to " +
+ throw new UnsupportedCompressionTypeException("Produce requests with version " + version + " are not allowed to " +
"use ZStandard compression");
}

8 changes: 6 additions & 2 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,7 @@ import java.util.Optional
import java.util.concurrent.locks.ReentrantReadWriteLock

import com.yammer.metrics.core.Gauge
- import kafka.api.{LeaderAndIsr, Request}
+ import kafka.api.{ApiVersion, LeaderAndIsr, Request}
import kafka.common.UnexpectedAppendOffsetException
import kafka.controller.KafkaController
import kafka.log._
@@ -49,6 +49,7 @@ object Partition {
new Partition(topicPartition,
isOffline = false,
replicaLagTimeMaxMs = replicaManager.config.replicaLagTimeMaxMs,
+ interBrokerProtocolVersion = replicaManager.config.interBrokerProtocolVersion,
localBrokerId = replicaManager.config.brokerId,
time = time,
replicaManager = replicaManager,
@@ -63,6 +64,7 @@ object Partition {
class Partition(val topicPartition: TopicPartition,
val isOffline: Boolean,
private val replicaLagTimeMaxMs: Long,
+ private val interBrokerProtocolVersion: ApiVersion,
private val localBrokerId: Int,
private val time: Time,
private val replicaManager: ReplicaManager,
@@ -748,7 +750,9 @@ class Partition(val topicPartition: TopicPartition,
s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
}

- val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
+ val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient,
+ interBrokerProtocolVersion)
+
// we may need to increment high watermark since ISR could be down to 1
(info, maybeIncrementLeaderHW(leaderReplica))

16 changes: 10 additions & 6 deletions core/src/main/scala/kafka/log/Log.scala
@@ -28,7 +28,7 @@ import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, Time
import java.util.regex.Pattern

import com.yammer.metrics.core.Gauge
- import kafka.api.KAFKA_0_10_0_IV0
+ import kafka.api.{ApiVersion, KAFKA_0_10_0_IV0}
import kafka.common.{LogSegmentOffsetOverflowException, LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
import kafka.metrics.KafkaMetricsGroup
@@ -787,11 +787,13 @@ class Log(@volatile var dir: File,
*
* @param records The records to append
* @param isFromClient Whether or not this append is from a producer
+ * @param interBrokerProtocolVersion Inter-broker message protocol version
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
*/
- def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo = {
- append(records, isFromClient, assignOffsets = true, leaderEpoch)
+ def appendAsLeader(records: MemoryRecords, leaderEpoch: Int, isFromClient: Boolean = true,
+ interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
+ append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
}

/**
@@ -802,7 +804,7 @@ class Log(@volatile var dir: File,
* @return Information about the appended messages including the first and last offset.
*/
def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
- append(records, isFromClient = false, assignOffsets = false, leaderEpoch = -1)
+ append(records, isFromClient = false, interBrokerProtocolVersion = ApiVersion.latestVersion, assignOffsets = false, leaderEpoch = -1)
}

/**
@@ -813,14 +815,15 @@ class Log(@volatile var dir: File,
*
* @param records The log records to append
* @param isFromClient Whether or not this append is from a producer
+ * @param interBrokerProtocolVersion Inter-broker message protocol version
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
* @param leaderEpoch The partition's leader epoch which will be applied to messages when offsets are assigned on the leader
* @throws KafkaStorageException If the append fails due to an I/O error.
* @throws OffsetsOutOfOrderException If out of order offsets found in 'records'
* @throws UnexpectedAppendOffsetException If the first or last offset in append is less than next offset
* @return Information about the appended messages including the first and last offset.
*/
- private def append(records: MemoryRecords, isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
+ private def append(records: MemoryRecords, isFromClient: Boolean, interBrokerProtocolVersion: ApiVersion, assignOffsets: Boolean, leaderEpoch: Int): LogAppendInfo = {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

@@ -851,7 +854,8 @@ class Log(@volatile var dir: File,
config.messageTimestampType,
config.messageTimestampDifferenceMaxMs,
leaderEpoch,
- isFromClient)
+ isFromClient,
+ interBrokerProtocolVersion)
} catch {
case e: IOException =>
throw new KafkaException(s"Error validating messages while appending to log $name", e)
18 changes: 11 additions & 7 deletions core/src/main/scala/kafka/log/LogValidator.scala
@@ -18,10 +18,11 @@ package kafka.log

import java.nio.ByteBuffer

+ import kafka.api.{ApiVersion, KAFKA_2_1_IV0}
import kafka.common.LongRef
- import kafka.message.{CompressionCodec, NoCompressionCodec}
+ import kafka.message.{CompressionCodec, NoCompressionCodec, ZStdCompressionCodec}
import kafka.utils.Logging
- import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedForMessageFormatException}
+ import org.apache.kafka.common.errors.{InvalidTimestampException, UnsupportedCompressionTypeException, UnsupportedForMessageFormatException}
import org.apache.kafka.common.record.{AbstractRecords, CompressionType, InvalidRecordException, MemoryRecords, Record, RecordBatch, RecordConversionStats, TimestampType}
import org.apache.kafka.common.utils.Time

@@ -56,7 +57,8 @@ private[kafka] object LogValidator extends Logging {
timestampType: TimestampType,
timestampDiffMaxMs: Long,
partitionLeaderEpoch: Int,
- isFromClient: Boolean): ValidationAndOffsetAssignResult = {
+ isFromClient: Boolean,
+ interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
if (!records.hasMatchingMagic(magic))
@@ -68,7 +70,7 @@
partitionLeaderEpoch, isFromClient, magic)
} else {
validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, time, now, sourceCodec, targetCodec, compactedTopic,
- magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient)
+ magic, timestampType, timestampDiffMaxMs, partitionLeaderEpoch, isFromClient, interBrokerProtocolVersion)
}
}

@@ -245,8 +247,8 @@ private[kafka] object LogValidator extends Logging {
timestampType: TimestampType,
timestampDiffMaxMs: Long,
partitionLeaderEpoch: Int,
- isFromClient: Boolean): ValidationAndOffsetAssignResult = {
-
+ isFromClient: Boolean,
+ interBrokerProtocolVersion: ApiVersion): ValidationAndOffsetAssignResult = {
// No in place assignment situation 1 and 2
var inPlaceAssignment = sourceCodec == targetCodec && toMagic > RecordBatch.MAGIC_VALUE_V0

@@ -265,10 +267,12 @@
inPlaceAssignment = true

for (record <- batch.asScala) {
- validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)
if (sourceCodec != NoCompressionCodec && record.isCompressed)
throw new InvalidRecordException("Compressed outer record should not have an inner record with a " +
s"compression attribute set: $record")
+ if (targetCodec == ZStdCompressionCodec && interBrokerProtocolVersion < KAFKA_2_1_IV0)
+ throw new UnsupportedCompressionTypeException("Produce requests to inter.broker.protocol.version < 2.1 broker " + "are not allowed to use ZStandard compression")
+ validateRecord(batch, record, now, timestampType, timestampDiffMaxMs, compactedTopic)

uncompressedSizeInBytes += record.sizeInBytes()
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && toMagic > RecordBatch.MAGIC_VALUE_V0) {
9 changes: 7 additions & 2 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -20,11 +20,11 @@ package kafka.server
import java.util
import java.util.{Collections, Properties}

- import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1}
+ import kafka.api.{ApiVersion, ApiVersionValidator, KAFKA_0_10_0_IV1, KAFKA_2_1_IV0}
import kafka.cluster.EndPoint
import kafka.coordinator.group.OffsetConfig
import kafka.coordinator.transaction.{TransactionLog, TransactionStateManager}
- import kafka.message.{BrokerCompressionCodec, CompressionCodec}
+ import kafka.message.{BrokerCompressionCodec, CompressionCodec, ZStdCompressionCodec}
import kafka.utils.CoreUtils
import kafka.utils.Implicits._
import org.apache.kafka.clients.CommonClientConfigs
@@ -1486,6 +1486,11 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
s"is set to version ${ApiVersion.minSupportedFor(recordVersion).shortVersion} or higher")

+ if (offsetsTopicCompressionCodec == ZStdCompressionCodec)
+ require(interBrokerProtocolVersion.recordVersion.value >= KAFKA_2_1_IV0.recordVersion.value,
+ "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
+ s"is set to version ${KAFKA_2_1_IV0.shortVersion} or higher")
+
val interBrokerUsesSasl = interBrokerSecurityProtocol == SecurityProtocol.SASL_PLAINTEXT || interBrokerSecurityProtocol == SecurityProtocol.SASL_SSL
require(!interBrokerUsesSasl || saslInterBrokerHandshakeRequestEnable || saslMechanismInterBrokerProtocol == SaslConfigs.GSSAPI_MECHANISM,
s"Only GSSAPI mechanism is supported for inter-broker communication with SASL when inter.broker.protocol.version is set to $interBrokerProtocolVersionString")
1 change: 1 addition & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -125,6 +125,7 @@ object ReplicaManager {
val OfflinePartition: Partition = new Partition(new TopicPartition("", -1),
isOffline = true,
replicaLagTimeMaxMs = 0L,
+ interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = -1,
time = null,
replicaManager = null,
5 changes: 4 additions & 1 deletion core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -22,7 +22,7 @@ import java.util.{Optional, Properties}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicBoolean

- import kafka.api.Request
+ import kafka.api.{ApiVersion, Request}
import kafka.common.UnexpectedAppendOffsetException
import kafka.log.{Defaults => _, _}
import kafka.server._
@@ -590,6 +590,7 @@ class PartitionTest {
val partition = new Partition(topicPartition,
isOffline = false,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+ interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
replicaManager,
@@ -685,6 +686,7 @@ class PartitionTest {
val partition = new Partition(topicPartition,
isOffline = false,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+ interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
replicaManager,
@@ -897,6 +899,7 @@ class PartitionTest {
val partition = new Partition(tp,
isOffline = false,
replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+ interBrokerProtocolVersion = ApiVersion.latestVersion,
localBrokerId = brokerId,
time,
replicaManager,
(The remaining 3 of the 10 changed files are not shown above.)
