Skip to content

Commit

Permalink
MINOR: inline metrics in RecordAccumulator (apache#12227)
Browse files Browse the repository at this point in the history
Reviewers: Kvicii <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
dajac authored Jun 1, 2022
1 parent 4c9eeef commit 283a995
Showing 1 changed file with 14 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,11 @@
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.AbstractRecords;
import org.apache.kafka.common.record.CompressionRatioEstimator;
Expand Down Expand Up @@ -195,29 +192,20 @@ public RecordAccumulator(LogContext logContext,
}

private void registerMetrics(Metrics metrics, String metricGrpName) {
MetricName metricName = metrics.metricName("waiting-threads", metricGrpName, "The number of user threads blocked waiting for buffer memory to enqueue their records");
Measurable waitingThreads = new Measurable() {
public double measure(MetricConfig config, long now) {
return free.queued();
}
};
metrics.addMetric(metricName, waitingThreads);

metricName = metrics.metricName("buffer-total-bytes", metricGrpName, "The maximum amount of buffer memory the client can use (whether or not it is currently used).");
Measurable totalBytes = new Measurable() {
public double measure(MetricConfig config, long now) {
return free.totalMemory();
}
};
metrics.addMetric(metricName, totalBytes);

metricName = metrics.metricName("buffer-available-bytes", metricGrpName, "The total amount of buffer memory that is not being used (either unallocated or in the free list).");
Measurable availableBytes = new Measurable() {
public double measure(MetricConfig config, long now) {
return free.availableMemory();
}
};
metrics.addMetric(metricName, availableBytes);
metrics.addMetric(
metrics.metricName("waiting-threads", metricGrpName,
"The number of user threads blocked waiting for buffer memory to enqueue their records"),
(config, now) -> free.queued());

metrics.addMetric(
metrics.metricName("buffer-total-bytes", metricGrpName,
"The maximum amount of buffer memory the client can use (whether or not it is currently used)."),
(config, now) -> free.totalMemory());

metrics.addMetric(
metrics.metricName("buffer-available-bytes", metricGrpName,
"The total amount of buffer memory that is not being used (either unallocated or in the free list)."),
(config, now) -> free.availableMemory());
}

private void setPartition(AppendCallbacks callbacks, int partition) {
Expand Down

0 comments on commit 283a995

Please sign in to comment.