Skip to content

Commit ec49a60

Browse files
KAFKA-16540: enforce min.insync.replicas config invariants for ELR (apache#17952)
If ELR is enabled, we need to set a cluster-level min.insync.replicas, and remove all broker-level overrides. The reason for this is that if brokers disagree about which partitions are under min ISR, it breaks the KIP-966 replication invariants. In order to enforce this, when the eligible.leader.replicas.version feature is turned on, we automatically remove all broker-level min.insync.replicas overrides, and create the required cluster-level override if needed. Similarly, if the cluster was created with eligible.leader.replicas.version enabled, we create a similar cluster-level record. In both cases, we don't allow setting overrides for individual brokers afterwards, or removing the cluster-level override. Split ActivationRecordsGeneratorTest up into multiple test cases rather than having it be one giant test case. Fix a bug in QuorumControllerTestEnv where we would replay records manually on objects, racing with the active controller thread. Instead, we should simply ensure that the initial bootstrap records contains what we want. Reviewers: Colin P. McCabe <[email protected]>
1 parent 5efaae6 commit ec49a60

13 files changed

+477
-66
lines changed

checkstyle/suppressions.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -305,7 +305,7 @@
305305
<suppress checks="(ParameterNumber|ClassDataAbstractionCoupling)"
306306
files="(QuorumController).java"/>
307307
<suppress checks="(CyclomaticComplexity|NPathComplexity)"
308-
files="(PartitionRegistration|PartitionChangeBuilder|ScramParser).java"/>
308+
files="(ConfigurationControlManager|PartitionRegistration|PartitionChangeBuilder|ScramParser).java"/>
309309
<suppress checks="CyclomaticComplexity"
310310
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager|KRaftMigrationDriver|ClusterControlManager|MetaPropertiesEnsemble).java"/>
311311
<suppress checks="NPathComplexity"

metadata/src/main/java/org/apache/kafka/controller/ActivationRecordsGenerator.java

+26-5
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,31 @@
1717

1818
package org.apache.kafka.controller;
1919

20+
import org.apache.kafka.common.config.TopicConfig;
2021
import org.apache.kafka.common.metadata.AbortTransactionRecord;
2122
import org.apache.kafka.common.metadata.BeginTransactionRecord;
23+
import org.apache.kafka.common.metadata.ConfigRecord;
2224
import org.apache.kafka.common.metadata.EndTransactionRecord;
2325
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
2426
import org.apache.kafka.server.common.ApiMessageAndVersion;
27+
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
2528
import org.apache.kafka.server.common.MetadataVersion;
2629

2730
import java.util.ArrayList;
2831
import java.util.List;
2932
import java.util.function.Consumer;
3033

34+
import static org.apache.kafka.common.config.ConfigResource.Type.BROKER;
35+
3136

3237
public class ActivationRecordsGenerator {
3338

3439
static ControllerResult<Void> recordsForEmptyLog(
3540
Consumer<String> activationMessageConsumer,
3641
long transactionStartOffset,
3742
BootstrapMetadata bootstrapMetadata,
38-
MetadataVersion metadataVersion
43+
MetadataVersion metadataVersion,
44+
int defaultMinInSyncReplicas
3945
) {
4046
StringBuilder logMessageBuilder = new StringBuilder("Performing controller activation. ");
4147
List<ApiMessageAndVersion> records = new ArrayList<>();
@@ -87,6 +93,15 @@ static ControllerResult<Void> recordsForEmptyLog(
8793
// initialization, etc.
8894
records.addAll(bootstrapMetadata.records());
8995

96+
// If ELR is enabled, we need to set a cluster-level min.insync.replicas.
97+
if (bootstrapMetadata.featureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME) > 0) {
98+
records.add(new ApiMessageAndVersion(new ConfigRecord().
99+
setResourceType(BROKER.id()).
100+
setResourceName("").
101+
setName(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG).
102+
setValue(Integer.toString(defaultMinInSyncReplicas)), (short) 0));
103+
}
104+
90105
activationMessageConsumer.accept(logMessageBuilder.toString().trim());
91106
if (metadataVersion.isMetadataTransactionSupported()) {
92107
records.add(new ApiMessageAndVersion(new EndTransactionRecord(), (short) 0));
@@ -148,13 +163,19 @@ static ControllerResult<Void> generate(
148163
boolean isEmpty,
149164
long transactionStartOffset,
150165
BootstrapMetadata bootstrapMetadata,
151-
MetadataVersion curMetadataVersion
166+
MetadataVersion curMetadataVersion,
167+
int defaultMinInSyncReplicas
152168
) {
153169
if (isEmpty) {
154-
return recordsForEmptyLog(activationMessageConsumer, transactionStartOffset,
155-
bootstrapMetadata, bootstrapMetadata.metadataVersion());
170+
return recordsForEmptyLog(activationMessageConsumer,
171+
transactionStartOffset,
172+
bootstrapMetadata,
173+
bootstrapMetadata.metadataVersion(),
174+
defaultMinInSyncReplicas);
156175
} else {
157-
return recordsForNonEmptyLog(activationMessageConsumer, transactionStartOffset, curMetadataVersion);
176+
return recordsForNonEmptyLog(activationMessageConsumer,
177+
transactionStartOffset,
178+
curMetadataVersion);
158179
}
159180
}
160181
}

0 commit comments

Comments
 (0)