Skip to content

Commit

Permalink
KAFKA-14457; Controller metrics should only expose committed data (ap…
Browse files Browse the repository at this point in the history
…ache#12994)

The controller metrics have three problems: 1) the active controller exposes uncommitted data in the metrics; 2) the active controller doesn't update the metrics when uncommitted data gets aborted; 3) the controller doesn't update the metrics when the entire state gets reset.

We fix these issues by updating the metrics only when processing committed metadata records, and by resetting the metrics when the metadata state is reset.

This change adds a new type, `ControllerMetricsManager`, which processes committed metadata records and updates the metrics accordingly. This change also removes metrics-updating responsibilities from the rest of the controller managers.

Reviewers: Ron Dagostino <[email protected]>
  • Loading branch information
jsancio authored Dec 20, 2022
1 parent 71ea16f commit 44b3177
Show file tree
Hide file tree
Showing 16 changed files with 765 additions and 322 deletions.
6 changes: 6 additions & 0 deletions checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,15 @@
<!-- default is 200 -->
<property name="max" value="500"/>
</module>

<!-- Allows the use of the @SuppressWarnings annotation in the code -->
<module name="SuppressWarningsHolder"/>
</module>

<module name="SuppressionFilter">
<property name="file" value="${suppressionsFile}"/>
</module>

<!-- Allows the use of the @SuppressWarnings annotation in the code -->
<module name="SuppressWarningsFilter"/>
</module>
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineInteger;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -105,12 +104,9 @@ public TopicIdPartition next() {
*/
private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;

private final TimelineInteger offlinePartitionCount;

BrokersToIsrs(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
this.offlinePartitionCount = new TimelineInteger(snapshotRegistry);
}

/**
Expand All @@ -131,9 +127,6 @@ void update(Uuid topicId, int partitionId, int[] prevIsr, int[] nextIsr,
} else {
if (prevLeader == NO_LEADER) {
prev = Replicas.copyWith(prevIsr, NO_LEADER);
if (nextLeader != NO_LEADER) {
offlinePartitionCount.decrement();
}
} else {
prev = Replicas.clone(prevIsr);
}
Expand All @@ -145,9 +138,6 @@ void update(Uuid topicId, int partitionId, int[] prevIsr, int[] nextIsr,
} else {
if (nextLeader == NO_LEADER) {
next = Replicas.copyWith(nextIsr, NO_LEADER);
if (prevLeader != NO_LEADER) {
offlinePartitionCount.increment();
}
} else {
next = Replicas.clone(nextIsr);
}
Expand Down Expand Up @@ -191,9 +181,6 @@ void update(Uuid topicId, int partitionId, int[] prevIsr, int[] nextIsr,
void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
if (topicMap != null) {
if (brokerId == NO_LEADER && topicMap.containsKey(topicId)) {
offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);
}
topicMap.remove(topicId);
}
}
Expand Down Expand Up @@ -303,8 +290,4 @@ PartitionsOnReplicaIterator partitionsWithBrokerInIsr(int brokerId) {
boolean hasLeaderships(int brokerId) {
return iterator(brokerId, true).hasNext();
}

int offlinePartitionCount() {
return offlinePartitionCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ static class Builder {
private SnapshotRegistry snapshotRegistry = null;
private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
private ReplicaPlacer replicaPlacer = null;
private ControllerMetrics controllerMetrics = null;
private FeatureControlManager featureControl = null;
private boolean zkMigrationEnabled = false;

Expand Down Expand Up @@ -117,11 +116,6 @@ Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
return this;
}

Builder setControllerMetrics(ControllerMetrics controllerMetrics) {
this.controllerMetrics = controllerMetrics;
return this;
}

Builder setFeatureControlManager(FeatureControlManager featureControl) {
this.featureControl = featureControl;
return this;
Expand All @@ -145,9 +139,6 @@ ClusterControlManager build() {
if (replicaPlacer == null) {
replicaPlacer = new StripedReplicaPlacer(new Random());
}
if (controllerMetrics == null) {
throw new RuntimeException("You must specify ControllerMetrics");
}
if (featureControl == null) {
throw new RuntimeException("You must specify FeatureControlManager");
}
Expand All @@ -157,7 +148,6 @@ ClusterControlManager build() {
snapshotRegistry,
sessionTimeoutNs,
replicaPlacer,
controllerMetrics,
featureControl,
zkMigrationEnabled
);
Expand Down Expand Up @@ -230,11 +220,6 @@ boolean check() {
*/
private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;

/**
* A reference to the controller's metrics registry.
*/
private final ControllerMetrics controllerMetrics;

/**
* The broker heartbeat manager, or null if this controller is on standby.
*/
Expand All @@ -260,7 +245,6 @@ private ClusterControlManager(
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer,
ControllerMetrics metrics,
FeatureControlManager featureControl,
boolean zkMigrationEnabled
) {
Expand All @@ -274,7 +258,6 @@ private ClusterControlManager(
this.registerBrokerRecordOffsets = new TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
this.controllerMetrics = metrics;
this.featureControl = featureControl;
this.zkMigrationEnabled = zkMigrationEnabled;
}
Expand Down Expand Up @@ -415,8 +398,9 @@ BrokerFeature processRegistrationFeature(
}

public OptionalLong registerBrokerRecordOffset(int brokerId) {
if (registerBrokerRecordOffsets.containsKey(brokerId)) {
return OptionalLong.of(registerBrokerRecordOffsets.get(brokerId));
Long registrationOffset = registerBrokerRecordOffsets.get(brokerId);
if (registrationOffset != null) {
return OptionalLong.of(registrationOffset);
}
return OptionalLong.empty();
}
Expand All @@ -442,7 +426,6 @@ public void replay(RegisterBrokerRecord record, long offset) {
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced(),
record.inControlledShutdown(), record.isMigratingZkBroker()));
updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
if (heartbeatManager != null) {
if (prevRegistration != null) heartbeatManager.remove(brokerId);
heartbeatManager.register(brokerId, record.fenced());
Expand All @@ -469,7 +452,6 @@ public void replay(UnregisterBrokerRecord record) {
} else {
if (heartbeatManager != null) heartbeatManager.remove(brokerId);
brokerRegistrations.remove(brokerId);
updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unregistered broker: {}", record);
}
}
Expand Down Expand Up @@ -498,11 +480,11 @@ public void replay(BrokerRegistrationChangeRecord record) {
BrokerRegistrationFencingChange fencingChange =
BrokerRegistrationFencingChange.fromValue(record.fenced()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for fenced field: %d", record, record.fenced())));
"value for fenced field: %x", record, record.fenced())));
BrokerRegistrationInControlledShutdownChange inControlledShutdownChange =
BrokerRegistrationInControlledShutdownChange.fromValue(record.inControlledShutdown()).orElseThrow(
() -> new IllegalStateException(String.format("Unable to replay %s: unknown " +
"value for inControlledShutdown field: %d", record, record.inControlledShutdown())));
"value for inControlledShutdown field: %x", record, record.inControlledShutdown())));
replayRegistrationChange(
record,
record.brokerId(),
Expand Down Expand Up @@ -533,7 +515,6 @@ private void replayRegistrationChange(
);
if (!curRegistration.equals(nextRegistration)) {
brokerRegistrations.put(brokerId, nextRegistration);
updateMetrics(curRegistration, nextRegistration);
} else {
log.info("Ignoring no-op registration change for {}", curRegistration);
}
Expand All @@ -547,35 +528,6 @@ private void replayRegistrationChange(
}
}

private void updateMetrics(BrokerRegistration prevRegistration, BrokerRegistration registration) {
if (registration == null) {
if (prevRegistration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
} else {
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
}
log.info("Removed broker: {}", prevRegistration.id());
} else if (prevRegistration == null) {
if (registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
log.info("Added new fenced broker: {}", registration.id());
} else {
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
log.info("Added new unfenced broker: {}", registration.id());
}
} else {
if (prevRegistration.fenced() && !registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
log.info("Unfenced broker: {}", registration.id());
} else if (!prevRegistration.fenced() && registration.fenced()) {
controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
log.info("Fenced broker: {}", registration.id());
}
}
}

Iterator<UsableBroker> usableBrokers() {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
Expand All @@ -588,7 +540,7 @@ Iterator<UsableBroker> usableBrokers() {
* Returns true if the broker is unfenced; Returns false if it is
* not or if it does not exist.
*/
public boolean unfenced(int brokerId) {
public boolean isUnfenced(int brokerId) {
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return !registration.fenced();
Expand Down Expand Up @@ -618,7 +570,7 @@ public boolean inControlledShutdown(int brokerId) {
* Returns true if the broker is active. Active means not fenced nor in controlled
* shutdown; Returns false if it is not active or if it does not exist.
*/
public boolean active(int brokerId) {
public boolean isActive(int brokerId) {
BrokerRegistration registration = brokerRegistrations.get(brokerId);
if (registration == null) return false;
return !registration.inControlledShutdown() && !registration.fenced();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public interface ControllerMetrics extends AutoCloseable {

int activeBrokerCount();

void setGlobalTopicsCount(int topicCount);
void setGlobalTopicCount(int topicCount);

int globalTopicsCount();
int globalTopicCount();

void setGlobalPartitionCount(int partitionCount);

Expand Down
Loading

0 comments on commit 44b3177

Please sign in to comment.