Skip to content

Commit

Permalink
[FLINK-7008] [cep] Update NFA state only when the NFA changes
Browse files Browse the repository at this point in the history
This closes apache#4195.
  • Loading branch information
dianfu authored and dawidwys committed Jun 29, 2017
1 parent f8842f6 commit e3bef55
Show file tree
Hide file tree
Showing 6 changed files with 535 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Helper class which encapsulates the state of the NFA computation. It points to the current state,
Expand Down Expand Up @@ -114,6 +115,28 @@ public DeweyNumber getVersion() {
return version;
}

/**
 * Compares this computation state with another object field by field.
 *
 * <p>Two computation states are equal when their state, event, counter, timestamp, version,
 * start timestamp and previous computation state are all equal. The previous state is compared
 * via its own {@code equals}, so the comparison recurses along the chain of previous states.
 *
 * @param obj the object to compare with, may be {@code null}
 * @return {@code true} if {@code obj} is a {@code ComputationState} with equal fields
 */
@Override
public boolean equals(Object obj) {
	// identical reference: skip the (potentially recursive) field comparison
	if (this == obj) {
		return true;
	}

	if (!(obj instanceof ComputationState)) {
		return false;
	}

	// wildcard instead of a raw type; the type argument is irrelevant for the comparison
	ComputationState<?> other = (ComputationState<?>) obj;

	return Objects.equals(state, other.state) &&
		Objects.equals(event, other.event) &&
		counter == other.counter &&
		timestamp == other.timestamp &&
		Objects.equals(version, other.version) &&
		startTimestamp == other.startTimestamp &&
		Objects.equals(previousState, other.previousState);
}

/**
 * Computes a hash code from the same fields that {@code equals} compares, so that equal
 * computation states produce equal hash codes.
 *
 * @return hash code derived from state, event, counter, timestamp, version, start timestamp
 *         and previous computation state
 */
@Override
public int hashCode() {
	// same 31-based fold (seeded with 1) that Objects.hash applies to its arguments;
	// primitives are boxed by Objects.hashCode exactly as they would be by the varargs call
	int result = 1;
	result = 31 * result + Objects.hashCode(state);
	result = 31 * result + Objects.hashCode(event);
	result = 31 * result + Objects.hashCode(counter);
	result = 31 * result + Objects.hashCode(timestamp);
	result = 31 * result + Objects.hashCode(version);
	result = 31 * result + Objects.hashCode(startTimestamp);
	result = 31 * result + Objects.hashCode(previousState);
	return result;
}

public static <T> ComputationState<T> createStartState(final NFA<T> nfa, final State<T> state) {
Preconditions.checkArgument(state.isStart());
return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ public class NFA<T> implements Serializable {

private TypeSerializer<T> eventSerializer;

/**
* Flag indicating whether the matching status of the state machine has changed.
*/
private boolean nfaChanged;

public NFA(
final TypeSerializer<T> eventSerializer,
final long windowTime,
Expand All @@ -164,6 +169,7 @@ public NFA(
this.eventSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
this.computationStates = new LinkedList<>();
this.states = new HashSet<>();
this.nfaChanged = false;
}

public Set<State<T>> getStates() {
Expand Down Expand Up @@ -195,6 +201,22 @@ public boolean isEmpty() {
return eventSharedBuffer.isEmpty();
}

/**
 * Reports whether this NFA has been flagged as changed since the flag was last cleared.
 *
 * <p>The flag starts out as {@code false}, is raised while processing when a computation state
 * is discarded, when the computed follow-up states differ from the current one, or when the
 * shared buffer reports that entries were pruned, and is lowered again by
 * {@link #resetNFAChanged()}.
 *
 * @return {@code true} if the NFA is flagged as changed, {@code false} otherwise
 */
public boolean isNFAChanged() {
	return this.nfaChanged;
}

/**
 * Clears the change flag, i.e. sets {@link #nfaChanged} back to {@code false}, so that
 * {@link #isNFAChanged()} only reports changes made after this call.
 */
public void resetNFAChanged() {
	nfaChanged = false;
}

/**
* Processes the next input event. If some of the computations reach a final state then the
* resulting event sequences are returned. If computations time out and timeout handling is
Expand Down Expand Up @@ -237,8 +259,15 @@ public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, Li
computationState.getCounter());

newComputationStates = Collections.emptyList();
nfaChanged = true;
} else if (event != null) {
newComputationStates = computeNextStates(computationState, event, timestamp);

if (newComputationStates.size() != 1) {
nfaChanged = true;
} else if (!newComputationStates.iterator().next().equals(computationState)) {
nfaChanged = true;
}
} else {
newComputationStates = Collections.singleton(computationState);
}
Expand Down Expand Up @@ -302,7 +331,9 @@ public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, Li

// remove all elements which are expired
// with respect to the window length
eventSharedBuffer.prune(pruningTimestamp);
if (eventSharedBuffer.prune(pruningTimestamp)) {
nfaChanged = true;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,20 +189,26 @@ public boolean isEmpty() {
*
* @param pruningTimestamp The time which is used for pruning. All elements whose timestamp is
* lower than or equal to the pruning timestamp will be removed.
* @return {@code true} if pruning happened
*/
public void prune(long pruningTimestamp) {
public boolean prune(long pruningTimestamp) {
Iterator<Map.Entry<K, SharedBufferPage<K, V>>> iter = pages.entrySet().iterator();
boolean pruned = false;

while (iter.hasNext()) {
SharedBufferPage<K, V> page = iter.next().getValue();

page.prune(pruningTimestamp);
if (page.prune(pruningTimestamp)) {
pruned = true;
}

if (page.isEmpty()) {
// delete page if it is empty
iter.remove();
}
}

return pruned;
}

/**
Expand Down Expand Up @@ -488,20 +494,25 @@ public SharedBufferEntry<K, V> get(final ValueTimeWrapper<V> valueTime) {
* Removes all entries from the map whose timestamp is smaller than or equal to the pruning timestamp.
*
* @param pruningTimestamp Timestamp for the pruning
* @return {@code true} if pruning happened
*/
public void prune(long pruningTimestamp) {
public boolean prune(long pruningTimestamp) {
Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> iterator = entries.entrySet().iterator();
boolean continuePruning = true;
boolean pruned = false;

while (iterator.hasNext() && continuePruning) {
SharedBufferEntry<K, V> entry = iterator.next().getValue();

if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
iterator.remove();
pruned = true;
} else {
continuePruning = false;
}
}

return pruned;
}

public boolean isEmpty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,13 @@ private NFA<IN> getNFA() throws IOException {
}

private void updateNFA(NFA<IN> nfa) throws IOException {
if (nfa.isEmpty()) {
nfaOperatorState.clear();
} else {
nfaOperatorState.update(nfa);
if (nfa.isNFAChanged()) {
if (nfa.isEmpty()) {
nfaOperatorState.clear();
} else {
nfa.resetNFAChanged();
nfaOperatorState.update(nfa);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cep.nfa;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;

import org.junit.Test;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
 * Tests if the {@link NFA} status ({@link NFA#computationStates} or {@link NFA#eventSharedBuffer})
 * is changed after processing events.
 *
 * <p>Every step resets the change flag via {@link NFA#resetNFAChanged()}, feeds one event (or a
 * bare timestamp) to {@link NFA#process} and then checks {@link NFA#isNFAChanged()}.
 */
public class NFAStatusChangeITCase {

	/**
	 * Walks a four-stage pattern through matching, ignored and timed-out inputs and checks the
	 * change flag after each of them.
	 */
	@Test
	public void testNFAChange() {
		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
			private static final long serialVersionUID = 1858562682635302605L;

			@Override
			public boolean filter(Event value) throws Exception {
				return value.getName().equals("a");
			}
		}).followedByAny("middle").where(new IterativeCondition<Event>() {
			private static final long serialVersionUID = 8061969839441121955L;

			@Override
			public boolean filter(Event value, Context<Event> ctx) throws Exception {
				return value.getName().equals("b");
			}
		}).oneOrMore().optional().allowCombinations().followedBy("middle2").where(new IterativeCondition<Event>() {
			private static final long serialVersionUID = 8061969839441121955L;

			@Override
			public boolean filter(Event value, Context<Event> ctx) throws Exception {
				return value.getName().equals("d");
			}
		}).followedBy("end").where(new IterativeCondition<Event>() {
			private static final long serialVersionUID = 8061969839441121955L;

			@Override
			public boolean filter(Event value, Context<Event> ctx) throws Exception {
				return value.getName().equals("e");
			}
		}).within(Time.milliseconds(10));

		NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
		NFA<Event> nfa = nfaFactory.createNFA();

		nfa.process(new Event(1, "b", 1.0), 1L);
		assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfa.isNFAChanged());

		nfa.resetNFAChanged();
		nfa.process(new Event(2, "a", 1.0), 2L);
		assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfa.isNFAChanged());

		// the status of the queue of ComputationState changed,
		// more than one ComputationState is generated by the event from some ComputationState
		nfa.resetNFAChanged();
		nfa.process(new Event(3, "f", 1.0), 3L);
		assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfa.isNFAChanged());

		// both the queue of ComputationState and eventSharedBuffer have not changed
		nfa.resetNFAChanged();
		nfa.process(new Event(4, "f", 1.0), 4L);
		assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfa.isNFAChanged());

		// both the queue of ComputationState and eventSharedBuffer have changed
		nfa.resetNFAChanged();
		nfa.process(new Event(5, "b", 1.0), 5L);
		assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfa.isNFAChanged());

		// both the queue of ComputationState and eventSharedBuffer have changed
		nfa.resetNFAChanged();
		nfa.process(new Event(6, "d", 1.0), 6L);
		assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfa.isNFAChanged());

		// both the queue of ComputationState and eventSharedBuffer have not changed
		// as the timestamp is within the window
		nfa.resetNFAChanged();
		nfa.process(null, 8L);
		assertFalse("NFA status should not change as the timestamp is within the window", nfa.isNFAChanged());

		// timed-out ComputationState will be removed from the queue of ComputationState and the timed-out
		// event will be removed from eventSharedBuffer as the timeout happens
		nfa.resetNFAChanged();
		Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.process(null, 12L).f1;
		// checked separately so that a failure tells which of the two expectations was violated
		assertTrue("NFA status should change as timeout happens", nfa.isNFAChanged());
		assertFalse("Timeout results should not be empty as timeout happens", timeoutResults.isEmpty());
	}

	/**
	 * Checks that the change flag is raised when an event is taken by the looping 'a*' stage.
	 *
	 * <p>NOTE(review): judging by the test name this targets the case where exactly one new
	 * computation state is produced that differs from the previous one - confirm.
	 */
	@Test
	public void testNFAChangedOnOneNewComputationState() {
		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
			@Override
			public boolean filter(Event value) throws Exception {
				return value.getName().equals("start");
			}
		}).followedBy("a*").where(new SimpleCondition<Event>() {
			private static final long serialVersionUID = 1858562682635302605L;

			@Override
			public boolean filter(Event value) throws Exception {
				return value.getName().equals("a");
			}
		}).oneOrMore().optional().next("end").where(new IterativeCondition<Event>() {
			private static final long serialVersionUID = 8061969839441121955L;

			@Override
			public boolean filter(Event value, Context<Event> ctx) throws Exception {
				return value.getName().equals("b");
			}
		}).within(Time.milliseconds(10));

		NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
		NFA<Event> nfa = nfaFactory.createNFA();

		nfa.resetNFAChanged();
		nfa.process(new Event(6, "start", 1.0), 6L);

		nfa.resetNFAChanged();
		nfa.process(new Event(6, "a", 1.0), 7L);
		assertTrue("NFA status should change as the event matches the take condition of the 'a*' state", nfa.isNFAChanged());
	}

	/**
	 * Checks that the change flag is raised when an event arrives after the window has elapsed.
	 *
	 * <p>NOTE(review): the second event carries timestamp 17 while the match started at 6 with a
	 * 10 ms window, so the pending computation state is expected to time out - confirm.
	 */
	@Test
	public void testNFAChangedOnTimeoutWithoutPrune() {
		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
			@Override
			public boolean filter(Event value, Context<Event> ctx) throws Exception {
				return value.getName().equals("start");
			}
		}).followedBy("end").where(new IterativeCondition<Event>() {
			private static final long serialVersionUID = 8061969839441121955L;

			@Override
			public boolean filter(Event value, Context<Event> ctx) throws Exception {
				return value.getName().equals("end");
			}
		}).within(Time.milliseconds(10));

		NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
		NFA<Event> nfa = nfaFactory.createNFA();

		nfa.resetNFAChanged();
		nfa.process(new Event(6, "start", 1.0), 6L);

		nfa.resetNFAChanged();
		nfa.process(new Event(6, "end", 1.0), 17L);
		assertTrue("NFA status should change as the event arrives after the window has elapsed", nfa.isNFAChanged());
	}
}
Loading

0 comments on commit e3bef55

Please sign in to comment.