diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 80d3fc645da9d..7f7ff69f2a825 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1404,10 +1404,6 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats topicStatsStream.endObject(); nsStats.msgReplBacklog += rStat.replicationBacklog; - // replication delay for a namespace is the max repl-delay among all the topics under this namespace - if (rStat.replicationDelayInSeconds > nsStats.maxMsgReplDelayInSeconds) { - nsStats.maxMsgReplDelayInSeconds = rStat.replicationDelayInSeconds; - } if (replStats.isMetricsEnabled()) { String namespaceClusterKey = replStats.getKeyName(namespace, cluster); @@ -1424,6 +1420,10 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats if (update) { replStats.put(namespaceClusterKey, replicationMetrics); } + // replication delay for a namespace is the max repl-delay among all the topics under this namespace + if (rStat.replicationDelayInSeconds > replicationMetrics.maxMsgReplDelayInSeconds) { + replicationMetrics.maxMsgReplDelayInSeconds = rStat.replicationDelayInSeconds; + } } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ReplicationMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ReplicationMetrics.java index dbdefa0d21397..0d675a0b54cd4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ReplicationMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/ReplicationMetrics.java @@ -33,6 +33,7 @@ public class ReplicationMetrics { public double msgRateOut; public double msgThroughputOut; public double msgReplBacklog; + public double maxMsgReplDelayInSeconds; public int connected; public void reset() { @@ -40,6 +41,7 @@ public void reset() { msgThroughputOut = 0; msgReplBacklog = 0; connected = 0; + maxMsgReplDelayInSeconds = 0; } public static ReplicationMetrics get() { @@ -81,6 +83,7 @@ public Metrics add(String namespace, String local, String remote) { dMetrics.put("brk_repl_out_tp_rate", msgThroughputOut); dMetrics.put("brk_replication_backlog", msgReplBacklog); dMetrics.put("brk_repl_is_connected", connected); + dMetrics.put("brk_max_replication_delay_second", maxMsgReplDelayInSeconds); return dMetrics;