Skip to content

Commit

Permalink
[FLINK-4282] Add Offset Parameter to WindowAssigners
Browse files Browse the repository at this point in the history
  • Loading branch information
Renkai authored and aljoscha committed Aug 19, 2016
1 parent 3be9a28 commit 0977462
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow>

private final long slide;

protected SlidingEventTimeWindows(long size, long slide) {
private final long offset;

protected SlidingEventTimeWindows(long size, long slide, long offset) {
this.size = size;
this.slide = slide;
this.offset = offset;
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
Expand Down Expand Up @@ -102,7 +105,32 @@ public String toString() {
* @return The time policy.
*/
public static SlidingEventTimeWindows of(Time size, Time slide) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}

/**
* Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp and offset.
*<p>
* For example, if you want window a stream by hour,but window begins at the 15th minutes
* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
* time windows start at 0:15:00,1:15:00,2:15:00,etc.
*</p>
*
* <p>
* Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
* </p>
* @param size The size of the generated windows.
* @param slide The slide interval of the generated windows.
* @param offset The offset which window start would be shifted by.
* @return The time policy.
*/
public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
offset.toMilliseconds() % slide.toMilliseconds());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,21 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin

private final long size;

private final long offset;

private final long slide;

private SlidingProcessingTimeWindows(long size, long slide) {
private SlidingProcessingTimeWindows(long size, long slide, long offset){
this.size = size;
this.slide = slide;
this.offset = offset;
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
timestamp = context.getCurrentProcessingTime();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
long lastStart = timestamp - timestamp % slide;
long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
Expand Down Expand Up @@ -94,7 +97,32 @@ public String toString() {
* @return The time policy.
*/
public static SlidingProcessingTimeWindows of(Time size, Time slide) {
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}

/**
* Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp and offset.
*<p>
* For example, if you want window a stream by hour,but window begins at the 15th minutes
* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
* time windows start at 0:15:00,1:15:00,2:15:00,etc.
*</p>
*
* <p>
* Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
* </p>
* @param size The size of the generated windows.
* @param slide The slide interval of the generated windows.
* @param offset The offset which window start would be shifted by.
* @return The time policy.
*/
public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
offset.toMilliseconds() % slide.toMilliseconds());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class SlidingTimeWindows extends SlidingEventTimeWindows {
private static final long serialVersionUID = 1L;

private SlidingTimeWindows(long size, long slide) {
super(size, slide);
super(size, slide, 0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,19 @@
public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

private long size;
private final long size;

protected TumblingEventTimeWindows(long size) {
private final long offset;

protected TumblingEventTimeWindows(long size, long offset){
this.size = size;
this.offset = offset;
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
if (timestamp > Long.MIN_VALUE) {
// Long.MIN_VALUE is currently assigned when no timestamp is present
long start = timestamp - (timestamp % size);
long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
} else {
throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
Expand Down Expand Up @@ -88,7 +90,30 @@ public String toString() {
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size) {
return new TumblingEventTimeWindows(size.toMilliseconds());
return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
}

/**
* Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp and offset.
*<p>
* For example, if you want window a stream by hour,but window begins at the 15th minutes
* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
* time windows start at 0:15:00,1:15:00,2:15:00,etc.
*</p>
*
* <p>
* Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
* </p>
* @param size The size of the generated windows.
* @param offset The offset which window start would be shifted by.
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size, Time offset) {
return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,20 @@
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;

private long size;
private final long size;

private TumblingProcessingTimeWindows(long size) {
private final long offset;


private TumblingProcessingTimeWindows(long size,long offset) {
this.size = size;
this.offset = offset;
}

@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
final long now = context.getCurrentProcessingTime();
long start = now - (now % size);
long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
return Collections.singletonList(new TimeWindow(start, start + size));
}

Expand All @@ -79,9 +83,31 @@ public String toString() {
* @return The time policy.
*/
public static TumblingProcessingTimeWindows of(Time size) {
return new TumblingProcessingTimeWindows(size.toMilliseconds());
return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
}

/**
* Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp and offset.
*<p>
* For example, if you want window a stream by hour,but window begins at the 15th minutes
* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
* time windows start at 0:15:00,1:15:00,2:15:00,etc.
*</p>
*
* <p>
* Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
* </p>
* @param size The size of the generated windows.
* @param offset The offset which window start would be shifted by.
* @return The time policy.
*/
public static TumblingProcessingTimeWindows of(Time size, Time offset) {
return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds() % size.toMilliseconds());
}
@Override
public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
return new TimeWindow.Serializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class TumblingTimeWindows extends TumblingEventTimeWindows {
private static final long serialVersionUID = 1L;

private TumblingTimeWindows(long size) {
super(size);
super(size, 0);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,16 @@ public int compare(TimeWindow o1, TimeWindow o2) {
}
}
}

/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.operators.windowing;

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class TimeWindowTest {
@Test
public void testGetWindowStartWithOffset() {
//[0,7),[7,14),[14,21)...
long offset = 0;
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),0);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(6,offset,7),0);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),7);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(8,offset,7),7);

//[-4,3),[3,10),[10,17)...
offset = 3;
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-4);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(2,offset,7),-4);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),3);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(9,offset,7),3);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(10,offset,7),10);

//[-2,5),[5,12),[12,19)...
offset = -2;
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1,offset,7),-2);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(-2,offset,7),-2);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(3,offset,7),-2);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(4,offset,7),-2);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(7,offset,7),5);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(12,offset,7),12);

// for GMT+8:00
offset = - TimeUnit.HOURS.toMillis(8);
long size = TimeUnit.DAYS.toMillis(1);
Assert.assertEquals(TimeWindow.getWindowStartWithOffset(1470902048450l,offset,size),1470844800000l);
}
}
Loading

0 comments on commit 0977462

Please sign in to comment.