Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#475 from tgroh/backport_1202
Browse files Browse the repository at this point in the history
Use a NavigableSet Instead of a PriorityQueue in WatermarkManager
  • Loading branch information
tgroh authored Oct 31, 2016
2 parents 9bbba9e + 3d603f1 commit 20e9362
Showing 1 changed file with 8 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -382,7 +381,7 @@ private static class SynchronizedProcessingTimeInputWatermark implements Waterma
private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;

private final PriorityQueue<TimerData> pendingTimers;
private final NavigableSet<TimerData> pendingTimers;

private AtomicReference<Instant> earliestHold;

Expand All @@ -391,7 +390,7 @@ public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark>
this.pendingBundles = new HashSet<>();
this.processingTimers = new HashMap<>();
this.synchronizedProcessingTimers = new HashMap<>();
this.pendingTimers = new PriorityQueue<>();
this.pendingTimers = new TreeSet<>();
Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
for (Watermark wm : inputWms) {
initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
Expand Down Expand Up @@ -460,7 +459,7 @@ public synchronized Instant getEarliestTimerTimestamp() {
}
}
if (!pendingTimers.isEmpty()) {
earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest);
earliest = INSTANT_ORDERING.min(pendingTimers.first().getTimestamp(), earliest);
}
return earliest;
}
Expand Down Expand Up @@ -637,7 +636,7 @@ public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> input) {
};

/**
* For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the
* For each (Object, NavigableSet) pair in the provided map, remove each Timer that is before the
* latestTime argument and put in in the result with the same key, then remove all of the keys
* which have no more pending timers.
*
Expand Down Expand Up @@ -998,19 +997,19 @@ public String toString() {

private static class PerKeyHolds {
private final Map<Object, KeyedHold> keyedHolds;
private final PriorityQueue<KeyedHold> allHolds;
private final NavigableSet<KeyedHold> allHolds;

private PerKeyHolds() {
this.keyedHolds = new HashMap<>();
this.allHolds = new PriorityQueue<>();
this.allHolds = new TreeSet<>();
}

/**
* Gets the minimum hold across all keys in this {@link PerKeyHolds}, or THE_END_OF_TIME if
* there are no holds within this {@link PerKeyHolds}.
*/
public Instant getMinHold() {
return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp();
return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.first().getTimestamp();
}

/**
Expand All @@ -1021,7 +1020,7 @@ public void updateHold(@Nullable Object key, Instant newHold) {
removeHold(key);
KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
keyedHolds.put(key, newKeyedHold);
allHolds.offer(newKeyedHold);
allHolds.add(newKeyedHold);
}

/**
Expand Down

0 comments on commit 20e9362

Please sign in to comment.