Skip to content

Commit

Permalink
[FLINK-35886][task] Support markStart and markEnd listeners in TimerG…
Browse files Browse the repository at this point in the history
…auge
  • Loading branch information
pnowojski committed Aug 17, 2024
1 parent 8ad0346 commit 8476664
Showing 2 changed files with 108 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -24,6 +24,9 @@
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import java.util.ArrayList;
import java.util.Collection;

/**
* {@link TimerGauge} measures how much time is spent in a given state, with entry into that state
* being signaled by {@link #markStart()}. Measuring is stopped by {@link #markEnd()}. This class in
@@ -37,6 +40,8 @@ public class TimerGauge implements Gauge<Long>, View {

private final Clock clock;

private final Collection<StartStopListener> startStopListeners = new ArrayList<>();

/** The time-span over which the average is calculated. */
private final int timeSpanInSeconds;
/** Circular array containing the history of values. */
@@ -82,21 +87,43 @@ public TimerGauge(Clock clock, int timeSpanInSeconds) {
this.values = new long[this.timeSpanInSeconds / UPDATE_INTERVAL_SECONDS];
}

public synchronized void registerListener(StartStopListener listener) {
if (currentMeasurementStartTS != 0) {
listener.markStart();
}
startStopListeners.add(listener);
}

public synchronized void unregisterListener(StartStopListener listener) {
if (currentMeasurementStartTS != 0) {
listener.markEnd();
}
startStopListeners.remove(listener);
}

public synchronized void markStart() {
if (currentMeasurementStartTS == 0) {
currentUpdateTS = clock.absoluteTimeMillis();
currentMeasurementStartTS = currentUpdateTS;
if (currentMeasurementStartTS != 0) {
return;
}
currentUpdateTS = clock.absoluteTimeMillis();
currentMeasurementStartTS = currentUpdateTS;
for (StartStopListener startStopListener : startStopListeners) {
startStopListener.markStart();
}
}

public synchronized void markEnd() {
if (currentMeasurementStartTS != 0) {
long currentMeasurement = clock.absoluteTimeMillis() - currentMeasurementStartTS;
currentCount += currentMeasurement;
accumulatedCount += currentMeasurement;
currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, currentMeasurement);
currentUpdateTS = 0;
currentMeasurementStartTS = 0;
if (currentMeasurementStartTS == 0) {
return;
}
long currentMeasurement = clock.absoluteTimeMillis() - currentMeasurementStartTS;
currentCount += currentMeasurement;
accumulatedCount += currentMeasurement;
currentMaxSingleMeasurement = Math.max(currentMaxSingleMeasurement, currentMeasurement);
currentUpdateTS = 0;
currentMeasurementStartTS = 0;
for (StartStopListener startStopListener : startStopListeners) {
startStopListener.markEnd();
}
}

@@ -164,4 +191,17 @@ public synchronized long getCount() {
public synchronized boolean isMeasuring() {
return currentMeasurementStartTS != 0;
}

/**
* Listens for {@link TimerGauge#markStart()} and {@link TimerGauge#markEnd()} events.
*
* <p>Beware! As it is right now, {@link StartStopListener} is notified under the {@link
* TimerGauge}'s lock, so those callbacks should be very short, without long call stacks that
* acquire more locks. Otherwise, a potential for deadlocks can be introduced.
*/
public interface StartStopListener {
void markStart();

void markEnd();
}
}
Original file line number Diff line number Diff line change
@@ -28,6 +28,7 @@

import org.apache.flink.metrics.View;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.clock.SystemClock;

import org.junit.jupiter.api.Test;

@@ -148,4 +149,61 @@ void testLargerTimespan() {
assertThat(gauge.getMaxSingleMeasurement()).isEqualTo(SLEEP / 2);
assertThat(gauge.getAccumulatedCount()).isEqualTo(3 * SLEEP + SLEEP / 2);
}

@Test
void testListeners() {
TimerGauge gauge = new TimerGauge(SystemClock.getInstance(), View.UPDATE_INTERVAL_SECONDS);
TestStartStopListener listener1 = new TestStartStopListener();
TestStartStopListener listener2 = new TestStartStopListener();

gauge.registerListener(listener1);

gauge.markStart();
listener1.assertCounts(1, 0);
gauge.markEnd();
listener1.assertCounts(1, 1);

gauge.markStart();
gauge.registerListener(listener2);
listener1.assertCounts(2, 1);
listener2.assertCounts(1, 0);
gauge.markEnd();
listener1.assertCounts(2, 2);
listener2.assertCounts(1, 1);

gauge.markStart();
gauge.unregisterListener(listener1);
listener1.assertCounts(3, 3);
listener2.assertCounts(2, 1);

gauge.markEnd();
listener2.assertCounts(2, 2);

gauge.unregisterListener(listener2);

gauge.markStart();
gauge.markEnd();
listener1.assertCounts(3, 3);
listener2.assertCounts(2, 2);
}

static class TestStartStopListener implements TimerGauge.StartStopListener {
long startCount;
long endCount;

@Override
public void markStart() {
startCount++;
}

@Override
public void markEnd() {
endCount++;
}

public void assertCounts(long expectedStart, long expectedEnd) {
assertThat(startCount).isEqualTo(expectedStart);
assertThat(endCount).isEqualTo(expectedEnd);
}
}
}

0 comments on commit 8476664

Please sign in to comment.