Skip to content

Commit

Permalink
MINOR: log new coordinator partition load schedule time (apache#15017)
Browse files Browse the repository at this point in the history
The current load summary exposes the time from when the partition load operation is scheduled to when the load completes. We are missing the information of how long the scheduled operation stays in the scheduler. Log that information.

Reviewers: David Jacot <[email protected]>
  • Loading branch information
jeffkbkim authored Jan 18, 2024
1 parent acab465 commit 96f852f
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class CoordinatorLoaderImpl[T](
future: CompletableFuture[LoadSummary],
startTimeMs: Long
): Unit = {
val schedulerQueueTimeMs = time.milliseconds() - startTimeMs
try {
replicaManager.getLog(tp) match {
case None =>
Expand Down Expand Up @@ -193,7 +194,7 @@ class CoordinatorLoaderImpl[T](
s"Stopped loading records from $tp because the partition is not online or is no longer the leader."
))
} else if (isRunning.get) {
future.complete(new LoadSummary(startTimeMs, endTimeMs, numRecords, numBytes))
future.complete(new LoadSummary(startTimeMs, endTimeMs, schedulerQueueTimeMs, numRecords, numBytes))
} else {
future.completeExceptionally(new RuntimeException("Coordinator loader is closed."))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public short unknownType() {
class LoadSummary {
private final long startTimeMs;
private final long endTimeMs;
private final long schedulerQueueTimeMs;
private final long numRecords;
private final long numBytes;

public LoadSummary(long startTimeMs, long endTimeMs, long numRecords, long numBytes) {
public LoadSummary(long startTimeMs, long endTimeMs, long schedulerQueueTimeMs, long numRecords, long numBytes) {
this.startTimeMs = startTimeMs;
this.endTimeMs = endTimeMs;
this.schedulerQueueTimeMs = schedulerQueueTimeMs;
this.numRecords = numRecords;
this.numBytes = numBytes;
}
Expand All @@ -71,6 +73,10 @@ public long endTimeMs() {
return endTimeMs;
}

public long schedulerQueueTimeMs() {
return schedulerQueueTimeMs;
}

public long numRecords() {
return numRecords;
}
Expand All @@ -84,6 +90,7 @@ public String toString() {
return "LoadSummary(" +
"startTimeMs=" + startTimeMs +
", endTimeMs=" + endTimeMs +
", schedulerQueueTimeMs=" + schedulerQueueTimeMs +
", numRecords=" + numRecords +
", numBytes=" + numBytes + ")";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1630,10 +1630,12 @@ public void scheduleLoadOperation(
ctx.transitionTo(CoordinatorState.ACTIVE);
if (summary != null) {
runtimeMetrics.recordPartitionLoadSensor(summary.startTimeMs(), summary.endTimeMs());
log.info("Finished loading of metadata from {} in {}ms with epoch {} where {}ms " +
"was spent in the scheduler. Loaded {} records which total to {} bytes.",
summary.endTimeMs() - summary.startTimeMs(), tp, partitionEpoch,
summary.schedulerQueueTimeMs(), summary.numRecords(), summary.numBytes()
);
}
log.info("Finished loading of metadata from {} with epoch {} and LoadSummary={}.",
tp, partitionEpoch, summary
);
} catch (Throwable ex) {
log.error("Failed to load metadata from {} with epoch {} due to {}.",
tp, partitionEpoch, ex.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2331,6 +2331,7 @@ public void testPartitionLoadSensor() {
new CoordinatorLoader.LoadSummary(
startTimeMs,
startTimeMs + 1000,
startTimeMs + 500,
30,
3000),
Collections.emptyList(),
Expand Down Expand Up @@ -2385,6 +2386,7 @@ public void testPartitionLoadGeneratesSnapshotAtHighWatermark() {
new CoordinatorLoader.LoadSummary(
1000,
2000,
1500,
30,
3000),
Arrays.asList(5L, 15L, 27L),
Expand Down Expand Up @@ -2440,6 +2442,7 @@ public void testPartitionLoadGeneratesSnapshotAtHighWatermarkNoRecordsLoaded() {
new CoordinatorLoader.LoadSummary(
1000,
2000,
1500,
30,
3000),
Collections.emptyList(),
Expand Down

0 comments on commit 96f852f

Please sign in to comment.