Skip to content

Commit

Permalink
[hotfix] [cep] Fix serialization problem and single state NFAs
Browse files Browse the repository at this point in the history
The start computation states have a null event associated. When serializing these states
one has to check whether the event is null or not, because not all serializer can handle
null values.

A single state NFA failed to compute a matching pattern because it was regarded as a
terminal state in the pattern extraction algorithm. By letting the first states start
with a two level dewey number this problem is fixed.
  • Loading branch information
tillrohrmann committed Feb 25, 2016
1 parent 56e660d commit a5ecb18
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -227,25 +227,28 @@ private Collection<ComputationState<T>> computeNextStates(
break;
case TAKE:
final State<T> newState = stateTransition.getTargetState();
final DeweyNumber newVersion;
final DeweyNumber oldVersion;
final DeweyNumber newComputationStateVersion;
final State<T> previousState = computationState.getState();
final T previousEvent = computationState.getEvent();
final long previousTimestamp;
final long startTimestamp;

if (computationState.isStartState()) {
newVersion = new DeweyNumber(startEventCounter++);
oldVersion = new DeweyNumber(startEventCounter++);
newComputationStateVersion = oldVersion.addStage();
startTimestamp = timestamp;
previousTimestamp = -1L;

} else {
startTimestamp = computationState.getStartTimestamp();
previousTimestamp = computationState.getTimestamp();
oldVersion = computationState.getVersion();

if (newState.equals(computationState.getState())) {
newVersion = computationState.getVersion().increase();
newComputationStateVersion = oldVersion.increase();
} else {
newVersion = computationState.getVersion().addStage();
newComputationStateVersion = oldVersion.addStage();
}
}

Expand All @@ -256,7 +259,7 @@ private Collection<ComputationState<T>> computeNextStates(
previousState,
previousEvent,
previousTimestamp,
newVersion);
oldVersion);

// a new computation state is referring to the shared entry
sharedBuffer.lock(newState, event, timestamp);
Expand All @@ -265,7 +268,7 @@ private Collection<ComputationState<T>> computeNextStates(
newState,
event,
timestamp,
newVersion,
newComputationStateVersion,
startTimestamp));
break;
}
Expand Down Expand Up @@ -366,9 +369,15 @@ private void writeComputationState(final ComputationState<T> computationState, f
oos.writeObject(computationState.getVersion());
oos.writeLong(computationState.getStartTimestamp());

DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(oos);

nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output);
if (computationState.getEvent() == null) {
// write that we don't have an event associated
oos.writeBoolean(false);
} else {
// write that we have an event associated
oos.writeBoolean(true);
DataOutputViewStreamWrapper output = new DataOutputViewStreamWrapper(oos);
nonDuplicatingTypeSerializer.serialize(computationState.getEvent(), output);
}
}

@SuppressWarnings("unchecked")
Expand All @@ -378,8 +387,15 @@ private ComputationState<T> readComputationState(ObjectInputStream ois) throws I
final DeweyNumber version = (DeweyNumber)ois.readObject();
final long startTimestamp = ois.readLong();

DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois);
final T event = nonDuplicatingTypeSerializer.deserialize(input);
final boolean hasEvent = ois.readBoolean();
final T event;

if (hasEvent) {
DataInputViewStreamWrapper input = new DataInputViewStreamWrapper(ois);
event = nonDuplicatingTypeSerializer.deserialize(input);
} else {
event = null;
}

return new ComputationState<>(state, event, timestamp, version, startTimestamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,36 @@ public String select(Map<String, Event> pattern) {

env.execute();
}

@Test
public void testSimplePatternWithSingleState() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, Integer>> input = env.fromElements(
new Tuple2<>(0, 1),
new Tuple2<>(0, 2));

Pattern<Tuple2<Integer, Integer>, ?> pattern =
Pattern.<Tuple2<Integer, Integer>>begin("start")
.where(new FilterFunction<Tuple2<Integer, Integer>>() {
@Override
public boolean filter(Tuple2<Integer, Integer> rec) throws Exception {
return rec.f1 == 1;
}
});

PatternStream<Tuple2<Integer, Integer>> pStream = CEP.pattern(input, pattern);

DataStream<Tuple2<Integer, Integer>> result = pStream.select(new PatternSelectFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
@Override
public Tuple2<Integer, Integer> select(Map<String, Tuple2<Integer, Integer>> pattern) throws Exception {
return pattern.get("start");
}
});

result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);

expected = "(0,1)";

env.execute();
}
}
27 changes: 27 additions & 0 deletions flink-libraries/flink-cep/src/test/resources/log4j-test.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
log4j.rootLogger=OFF, testlogger

# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
34 changes: 34 additions & 0 deletions flink-libraries/flink-cep/src/test/resources/logback-test.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>

<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>

<logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
<logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
<logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
<logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
</configuration>

0 comments on commit a5ecb18

Please sign in to comment.