Skip to content

Commit

Permalink
[FLINK-7293] [cep] Support custom order by in PatternStream
Browse files Browse the repository at this point in the history
This closes apache#4418.
  • Loading branch information
dianfu authored and dawidwys committed Aug 8, 2017
1 parent 8c9642f commit e2d3e1f
Show file tree
Hide file tree
Showing 16 changed files with 486 additions and 85 deletions.
10 changes: 7 additions & 3 deletions docs/dev/libs/cep.md
Original file line number Diff line number Diff line change
Expand Up @@ -1164,24 +1164,28 @@ pattern.within(Time.seconds(10))

After specifying the pattern sequence you are looking for, it is time to apply it to your input stream to detect
potential matches. In order to run a stream of events against your pattern sequence, you have to create a `PatternStream`.
Given an input stream `input` and a pattern `pattern`, you create the `PatternStream` by calling:
Given an input stream `input`, a pattern `pattern`, and an optional comparator `comparator` that is used to
sort events which have the same timestamp (in case of EventTime) or which arrived at the same moment, you create the
`PatternStream` by calling:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...
EventComparator<Event> comparator = ... // optional

PatternStream<Event> patternStream = CEP.pattern(input, pattern);
PatternStream<Event> patternStream = CEP.pattern(input, pattern, comparator);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val input : DataStream[Event] = ...
val pattern : Pattern[Event, _] = ...
val comparator : EventComparator[Event] = ... // optional

val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern, comparator)
{% endhighlight %}
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.flink.cep.scala

import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.{CEP => JCEP}
import org.apache.flink.cep.{EventComparator, CEP => JCEP}
import org.apache.flink.streaming.api.scala.DataStream

/**
Expand All @@ -36,8 +36,26 @@ object CEP {
* @tparam T Type of the input events
* @return Resulting pattern stream
*/
def pattern[T](input: DataStream[T], pattern: Pattern[T, _]): PatternStream[T] = {
def pattern[T](input: DataStream[T], pattern: Pattern[T, _ <: T]): PatternStream[T] = {
wrapPatternStream(JCEP.pattern(input.javaStream, pattern.wrappedPattern))
}

/**
 * Creates a [[PatternStream]] from a [[DataStream]] in the Scala API, using a custom
 * comparator. See [[org.apache.flink.cep.CEP]] for a more detailed description of how the
 * underlying Java API works.
 *
 * @param input      DataStream containing the input events
 * @param pattern    Pattern specification which shall be detected
 * @param comparator Comparator to sort events with equal timestamps
 * @tparam T Type of the input events
 * @return Resulting pattern stream
 */
def pattern[T](
    input: DataStream[T],
    pattern: Pattern[T, _ <: T],
    comparator: EventComparator[T]): PatternStream[T] = {
  // Delegate to the Java CEP entry point, then wrap the Java result for the Scala API.
  val javaPatternStream = JCEP.pattern(input.javaStream, pattern.wrappedPattern, comparator)
  wrapPatternStream(javaPatternStream)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@
*/
package org.apache.flink.cep.scala

import java.util.{Map => JMap}
import java.util.{List => JList}
import java.util.{List => JList, Map => JMap}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
import org.apache.flink.cep.pattern.{Pattern => JPattern}
import org.apache.flink.streaming.api.scala.{asScalaStream, _}
import org.apache.flink.util.Collector
Expand All @@ -31,6 +30,7 @@ import java.lang.{Long => JLong}

import org.apache.flink.cep.operator.CEPOperatorUtils
import org.apache.flink.cep.scala.pattern.Pattern

import scala.collection.Map

/**
Expand All @@ -48,7 +48,9 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {

/**
 * Returns the pattern of the underlying Java pattern stream, wrapped in the Scala
 * [[Pattern]] type.
 *
 * NOTE(review): the Java pattern is cast to `JPattern[T, T]` with `asInstanceOf`; this
 * cast is unchecked.
 */
def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])

def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream)

/**
 * Returns the comparator of the underlying Java pattern stream.
 *
 * NOTE(review): the Java `PatternStream` stores `null` when it is constructed without a
 * comparator, so this may return `null` -- callers should check before using it.
 */
def getComparator: EventComparator[T] = jPatternStream.getComparator

/**
* Applies a select function to the detected pattern sequence. For each pattern sequence the
Expand Down Expand Up @@ -91,8 +93,9 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
: DataStream[Either[L, R]] = {

val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
jPatternStream.getPattern())
jPatternStream.getInputStream,
jPatternStream.getPattern,
jPatternStream.getComparator)

val cleanedSelect = cleanClosure(patternSelectFunction)
val cleanedTimeout = cleanClosure(patternTimeoutFunction)
Expand Down Expand Up @@ -156,8 +159,9 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
patternFlatSelectFunction: PatternFlatSelectFunction[T, R])
: DataStream[Either[L, R]] = {
val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
jPatternStream.getPattern()
jPatternStream.getInputStream,
jPatternStream.getPattern,
jPatternStream.getComparator
)

val cleanedSelect = cleanClosure(patternFlatSelectFunction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
def testScalaJavaAPISelectFunForwarding {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
val pattern: Pattern[(Int, Int), _] = Pattern.begin[(Int, Int)]("dummy")
val pattern: Pattern[(Int, Int), (Int, Int)] = Pattern.begin[(Int, Int)]("dummy")
val pStream: PatternStream[(Int, Int)] = CEP.pattern(dummyDataStream, pattern)
val param = Map("begin" -> List((1, 2)))
val result: DataStream[(Int, Int)] = pStream
Expand All @@ -62,7 +62,7 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
def testScalaJavaAPIFlatSelectFunForwarding {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dummyDataStream: DataStream[List[Int]] = env.fromElements()
val pattern: Pattern[List[Int], _] = Pattern.begin[List[Int]]("dummy")
val pattern: Pattern[List[Int], List[Int]] = Pattern.begin[List[Int]]("dummy")
val pStream: PatternStream[List[Int]] = CEP.pattern(dummyDataStream, pattern)
val inList = List(1, 2, 3)
val inParam = Map("begin" -> List(inList))
Expand All @@ -88,7 +88,7 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
def testTimeoutHandling: Unit = {
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val dummyDataStream: DataStream[String] = env.fromElements()
val pattern: Pattern[String, _] = Pattern.begin[String]("dummy")
val pattern: Pattern[String, String] = Pattern.begin[String]("dummy")
val pStream: PatternStream[String] = CEP.pattern(dummyDataStream, pattern)
val inParam = Map("begin" -> List("barfoo"))
val outList = new java.util.ArrayList[Either[String, String]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,17 @@ public class CEP {
public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
return new PatternStream<>(input, pattern);
}

/**
 * Creates a {@link PatternStream} from an input data stream, a pattern and a comparator
 * for the input events.
 *
 * @param input DataStream containing the input events
 * @param pattern Pattern specification which shall be detected
 * @param comparator Comparator to sort events with equal timestamps
 * @param <T> Type of the input events
 * @return Resulting pattern stream
 */
public static <T> PatternStream<T> pattern(
		DataStream<T> input,
		Pattern<T, ?> pattern,
		EventComparator<T> comparator) {
	final PatternStream<T> patternStream = new PatternStream<>(input, pattern, comparator);
	return patternStream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cep;

import java.io.Serializable;
import java.util.Comparator;

/**
 * Custom comparator used to compare two events.
 *
 * <p>Combines {@link Comparator} with {@link Serializable}.
 *
 * @param <T> Type of the event
 */
public interface EventComparator<T> extends Comparator<T>, Serializable {
// NOTE(review): a field declared in an interface is implicitly public static final.
// Java serialization presumably reads serialVersionUID only from the class that
// declares it, so implementing classes may still need their own -- confirm.
long serialVersionUID = 1L;
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,19 @@ public class PatternStream<T> {

private final Pattern<T, ?> pattern;

// comparator to sort events
private final EventComparator<T> comparator;

/**
 * Creates a pattern stream without a comparator; the comparator is set to {@code null}.
 */
PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
	this(inputStream, pattern, null);
}

/**
 * Creates a pattern stream for the given input stream and pattern, with a comparator
 * to sort events.
 */
PatternStream(
		final DataStream<T> inputStream,
		final Pattern<T, ?> pattern,
		final EventComparator<T> comparator) {
	this.pattern = pattern;
	this.comparator = comparator;
	this.inputStream = inputStream;
}

public Pattern<T, ?> getPattern() {
Expand All @@ -65,6 +75,10 @@ public DataStream<T> getInputStream() {
return inputStream;
}

/**
 * Returns the comparator used to sort events.
 *
 * @return the comparator, or {@code null} if this stream was created by the
 *         constructor that takes no comparator
 */
public EventComparator<T> getComparator() {
return comparator;
}

/**
* Applies a select function to the detected pattern sequence. For each pattern sequence the
* provided {@link PatternSelectFunction} is called. The pattern select function can produce
Expand Down Expand Up @@ -108,7 +122,7 @@ public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R
*/
public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
CEPOperatorUtils.createPatternStream(inputStream, pattern);
CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator);

return patternStream.map(
new PatternSelectMapper<>(
Expand Down Expand Up @@ -139,7 +153,7 @@ public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
final PatternSelectFunction<T, R> patternSelectFunction) {

SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream =
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, comparator);

TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternTimeoutFunction,
Expand Down Expand Up @@ -215,7 +229,7 @@ public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunct
*/
public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
CEPOperatorUtils.createPatternStream(inputStream, pattern);
CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator);

return patternStream.flatMap(
new PatternFlatSelectMapper<>(
Expand Down Expand Up @@ -247,7 +261,7 @@ public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {

SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream =
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, comparator);

TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatTimeoutFunction,
Expand Down
Loading

0 comments on commit e2d3e1f

Please sign in to comment.