Skip to content

Commit

Permalink
Offloaders: fix metrics (apache#16405)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli authored Jul 14, 2022
1 parent 596fc3c commit ac3f3ee
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public final class LedgerOffloaderStatsImpl implements LedgerOffloaderStats, Run
private final Summary readLedgerLatency;
private final Counter writeStorageError;
private final Counter readOffloadError;
private final Counter readOffloadBytes;
private final Gauge readOffloadRate;
private final Summary readOffloadIndexLatency;
private final Summary readOffloadDataLatency;
Expand Down Expand Up @@ -86,15 +87,30 @@ private LedgerOffloaderStatsImpl(boolean exposeTopicLevelMetrics,
.labelNames(labels).create().register();
this.readOffloadRate = Gauge.build("brk_ledgeroffloader_read_offload_rate", "-")
.labelNames(labels).create().register();
this.readOffloadBytes = Counter.build("brk_ledgeroffloader_read_bytes", "-")
.labelNames(labels).create().register();
this.writeStorageError = Counter.build("brk_ledgeroffloader_write_storage_error", "-")
.labelNames(labels).create().register();

this.readOffloadIndexLatency = Summary.build("brk_ledgeroffloader_read_offload_index_latency", "-")
.labelNames(labels).create().register();
.labelNames(labels).quantile(0.50, 0.01)
.quantile(0.95, 0.01)
.quantile(0.99, 0.01)
.quantile(1, 0.01)
.create().register();
this.readOffloadDataLatency = Summary.build("brk_ledgeroffloader_read_offload_data_latency", "-")
.labelNames(labels).create().register();
.labelNames(labels)
.quantile(0.50, 0.01)
.quantile(0.95, 0.01)
.quantile(0.99, 0.01)
.quantile(1, 0.01)
.create().register();
this.readLedgerLatency = Summary.build("brk_ledgeroffloader_read_ledger_latency", "-")
.labelNames(labels).create().register();
.labelNames(labels).quantile(0.50, 0.01)
.quantile(0.95, 0.01)
.quantile(0.99, 0.01)
.quantile(1, 0.01)
.create().register();

String[] deleteOpsLabels = exposeTopicLevelMetrics
? new String[]{NAMESPACE_LABEL, TOPIC_LABEL, STATUS} : new String[]{NAMESPACE_LABEL, STATUS};
Expand Down Expand Up @@ -156,6 +172,8 @@ public void recordReadOffloadBytes(String topic, long size) {
Pair<LongAdder, LongAdder> pair = this.offloadAndReadOffloadBytesMap
.computeIfAbsent(topic, __ -> new ImmutablePair<>(new LongAdder(), new LongAdder()));
pair.getRight().add(size);
String[] labelValues = this.labelValues(topic);
this.readOffloadBytes.labels(labelValues).inc(size);
this.addOrUpdateTopicAccess(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private OrderedScheduler offloaderScheduler;
private OffloadersCache offloadersCache = new OffloadersCache();
private LedgerOffloader defaultOffloader;
private final LedgerOffloaderStats offloaderStats;
private LedgerOffloaderStats offloaderStats;
private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>();
private ScheduledFuture<?> loadReportTask = null;
private ScheduledFuture<?> loadSheddingTask = null;
Expand Down Expand Up @@ -346,8 +346,8 @@ public PulsarService(ServiceConfiguration config,

int interval = config.getManagedLedgerStatsPeriodSeconds();
boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();
this.offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
exposeTopicMetrics, this.getOffloaderScheduler(), interval);
// here in the constructor we don't have the offloader scheduler yet
this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
}

public MetadataStore createConfigurationMetadataStore() throws MetadataStoreException {
Expand Down Expand Up @@ -741,8 +741,17 @@ public void start() throws PulsarServerException {
schemaRegistryService = SchemaRegistryService.create(
schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this.executor);

this.defaultOffloader = createManagedLedgerOffloader(
OffloadPoliciesImpl.create(this.getConfiguration().getProperties()));
OffloadPoliciesImpl defaultOffloadPolicies =
OffloadPoliciesImpl.create(this.getConfiguration().getProperties());
this.defaultOffloader = createManagedLedgerOffloader(defaultOffloadPolicies);

OrderedScheduler offloaderScheduler = getOffloaderScheduler(defaultOffloadPolicies);
int interval = config.getManagedLedgerStatsPeriodSeconds();
boolean exposeTopicMetrics = config.isExposeTopicLevelMetricsInPrometheus();

offloaderStats = LedgerOffloaderStats.create(config.isExposeManagedLedgerMetricsInPrometheus(),
exposeTopicMetrics, offloaderScheduler, interval);

this.brokerInterceptor = BrokerInterceptors.load(config);
brokerService.setInterceptor(getBrokerInterceptor());
this.brokerInterceptor.initialize(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ private boolean refillBufferIfNeeded() throws IOException {
try {
long startReadTime = System.nanoTime();
Blob blob = blobStore.getBlob(bucket, key, new GetOptions().range(startRange, endRange));
if (this.offloaderStats != null) {
this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1);
}
versionCheck.check(key, blob);

try (InputStream stream = blob.getPayload().openStream()) {
Expand All @@ -110,6 +105,15 @@ private boolean refillBufferIfNeeded() throws IOException {
}
cursor += buffer.readableBytes();
}

// here we can get the metrics
// because JClouds streams the content
// and actually the HTTP call finishes when the stream is fully read
if (this.offloaderStats != null) {
this.offloaderStats.recordReadOffloadDataLatency(managedLedgerName,
System.nanoTime() - startReadTime, TimeUnit.NANOSECONDS);
this.offloaderStats.recordReadOffloadBytes(managedLedgerName, endRange - startRange + 1);
}
} catch (Throwable e) {
if (null != this.offloaderStats) {
this.offloaderStats.recordReadOffloadError(this.managedLedgerName);
Expand Down

0 comments on commit ac3f3ee

Please sign in to comment.