Skip to content

Commit

Permalink
[scala] [streaming] added group windowing example with different poli…
Browse files Browse the repository at this point in the history
…cy types

[scala] [streaming] updated example with scala delta helper usage
  • Loading branch information
senorcarbone committed Jan 3, 2015
1 parent d4ec009 commit a761cdc
Show file tree
Hide file tree
Showing 2 changed files with 214 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.windowing;


import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.deltafunction.DeltaFunction;
import org.apache.flink.streaming.api.windowing.helper.Delta;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* An example of grouped stream windowing where different eviction and trigger policies can be used.
* A source fetches events from cars every 1 sec containing their id, their current speed (kmh),
* overall elapsed distance (m) and a timestamp. The streaming
* example triggers the top speed of each car every x meters elapsed for the last y seconds.
*/
public class TopSpeedWindowingExample {

public static void main(String[] args) throws Exception {

if (!parseParameters(args)) {
return;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

@SuppressWarnings({"unchecked", "rawtypes"})
DataStream topSpeeds = env
.addSource(CarSource.create(numOfCars))
.groupBy(0)
.window(Time.of(evictionSec, TimeUnit.SECONDS))
.every(Delta.of(
new DeltaFunction<Tuple4<Integer, Integer, Double, Long>>() {
@Override
public double getDelta(Tuple4<Integer, Integer, Double, Long> oldDataPoint, Tuple4<Integer, Integer, Double, Long> newDataPoint) {
return newDataPoint.f2 - oldDataPoint.f2;
}
}
, new Tuple4<Integer, Integer, Double, Long>(0, 0, 0d, 0l), triggerMeters))
.maxBy(1);

topSpeeds.print();
env.execute("CarTopSpeedWindowingExample");
}

private static class CarSource implements SourceFunction<Tuple4<Integer, Integer, Double, Long>> {
private Integer[] speeds;
private Double[] distances;

private Random rand = new Random();

private CarSource(int numOfCars) {
speeds = new Integer[numOfCars];
distances = new Double[numOfCars];
Arrays.fill(speeds, 50);
Arrays.fill(distances, 0d);
}

public static CarSource create(int cars) {
return new CarSource(cars);
}

@Override
public void invoke(Collector<Tuple4<Integer, Integer, Double, Long>> collector) throws Exception {

while (true) {
Thread.sleep(1000);
for (int carId = 0; carId < speeds.length; carId++) {
if (rand.nextBoolean())
speeds[carId] = Math.min(100, speeds[carId] + 5);
else
speeds[carId] = Math.max(0, speeds[carId] - 5);
distances[carId] += speeds[carId] / 3.6d;
collector.collect(new Tuple4<Integer, Integer, Double, Long>(carId, speeds[carId], distances[carId], System.currentTimeMillis()));
}
}
}
}

private static int numOfCars = 2;
private static int evictionSec = 10;
private static double triggerMeters = 50;

private static boolean parseParameters(String[] args) {

if (args.length > 0) {
if (args.length == 3) {
numOfCars = Integer.valueOf(args[0]);
evictionSec = Integer.valueOf(args[1]);
triggerMeters = Double.valueOf(args[2]);
} else {
System.err.println("Usage: TopSpeedWindowingExample <numCars> <evictSec> <triggerMeters>");
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.examples.scala.streaming.windowing


import java.util.concurrent.TimeUnit._
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment
import org.apache.flink.api.scala.streaming.windowing.Delta
import org.apache.flink.streaming.api.windowing.helper.Time
import org.apache.flink.util.Collector

import scala.math.{max, min}
import scala.util.Random

/**
* An example of grouped stream windowing where different eviction and trigger policies can be used.
* A source fetches events from cars every 1 sec containing their id, their current speed (kmh),
* overall elapsed distance (m) and a timestamp. The streaming
* example triggers the top speed of each car every x meters elapsed for the last y seconds.
*/
object TopSpeedWindowing {

case class CarSpeed(carId: Int, speed: Int, distance: Double, time: Long)

def main(args: Array[String]) {
if (!parseParameters(args)) {
return
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val cars = env.addSource(carSource _).groupBy("carId")
.window(Time.of(evictionSec, SECONDS))
.every(Delta.of[CarSpeed](triggerMeters, (oldSp,newSp) => newSp.distance-oldSp.distance, CarSpeed(0,0,0,0)))
.reduce((x, y) => if (x.speed > y.speed) x else y)

cars print

env.execute("TopSpeedWindowing")

}

def carSource(out: Collector[CarSpeed]) = {

val speeds = new Array[Int](numOfCars)
val distances = new Array[Double](numOfCars)

while (true) {
Thread sleep 1000
for (i <- 0 until speeds.length) {
speeds(i) = if (Random.nextBoolean) min(100, speeds(i) + 5) else max(0, speeds(i) - 5)
distances(i) += speeds(i) / 3.6d
out.collect(new CarSpeed(i, speeds(i), distances(i), System.currentTimeMillis))
}
}
}

def parseParameters(args: Array[String]): Boolean = {
if (args.length > 0) {
if (args.length == 3) {
numOfCars = args(0).toInt
evictionSec = args(1).toInt
triggerMeters = args(2).toDouble
}
else {
System.err.println("Usage: TopSpeedWindowing <numCars> <evictSec> <triggerMeters>")
false
}
}
true
}

var numOfCars = 2
var evictionSec = 10
var triggerMeters = 50d

}

0 comments on commit a761cdc

Please sign in to comment.