Skip to content

Commit

Permalink
KAFKA-13841: Fix a case where we were unable to place on fenced broke…
Browse files Browse the repository at this point in the history
…rs in KRaft mode (apache#12075)

This PR fixes a case where we were unable to place replicas on fenced brokers in KRaft mode.
Specifically, if we had a broker registration in the metadata log but no associated heartbeat, the
HeartbeatManager previously would not track the fenced broker. This PR fixes this by adding the
tracking logic to the metadata log replay path in ClusterControlManager.

Reviewers: David Arthur <[email protected]>, dengziming <[email protected]>
  • Loading branch information
cmccabe authored Apr 21, 2022
1 parent f28a2ee commit d480c4a
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,22 @@ private boolean hasValidSession(BrokerHeartbeatState broker) {
}
}

/**
 * Make sure the given broker is tracked and that its tracked fencing state agrees
 * with the requested one.
 *
 * A broker with no entry in {@code brokers} is handed to {@code touch} with a metadata
 * offset of -1. A broker that already has an entry but whose fencing state differs is
 * handed to {@code touch} with the metadata offset already stored in that entry.
 * If the entry exists and the fencing state already matches, nothing is done.
 *
 * @param brokerId      The broker ID.
 * @param fenced        True only if the broker is currently fenced.
 */
void register(int brokerId, boolean fenced) {
    BrokerHeartbeatState existing = brokers.get(brokerId);
    if (existing == null) {
        // Not tracked yet: -1 is passed as the metadata offset.
        touch(brokerId, fenced, -1);
        return;
    }
    if (existing.fenced() == fenced) {
        // Already tracked with the requested fencing state.
        return;
    }
    // Fencing state changed: keep the metadata offset stored for this broker.
    touch(brokerId, fenced, existing.metadataOffset);
}

/**
* Update broker state, including lastContactNs.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
if (!existing.incarnationId().equals(request.incarnationId())) {
// Remove any existing session for the old broker incarnation.
heartbeatManager.remove(brokerId);
existing = null;
}
}
}
Expand Down Expand Up @@ -334,11 +333,7 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
setMaxSupportedVersion(feature.maxSupportedVersion()));
}

if (existing == null) {
heartbeatManager.touch(brokerId, true, -1);
} else {
heartbeatManager.touch(brokerId, existing.fenced(), -1);
}
heartbeatManager.register(brokerId, record.fenced());

List<ApiMessageAndVersion> records = new ArrayList<>();
records.add(new ApiMessageAndVersion(record,
Expand Down Expand Up @@ -366,6 +361,10 @@ public void replay(RegisterBrokerRecord record) {
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced()));
updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
if (heartbeatManager != null) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
}
if (prevRegistration == null) {
log.info("Registered new broker: {}", record);
} else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
Expand All @@ -385,6 +384,7 @@ public void replay(UnregisterBrokerRecord record) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration with that epoch found", record.toString()));
} else {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
brokerRegistrations.remove(brokerId);
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unregistered broker: {}", record);
Expand All @@ -401,6 +401,7 @@ public void replay(FenceBrokerRecord record) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration with that epoch found", record.toString()));
} else {
if (heartbeatManager != null) heartbeatManager.register(brokerId, true);
brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Fenced broker: {}", record);
Expand All @@ -417,6 +418,7 @@ public void replay(UnfenceBrokerRecord record) {
throw new RuntimeException(String.format("Unable to replay %s: no broker " +
"registration with that epoch found", record.toString()));
} else {
if (heartbeatManager != null) heartbeatManager.register(brokerId, false);
brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unfenced broker: {}", record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ static Map<String, String> translateCreationConfigs(CreateableTopicConfigCollect
/**
* A ClusterDescriber which supplies cluster information to our ReplicaPlacer.
*/
private final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();

private ReplicationControlManager(
SnapshotRegistry snapshotRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.ReplicationControlManager.KRaftClusterDescriber;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.LeaderRecoveryState;
Expand All @@ -73,6 +74,7 @@
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
Expand Down Expand Up @@ -251,7 +253,7 @@ void createPartitions(int count, String name,
void registerBrokers(Integer... brokerIds) throws Exception {
for (int brokerId : brokerIds) {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
setBrokerEpoch(brokerId + 100).setBrokerId(brokerId).setRack(null);
brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
setPort((short) 9092 + brokerId).
Expand Down Expand Up @@ -1725,4 +1727,24 @@ private ElectLeadersResponseData buildElectLeadersResponse(
return response;
}

/**
 * Verifies the set of brokers reported by the KRaftClusterDescriber.
 *
 * Brokers 0-4 are registered and only 2, 3 and 4 are unfenced afterwards. The
 * describer is expected to report all five brokers, none with a rack, with the
 * boolean flag (presumably "fenced" -- confirm against UsableBroker) set to true
 * for brokers 0 and 1 and false for brokers 2, 3 and 4.
 */
@Test
public void testKRaftClusterDescriber() throws Exception {
    ReplicationControlTestContext context = new ReplicationControlTestContext();
    KRaftClusterDescriber clusterDescriber = context.replicationControl.clusterDescriber;

    context.registerBrokers(0, 1, 2, 3, 4);
    context.unfenceBrokers(2, 3, 4);

    // Topics are created with explicit assignments, including the brokers
    // that were never unfenced (0 and 1).
    int[][] fooAssignments = new int[][]{
        new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}};
    context.createTestTopic("foo", fooAssignments).topicId();
    int[][] barAssignments = new int[][]{
        new int[]{2, 3, 4}, new int[]{3, 4, 2}};
    context.createTestTopic("bar", barAssignments).topicId();

    // Drain the describer's iterator into a set so ordering does not matter.
    HashSet<UsableBroker> actual = new HashSet<>();
    clusterDescriber.usableBrokers().forEachRemaining(actual::add);

    HashSet<UsableBroker> expected = new HashSet<>(Arrays.asList(
        new UsableBroker(0, Optional.empty(), true),
        new UsableBroker(1, Optional.empty(), true),
        new UsableBroker(2, Optional.empty(), false),
        new UsableBroker(3, Optional.empty(), false),
        new UsableBroker(4, Optional.empty(), false)));
    assertEquals(expected, actual);
}
}

0 comments on commit d480c4a

Please sign in to comment.