From 3d323ba10171da6e6416efca832b76da46ea010b Mon Sep 17 00:00:00 2001 From: kkloudas Date: Mon, 5 Feb 2018 14:36:53 +0100 Subject: [PATCH] [FLINK-8561] [cep] Fix SharedBuffer.removeEdges to use .equals(). This closes #5414. --- .../apache/flink/cep/nfa/SharedBuffer.java | 21 ++++++++----------- 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java index f34eb5fbd687f..5f766e23dbb40 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java @@ -175,7 +175,7 @@ public boolean isEmpty() { * @return {@code true} if pruning happened */ public boolean prune(long pruningTimestamp) { - final List> prunedEntries = new ArrayList<>(); + final Set> prunedEntries = new HashSet<>(); final Iterator>> it = pages.entrySet().iterator(); while (it.hasNext()) { @@ -428,7 +428,7 @@ public SharedBufferEntry get(final ValueTimeWrapper valueTime) { * @param pruningTimestamp Timestamp for the pruning * @param prunedEntries a {@link Set} to put the removed {@link SharedBufferEntry SharedBufferEntries}. */ - private void prune(final long pruningTimestamp, final List> prunedEntries) { + private void prune(final long pruningTimestamp, final Set> prunedEntries) { Iterator, SharedBufferEntry>> it = entries.entrySet().iterator(); while (it.hasNext()) { SharedBufferEntry entry = it.next().getValue(); @@ -442,7 +442,7 @@ private void prune(final long pruningTimestamp, final List> prunedEntries) { + private void removeEdges(final Set> prunedEntries) { for (SharedBufferEntry entry : entries.values()) { entry.removeEdges(prunedEntries); } @@ -542,15 +542,12 @@ public void addEdge(SharedBufferEdge edge) { /** * Remove edges with the specified targets. */ - private void removeEdges(final List> prunedEntries) { - Iterator> itor = edges.iterator(); - while (itor.hasNext()) { - SharedBufferEdge edge = itor.next(); - for (SharedBufferEntry prunedEntry : prunedEntries) { - if (prunedEntry == edge.getTarget()) { - itor.remove(); - break; - } + private void removeEdges(final Set> prunedEntries) { + Iterator> it = edges.iterator(); + while (it.hasNext()) { + SharedBufferEdge edge = it.next(); + if (prunedEntries.contains(edge.getTarget())) { + it.remove(); } } }