Skip to content

Commit

Permalink
MINOR:Optimize the use of metrics in ReplicaManager and remove checks (
Browse files Browse the repository at this point in the history
…apache#13705)

Co-authored-by: Deqi Hu <[email protected]>

Reviewers: Divij Vaidya <[email protected]>, Manyanda Chitimbo <[email protected]>, Kirk True <[email protected]>
  • Loading branch information
hudeqi authored Jun 17, 2023
1 parent 2aa1555 commit 09e8adb
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 27 deletions.
71 changes: 47 additions & 24 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import kafka.log.remote.RemoteLogManager
import kafka.log.{LogManager, UnifiedLog}
import kafka.server.HostedPartition.Online
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.ReplicaManager.{AtMinIsrPartitionCountMetricName, FailedIsrUpdatesPerSecMetricName, IsrExpandsPerSecMetricName, IsrShrinksPerSecMetricName, LeaderCountMetricName, OfflineReplicaCountMetricName, PartitionCountMetricName, PartitionsWithLateTransactionsCountMetricName, ProducerIdCountMetricName, ReassigningPartitionsMetricName, UnderMinIsrPartitionCountMetricName, UnderReplicatedPartitionsMetricName}
import kafka.server.ReplicaManager.createLogReadResult
import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
import kafka.server.metadata.ZkMetadataCache
Expand Down Expand Up @@ -177,6 +178,39 @@ object HostedPartition {
object ReplicaManager {
val HighWatermarkFilename = "replication-offset-checkpoint"

private val LeaderCountMetricName = "LeaderCount"
private val PartitionCountMetricName = "PartitionCount"
private val OfflineReplicaCountMetricName = "OfflineReplicaCount"
private val UnderReplicatedPartitionsMetricName = "UnderReplicatedPartitions"
private val UnderMinIsrPartitionCountMetricName = "UnderMinIsrPartitionCount"
private val AtMinIsrPartitionCountMetricName = "AtMinIsrPartitionCount"
private val ReassigningPartitionsMetricName = "ReassigningPartitions"
private val PartitionsWithLateTransactionsCountMetricName = "PartitionsWithLateTransactionsCount"
private val ProducerIdCountMetricName = "ProducerIdCount"
private val IsrExpandsPerSecMetricName = "IsrExpandsPerSec"
private val IsrShrinksPerSecMetricName = "IsrShrinksPerSec"
private val FailedIsrUpdatesPerSecMetricName = "FailedIsrUpdatesPerSec"

private[server] val GaugeMetricNames = Set(
LeaderCountMetricName,
PartitionCountMetricName,
OfflineReplicaCountMetricName,
UnderReplicatedPartitionsMetricName,
UnderMinIsrPartitionCountMetricName,
AtMinIsrPartitionCountMetricName,
ReassigningPartitionsMetricName,
PartitionsWithLateTransactionsCountMetricName,
ProducerIdCountMetricName
)

private[server] val MeterMetricNames = Set(
IsrExpandsPerSecMetricName,
IsrShrinksPerSecMetricName,
FailedIsrUpdatesPerSecMetricName
)

private[server] val MetricNames = GaugeMetricNames.union(MeterMetricNames)

def createLogReadResult(highWatermark: Long,
leaderLogStartOffset: Long,
leaderLogEndOffset: Long,
Expand Down Expand Up @@ -282,16 +316,16 @@ class ReplicaManager(val config: KafkaConfig,
// Visible for testing
private[server] val replicaSelectorOpt: Option[ReplicaSelector] = createReplicaSelector()

metricsGroup.newGauge("LeaderCount", () => leaderPartitionsIterator.size)
metricsGroup.newGauge(LeaderCountMetricName, () => leaderPartitionsIterator.size)
// Visible for testing
private[kafka] val partitionCount = metricsGroup.newGauge("PartitionCount", () => allPartitions.size)
metricsGroup.newGauge("OfflineReplicaCount", () => offlinePartitionCount)
metricsGroup.newGauge("UnderReplicatedPartitions", () => underReplicatedPartitionCount)
metricsGroup.newGauge("UnderMinIsrPartitionCount", () => leaderPartitionsIterator.count(_.isUnderMinIsr))
metricsGroup.newGauge("AtMinIsrPartitionCount", () => leaderPartitionsIterator.count(_.isAtMinIsr))
metricsGroup.newGauge("ReassigningPartitions", () => reassigningPartitionsCount)
metricsGroup.newGauge("PartitionsWithLateTransactionsCount", () => lateTransactionsCount)
metricsGroup.newGauge("ProducerIdCount", () => producerIdCount)
private[kafka] val partitionCount = metricsGroup.newGauge(PartitionCountMetricName, () => allPartitions.size)
metricsGroup.newGauge(OfflineReplicaCountMetricName, () => offlinePartitionCount)
metricsGroup.newGauge(UnderReplicatedPartitionsMetricName, () => underReplicatedPartitionCount)
metricsGroup.newGauge(UnderMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isUnderMinIsr))
metricsGroup.newGauge(AtMinIsrPartitionCountMetricName, () => leaderPartitionsIterator.count(_.isAtMinIsr))
metricsGroup.newGauge(ReassigningPartitionsMetricName, () => reassigningPartitionsCount)
metricsGroup.newGauge(PartitionsWithLateTransactionsCountMetricName, () => lateTransactionsCount)
metricsGroup.newGauge(ProducerIdCountMetricName, () => producerIdCount)

def reassigningPartitionsCount: Int = leaderPartitionsIterator.count(_.isReassigning)

Expand All @@ -302,9 +336,9 @@ class ReplicaManager(val config: KafkaConfig,

def producerIdCount: Int = onlinePartitionsIterator.map(_.producerIdCount).sum

val isrExpandRate: Meter = metricsGroup.newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
val isrShrinkRate: Meter = metricsGroup.newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
val failedIsrUpdatesRate: Meter = metricsGroup.newMeter("FailedIsrUpdatesPerSec", "failedUpdates", TimeUnit.SECONDS)
val isrExpandRate: Meter = metricsGroup.newMeter(IsrExpandsPerSecMetricName, "expands", TimeUnit.SECONDS)
val isrShrinkRate: Meter = metricsGroup.newMeter(IsrShrinksPerSecMetricName, "shrinks", TimeUnit.SECONDS)
val failedIsrUpdatesRate: Meter = metricsGroup.newMeter(FailedIsrUpdatesPerSecMetricName, "failedUpdates", TimeUnit.SECONDS)

def underReplicatedPartitionCount: Int = leaderPartitionsIterator.count(_.isUnderReplicated)

Expand Down Expand Up @@ -2160,18 +2194,7 @@ class ReplicaManager(val config: KafkaConfig,
}

def removeMetrics(): Unit = {
metricsGroup.removeMetric("LeaderCount")
metricsGroup.removeMetric("PartitionCount")
metricsGroup.removeMetric("OfflineReplicaCount")
metricsGroup.removeMetric("UnderReplicatedPartitions")
metricsGroup.removeMetric("UnderMinIsrPartitionCount")
metricsGroup.removeMetric("AtMinIsrPartitionCount")
metricsGroup.removeMetric("ReassigningPartitions")
metricsGroup.removeMetric("PartitionsWithLateTransactionsCount")
metricsGroup.removeMetric("ProducerIdCount")
metricsGroup.removeMetric("IsrExpandsPerSec")
metricsGroup.removeMetric("IsrShrinksPerSec")
metricsGroup.removeMetric("FailedIsrUpdatesPerSec")
ReplicaManager.MetricNames.foreach(metricsGroup.removeMetric)
}

def beginControlledShutdown(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,9 +372,9 @@ class ReplicaManagerTest {
// Use the second instance of metrics group that is constructed. The first instance is constructed by
// ReplicaManager constructor > BrokerTopicStats > BrokerTopicMetrics.
val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(1)
verify(mockMetricsGroup, times(9)).newGauge(anyString(), any())
verify(mockMetricsGroup, times(3)).newMeter(anyString(), anyString(), any(classOf[TimeUnit]))
verify(mockMetricsGroup, times(12)).removeMetric(anyString())
ReplicaManager.GaugeMetricNames.foreach(metricName => verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
ReplicaManager.MeterMetricNames.foreach(metricName => verify(mockMetricsGroup).newMeter(ArgumentMatchers.eq(metricName), anyString(), any(classOf[TimeUnit])))
ReplicaManager.MetricNames.foreach(verify(mockMetricsGroup).removeMetric(_))

// assert that we have verified all invocations on
verifyNoMoreInteractions(mockMetricsGroup)
Expand Down

0 comments on commit 09e8adb

Please sign in to comment.