implements TriggerHandler {
-
- /**
- * Expire old events every EXPIRE_EVENTS_THRESHOLD to
- * keep the window size in check.
- *
- * Note that if the eviction policy is based on watermarks, events will not be evicted until a new
- * watermark would cause them to be considered expired anyway, regardless of this limit
- */
- protected static final int EXPIRE_EVENTS_THRESHOLD = 100;
-
- protected final Collection> queue;
- protected EvictionPolicy evictionPolicy;
- protected TriggerPolicy triggerPolicy;
- protected final WindowLifecycleListener> windowLifecycleListener;
- private final List> expiredEvents;
- private final Set> prevWindowEvents;
- private final AtomicInteger eventsSinceLastExpiry;
- private final ReentrantLock lock;
-
- /**
- * Constructs a {@link WindowManager}
- *
- * @param lifecycleListener the {@link WindowLifecycleListener}
- * @param queue a collection where the events in the window can be enqueued.
- *
- * Note: This collection has to be thread safe.
- */
- public WindowManager(WindowLifecycleListener> lifecycleListener, Collection> queue) {
- windowLifecycleListener = lifecycleListener;
- this.queue = queue;
- expiredEvents = new ArrayList<>();
- prevWindowEvents = new HashSet<>();
- eventsSinceLastExpiry = new AtomicInteger();
- lock = new ReentrantLock(true);
- }
-
- public void setEvictionPolicy(EvictionPolicy evictionPolicy) {
- this.evictionPolicy = evictionPolicy;
- }
-
- public void setTriggerPolicy(TriggerPolicy triggerPolicy) {
- this.triggerPolicy = triggerPolicy;
- }
-
- /**
- * Add an event into the window, with the given ts as the tracking ts.
- *
- * @param event the event to track
- * @param ts the timestamp
- */
- public void add(T event, long ts, byte[] messageId, String topic) {
- add(new EventImpl<>(event, ts, messageId, topic));
- }
-
- /**
- * Tracks a window event
- *
- * @param windowEvent the window event to track
- */
- public void add(Event windowEvent) {
- // watermark events are not added to the queue.
- if (windowEvent.isWatermark()) {
- log.debug(String.format("Got watermark event with ts %d", windowEvent.getTimestamp()));
- } else {
- queue.add(windowEvent);
- }
- track(windowEvent);
- compactWindow();
- }
-
- /**
- * The callback invoked by the trigger policy.
- */
- @Override
- public boolean onTrigger() {
- List> windowEvents = null;
- List> expired = null;
-
- try {
- lock.lock();
- /*
- * scan the entire window to handle out of order events in
- * the case of time based windows.
- */
- windowEvents = scanEvents(true);
- expired = new ArrayList<>(expiredEvents);
- expiredEvents.clear();
- } finally {
- lock.unlock();
- }
-
- List> events = new ArrayList<>();
- List> newEvents = new ArrayList<>();
- for (Event event : windowEvents) {
- events.add(event);
- if (!prevWindowEvents.contains(event)) {
- newEvents.add(event);
- }
- }
- prevWindowEvents.clear();
- if (!events.isEmpty()) {
- prevWindowEvents.addAll(windowEvents);
- log.debug(String.format("invoking windowLifecycleListener onActivation, [%d] events in "
- + "window.", events.size()));
- windowLifecycleListener.onActivation(events, newEvents, expired,
- evictionPolicy.getContext().getReferenceTime());
- } else {
- log.debug("No events in the window, skipping onActivation");
- }
- triggerPolicy.reset();
- return !events.isEmpty();
- }
-
- public void shutdown() {
- log.debug("Shutting down WindowManager");
- if (triggerPolicy != null) {
- triggerPolicy.shutdown();
- }
- }
-
- /**
- * expires events that fall out of the window every
- * EXPIRE_EVENTS_THRESHOLD so that the window does not grow
- * too big.
- */
- protected void compactWindow() {
- if (eventsSinceLastExpiry.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
- scanEvents(false);
- }
- }
-
- /**
- * feed the event to the eviction and trigger policies
- * for bookkeeping and optionally firing the trigger.
- */
- private void track(Event windowEvent) {
- evictionPolicy.track(windowEvent);
- triggerPolicy.track(windowEvent);
- }
-
- /**
- * Scan events in the queue, using the expiration policy to check
- * if the event should be evicted or not.
- *
- * @param fullScan if set, will scan the entire queue; if not set, will stop
- * as soon as an event not satisfying the expiration policy is found
- * @return the list of events to be processed as a part of the current window
- */
- private List> scanEvents(boolean fullScan) {
- log.debug(String.format("Scan events, eviction policy %s", evictionPolicy));
- List> eventsToExpire = new ArrayList<>();
- List> eventsToProcess = new ArrayList<>();
-
- try {
- lock.lock();
- Iterator> it = queue.iterator();
- while (it.hasNext()) {
- Event windowEvent = it.next();
- EvictionPolicy.Action action = evictionPolicy.evict(windowEvent);
- if (action == EXPIRE) {
- eventsToExpire.add(windowEvent);
- it.remove();
- } else if (!fullScan || action == STOP) {
- break;
- } else if (action == PROCESS) {
- eventsToProcess.add(windowEvent);
- }
- }
- expiredEvents.addAll(eventsToExpire);
- } finally {
- lock.unlock();
- }
- eventsSinceLastExpiry.set(0);
- log.debug(String.format("[%d] events expired from window.", eventsToExpire.size()));
- if (!eventsToExpire.isEmpty()) {
- log.debug("invoking windowLifecycleListener.onExpiry");
- windowLifecycleListener.onExpiry(eventsToExpire);
- }
- return eventsToProcess;
- }
-
- /**
- * Scans the event queue and returns the next earliest event ts
- * between the startTs and endTs
- *
- * @param startTs the start ts (exclusive)
- * @param endTs the end ts (inclusive)
- * @return the earliest event ts between startTs and endTs
- */
- public long getEarliestEventTs(long startTs, long endTs) {
- long minTs = Long.MAX_VALUE;
- for (Event event : queue) {
- if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
- minTs = Math.min(minTs, event.getTimestamp());
- }
- }
- return minTs;
- }
-
- /**
- * Scans the event queue and returns number of events having
- * timestamp less than or equal to the reference time.
- *
- * @param referenceTime the reference timestamp in millis
- * @return the count of events with timestamp less than or equal to referenceTime
- */
- public int getEventCount(long referenceTime) {
- int count = 0;
- for (Event event : queue) {
- if (event.getTimestamp() <= referenceTime) {
- ++count;
- }
- }
- return count;
- }
-
- /**
- * Scans the event queue and returns the list of event ts
- * falling between startTs (exclusive) and endTs (inclusive)
- * at each sliding interval counts.
- *
- * @param startTs the start timestamp (exclusive)
- * @param endTs the end timestamp (inclusive)
- * @param slidingCount the sliding interval count
- * @return the list of event ts
- */
- public List getSlidingCountTimestamps(long startTs, long endTs, int slidingCount) {
- List timestamps = new ArrayList<>();
- if (endTs > startTs) {
- int count = 0;
- long ts = Long.MIN_VALUE;
- for (Event event : queue) {
- if (event.getTimestamp() > startTs && event.getTimestamp() <= endTs) {
- ts = Math.max(ts, event.getTimestamp());
- if (++count % slidingCount == 0) {
- timestamps.add(ts);
- }
- }
- }
- }
- return timestamps;
- }
-
- @Override
- public String toString() {
- return "WindowManager{" + "evictionPolicy=" + evictionPolicy + ", triggerPolicy="
- + triggerPolicy + '}';
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/WindowUtils.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/WindowUtils.java
deleted file mode 100644
index 2f82850c01257..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/WindowUtils.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing;
-
-public class WindowUtils {
- public static String getFullyQualifiedName(String tenant, String namespace, String name) {
- return String.format("%s/%s/%s", tenant, namespace, name);
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/WindowedPulsarFunction.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/WindowedPulsarFunction.java
deleted file mode 100644
index b7fcf0094e684..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/WindowedPulsarFunction.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing;
-
-import net.jodah.typetools.TypeResolver;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.api.PulsarFunction;
-import org.apache.pulsar.functions.composition.windowing.evictors.CountEvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.evictors.TimeEvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.evictors.WatermarkCountEvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.evictors.WatermarkTimeEvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.triggers.CountTriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.triggers.TimeTriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.triggers.WatermarkCountTriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.triggers.WatermarkTimeTriggerPolicy;
-
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Collectors;
-
-@Slf4j
-public abstract class WindowedPulsarFunction implements PulsarFunction {
-
- private boolean initialized;
- protected WindowConfig windowConfig;
- private WindowManager windowManager;
- private TimestampExtractor timestampExtractor;
- protected transient WaterMarkEventGenerator waterMarkEventGenerator;
-
- protected static final long DEFAULT_MAX_LAG_MS = 0; // no lag
- protected static final long DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s
-
- public void initialize(Context context) {
- this.windowConfig = this.getWindowConfigs(context);
- log.info("Window Config: {}", this.windowConfig);
- this.windowManager = this.getWindowManager(this.windowConfig, context);
- this.initialized = true;
- this.start();
- }
-
- private WindowConfig getWindowConfigs(Context context) {
- WindowConfig windowConfig = new WindowConfig();
- if (context.getUserConfigValue("windowLengthCount") != null) {
- windowConfig.setWindowLengthCount(Integer.parseInt(context.getUserConfigValue("windowLengthCount")));
- }
- if (context.getUserConfigValue("windowLengthDurationMs") != null) {
- windowConfig.setWindowLengthDurationMs(Long.parseLong(
- context.getUserConfigValue("windowLengthDurationMs")));
- }
- if (context.getUserConfigValue("slidingIntervalCount") != null) {
- windowConfig.setSlidingIntervalCount(Integer.parseInt(context.getUserConfigValue("slidingIntervalCount")));
- }
- if (context.getUserConfigValue("slidingIntervalDurationMs") != null) {
- windowConfig.setSlidingIntervalDurationMs(Long.parseLong(context.getUserConfigValue
- ("slidingIntervalDurationMs")));
- }
- if (context.getUserConfigValue("lateDataTopic") != null) {
- windowConfig.setLateDataTopic(context.getUserConfigValue("lateDataTopic"));
- }
- if (context.getUserConfigValue("maxLagMs") != null) {
- windowConfig.setMaxLagMs(Long.parseLong(context.getUserConfigValue("maxLagMs")));
- }
- if (context.getUserConfigValue("watermarkEmitIntervalMs") != null) {
- windowConfig.setWatermarkEmitIntervalMs(Long.parseLong(
- context.getUserConfigValue("watermarkEmitIntervalMs")));
- }
- if (context.getUserConfigValue("timestampExtractorClassName") != null) {
- windowConfig.setTimestampExtractorClassName(context.getUserConfigValue("timestampExtractorClassName"));
- }
-
- validateAndSetDefaultsWindowConfig(windowConfig);
- return windowConfig;
- }
-
- private static void validateAndSetDefaultsWindowConfig(WindowConfig windowConfig) {
- if (windowConfig.getWindowLengthDurationMs() == null && windowConfig.getWindowLengthCount() == null) {
- throw new IllegalArgumentException("Window length is not specified");
- }
-
- if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getWindowLengthCount() != null) {
- throw new IllegalArgumentException(
- "Window length for time and count are set! Please set one or the other.");
- }
-
- if (windowConfig.getWindowLengthCount() != null) {
- if (windowConfig.getWindowLengthCount() <= 0) {
- throw new IllegalArgumentException(
- "Window length must be positive [" + windowConfig.getWindowLengthCount() + "]");
- }
- }
-
- if (windowConfig.getWindowLengthDurationMs() != null) {
- if (windowConfig.getWindowLengthDurationMs() <= 0) {
- throw new IllegalArgumentException(
- "Window length must be positive [" + windowConfig.getWindowLengthDurationMs() + "]");
- }
- }
-
- if (windowConfig.getSlidingIntervalCount() != null) {
- if (windowConfig.getSlidingIntervalCount() <= 0) {
- throw new IllegalArgumentException(
- "Sliding interval must be positive [" + windowConfig.getSlidingIntervalCount() + "]");
-
- }
- }
-
- if (windowConfig.getSlidingIntervalDurationMs() != null) {
- if (windowConfig.getSlidingIntervalDurationMs() <= 0) {
- throw new IllegalArgumentException(
- "Sliding interval must be positive [" + windowConfig.getSlidingIntervalDurationMs() + "]");
-
- }
- }
-
- if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getSlidingIntervalDurationMs() == null) {
- windowConfig.setSlidingIntervalDurationMs(windowConfig.getWindowLengthDurationMs());
- }
-
- if (windowConfig.getWindowLengthCount() != null && windowConfig.getSlidingIntervalCount() == null) {
- windowConfig.setSlidingIntervalCount(windowConfig.getWindowLengthCount());
- }
-
- if (windowConfig.getTimestampExtractorClassName() != null) {
- if (windowConfig.getMaxLagMs() != null) {
- if (windowConfig.getMaxLagMs() <= 0) {
- throw new IllegalArgumentException(
- "Lag duration must be positive [" + windowConfig.getMaxLagMs() + "]");
- }
- } else {
- windowConfig.setMaxLagMs(DEFAULT_MAX_LAG_MS);
- }
- if (windowConfig.getWatermarkEmitIntervalMs() != null) {
- if (windowConfig.getWatermarkEmitIntervalMs() <= 0) {
- throw new IllegalArgumentException(
- "Watermark interval must be positive [" + windowConfig.getWatermarkEmitIntervalMs() + "]");
- }
- } else {
- windowConfig.setWatermarkEmitIntervalMs(DEFAULT_WATERMARK_EVENT_INTERVAL_MS);
- }
- }
- }
-
- private WindowManager getWindowManager(WindowConfig windowConfig, Context context) {
-
- WindowLifecycleListener> lifecycleListener = newWindowLifecycleListener(context);
- WindowManager manager = new WindowManager<>(lifecycleListener, new ConcurrentLinkedQueue<>());
-
- if (this.windowConfig.getTimestampExtractorClassName() != null) {
- this.timestampExtractor = getTimeStampExtractor(windowConfig);
-
- waterMarkEventGenerator = new WaterMarkEventGenerator<>(manager, this.windowConfig
- .getWatermarkEmitIntervalMs(),
- this.windowConfig.getMaxLagMs(), new HashSet<>(context.getSourceTopics()), context);
- } else {
- if (this.windowConfig.getLateDataTopic() != null) {
- throw new IllegalArgumentException(
- "Late data topic can be defined only when specifying a timestamp extractor class");
- }
- }
-
- EvictionPolicy evictionPolicy = getEvictionPolicy(windowConfig);
- TriggerPolicy triggerPolicy = getTriggerPolicy(windowConfig, manager,
- evictionPolicy, context);
- manager.setEvictionPolicy(evictionPolicy);
- manager.setTriggerPolicy(triggerPolicy);
-
- return manager;
- }
-
- private TimestampExtractor getTimeStampExtractor(WindowConfig windowConfig) {
-
- Class> theCls;
- try {
- theCls = Class.forName(windowConfig.getTimestampExtractorClassName(),
- true, Thread.currentThread().getContextClassLoader());
- } catch (ClassNotFoundException cnfe) {
- throw new RuntimeException(
- String.format("Timestamp extractor class %s must be in class path",
- windowConfig.getTimestampExtractorClassName()), cnfe);
- }
-
- Object result;
- try {
- Constructor> constructor = theCls.getDeclaredConstructor();
- constructor.setAccessible(true);
- result = constructor.newInstance();
- } catch (InstantiationException ie) {
- throw new RuntimeException("User class must be concrete", ie);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException("User class doesn't have such method", e);
- } catch (IllegalAccessException e) {
- throw new RuntimeException("User class must have a no-arg constructor", e);
- } catch (InvocationTargetException e) {
- throw new RuntimeException("User class constructor throws exception", e);
- }
- Class>[] timestampExtractorTypeArgs = TypeResolver.resolveRawArguments(
- TimestampExtractor.class, result.getClass());
- Class>[] typeArgs = TypeResolver.resolveRawArguments(PulsarFunction.class, this.getClass());
- if (!typeArgs[0].equals(timestampExtractorTypeArgs[0])) {
- throw new RuntimeException(
- "Inconsistent types found between function input type and timestamp extractor type: "
- + " function type = " + typeArgs[0] + ", timestamp extractor type = "
- + timestampExtractorTypeArgs[0]);
- }
- return (TimestampExtractor) result;
- }
-
- private TriggerPolicy getTriggerPolicy(WindowConfig windowConfig, WindowManager manager,
- EvictionPolicy evictionPolicy, Context context) {
- if (windowConfig.getSlidingIntervalCount() != null) {
- if (this.isEventTime()) {
- return new WatermarkCountTriggerPolicy<>(
- windowConfig.getSlidingIntervalCount(), manager, evictionPolicy, manager);
- } else {
- return new CountTriggerPolicy<>(windowConfig.getSlidingIntervalCount(), manager, evictionPolicy);
- }
- } else {
- if (this.isEventTime()) {
- return new WatermarkTimeTriggerPolicy<>(windowConfig.getSlidingIntervalDurationMs(), manager,
- evictionPolicy, manager);
- }
- return new TimeTriggerPolicy<>(windowConfig.getSlidingIntervalDurationMs(), manager,
- evictionPolicy, context);
- }
- }
-
- private EvictionPolicy getEvictionPolicy(WindowConfig windowConfig) {
- if (windowConfig.getWindowLengthCount() != null) {
- if (this.isEventTime()) {
- return new WatermarkCountEvictionPolicy<>(windowConfig.getWindowLengthCount());
- } else {
- return new CountEvictionPolicy<>(windowConfig.getWindowLengthCount());
- }
- } else {
- if (this.isEventTime()) {
- return new WatermarkTimeEvictionPolicy<>(
- windowConfig.getWindowLengthDurationMs(), windowConfig.getMaxLagMs());
- } else {
- return new TimeEvictionPolicy<>(windowConfig.getWindowLengthDurationMs());
- }
- }
- }
-
- protected WindowLifecycleListener> newWindowLifecycleListener(Context context) {
- return new WindowLifecycleListener>() {
- @Override
- public void onExpiry(List> events) {
- for (Event event : events) {
- context.ack(event.getMessageId(), event.getTopic());
- }
- }
-
- @Override
- public void onActivation(List> tuples, List> newTuples, List>
- expiredTuples, Long referenceTime) {
- processWindow(
- context,
- tuples.stream().map(event -> event.get()).collect(Collectors.toList()),
- newTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
- expiredTuples.stream().map(event -> event.get()).collect(Collectors.toList()),
- referenceTime);
- }
- };
- }
-
- private void processWindow(Context context, List tuples, List newTuples, List
- expiredTuples, Long referenceTime) {
-
- O output = null;
- try {
- output = this.handleRequest(
- new WindowImpl<>(tuples, newTuples, expiredTuples, getWindowStartTs(referenceTime), referenceTime),
- new WindowContextImpl(context));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- if (output != null) {
- context.publish(context.getSinkTopic(), output, context.getOutputSerdeClassName());
- }
- }
-
- private Long getWindowStartTs(Long endTs) {
- Long res = null;
- if (endTs != null && this.windowConfig.getWindowLengthDurationMs() != null) {
- res = endTs - this.windowConfig.getWindowLengthDurationMs();
- }
- return res;
- }
-
- private void start() {
- if (this.waterMarkEventGenerator != null) {
- log.debug("Starting waterMarkEventGenerator");
- this.waterMarkEventGenerator.start();
- }
-
- log.debug("Starting trigger policy");
- this.windowManager.triggerPolicy.start();
- }
-
- public void shutdown() {
- if (this.waterMarkEventGenerator != null) {
- this.waterMarkEventGenerator.shutdown();
- }
- if (this.windowManager != null) {
- this.windowManager.shutdown();
- }
- }
-
- private boolean isEventTime() {
- return this.timestampExtractor != null;
- }
-
- @Override
- public O process(I input, Context context) throws Exception {
- if (!this.initialized) {
- initialize(context);
- }
- if (isEventTime()) {
- long ts = this.timestampExtractor.extractTimestamp(input);
- if (this.waterMarkEventGenerator.track(context.getTopicName(), ts)) {
- this.windowManager.add(input, ts, context.getMessageId(), context.getTopicName());
- } else {
- if (this.windowConfig.getLateDataTopic() != null) {
- context.publish(this.windowConfig.getLateDataTopic(), input, context.getOutputSerdeClassName());
- } else {
- log.info(String.format(
- "Received a late tuple %s with ts %d. This will not be " + "processed"
- + ".", input, ts));
- }
- context.ack(context.getMessageId(), context.getTopicName());
- }
- } else {
- this.windowManager.add(input, System.currentTimeMillis(), context.getMessageId(), context.getTopicName());
- }
- return null;
- }
-
- public abstract O handleRequest(Window inputWindow, WindowContext context) throws Exception;
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/CountEvictionPolicy.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/CountEvictionPolicy.java
deleted file mode 100644
index d1894ad9428c3..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/CountEvictionPolicy.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing.evictors;
-
-import org.apache.pulsar.functions.composition.windowing.Event;
-import org.apache.pulsar.functions.composition.windowing.EvictionContext;
-import org.apache.pulsar.functions.composition.windowing.EvictionPolicy;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * An eviction policy that tracks event counts and can
- * evict based on a threshold count.
- *
- * @param the type of event tracked by this policy.
- */
-public class CountEvictionPolicy implements EvictionPolicy {
- protected final int threshold;
- protected final AtomicLong currentCount;
- private EvictionContext context;
-
- public CountEvictionPolicy(int count) {
- this.threshold = count;
- this.currentCount = new AtomicLong();
- }
-
- @Override
- public Action evict(Event event) {
- /*
- * atomically decrement the count if its greater than threshold and
- * return if the event should be evicted
- */
- while (true) {
- long curVal = currentCount.get();
- if (curVal > threshold) {
- if (currentCount.compareAndSet(curVal, curVal - 1)) {
- return Action.EXPIRE;
- }
- } else {
- break;
- }
- }
- return Action.PROCESS;
- }
-
- @Override
- public void track(Event event) {
- if (!event.isWatermark()) {
- currentCount.incrementAndGet();
- }
- }
-
- @Override
- public void setContext(EvictionContext context) {
- this.context = context;
- }
-
- @Override
- public EvictionContext getContext() {
- return context;
- }
-
- @Override
- public String toString() {
- return "CountEvictionPolicy{" + "threshold=" + threshold + ", currentCount=" + currentCount
- + '}';
- }
-
- @Override
- public void reset() {
- // NOOP
- }
-
- @Override
- public Long getState() {
- return currentCount.get();
- }
-
- @Override
- public void restoreState(Long state) {
- currentCount.set(state);
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/TimeEvictionPolicy.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/TimeEvictionPolicy.java
deleted file mode 100644
index 85b227ed2e912..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/TimeEvictionPolicy.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing.evictors;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.composition.windowing.Event;
-import org.apache.pulsar.functions.composition.windowing.EvictionContext;
-import org.apache.pulsar.functions.composition.windowing.EvictionPolicy;
-
-/**
- * Eviction policy that evicts events based on time duration.
- */
-@Slf4j
-public class TimeEvictionPolicy implements EvictionPolicy {
-
- private final long windowLength;
- protected volatile EvictionContext evictionContext;
- private long delta;
-
- /**
- * Constructs a TimeEvictionPolicy that evicts events older
- * than the given window length in millis
- *
- * @param windowLength the duration in milliseconds
- */
- public TimeEvictionPolicy(long windowLength) {
- this.windowLength = windowLength;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public EvictionPolicy.Action evict(Event event) {
- long now =
- evictionContext == null ? System.currentTimeMillis() : evictionContext.getReferenceTime();
- long diff = now - event.getTimestamp();
- if (diff >= (windowLength + delta)) {
- return EvictionPolicy.Action.EXPIRE;
- } else if (diff < 0) { // do not process events beyond current ts
- return Action.KEEP;
- }
- return Action.PROCESS;
- }
-
- @Override
- public void track(Event event) {
- // NOOP
- }
-
- @Override
- public void setContext(EvictionContext context) {
- EvictionContext prevContext = evictionContext;
- evictionContext = context;
- // compute window length adjustment (delta) to account for time drift
- if (context.getSlidingInterval() != null) {
- if (prevContext == null) {
- delta = Integer.MAX_VALUE; // consider all events for the initial window
- } else {
- delta = context.getReferenceTime() - prevContext.getReferenceTime()
- - context.getSlidingInterval();
- if (Math.abs(delta) > 100) {
- log.warn(String.format("Possible clock drift or long running computation in window; "
- + "Previous eviction time: %s, current eviction time: %s", prevContext
- .getReferenceTime(), context.getReferenceTime()));
- }
- }
- }
- }
-
- @Override
- public EvictionContext getContext() {
- return evictionContext;
- }
-
- @Override
- public void reset() {
- // NOOP
- }
-
- @Override
- public EvictionContext getState() {
- return evictionContext;
- }
-
- @Override
- public void restoreState(EvictionContext state) {
- this.evictionContext = state;
- }
-
- @Override
- public String toString() {
- return "TimeEvictionPolicy{" + "windowLength=" + windowLength + ", evictionContext="
- + evictionContext + '}';
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/WatermarkCountEvictionPolicy.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/WatermarkCountEvictionPolicy.java
deleted file mode 100644
index 7dcc097e4b08d..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/WatermarkCountEvictionPolicy.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing.evictors;
-
-import org.apache.pulsar.functions.composition.windowing.Event;
-import org.apache.pulsar.functions.composition.windowing.EvictionContext;
-import org.apache.pulsar.functions.composition.windowing.EvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.Pair;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * An eviction policy that tracks count based on watermark ts and
- * evicts events up to the watermark based on a threshold count.
- *
- * @param the type of event tracked by this policy.
- */
-public class WatermarkCountEvictionPolicy
- implements EvictionPolicy> {
- protected final int threshold;
- protected final AtomicLong currentCount;
- private EvictionContext context;
-
- private volatile long processed;
-
- public WatermarkCountEvictionPolicy(int count) {
- threshold = count;
- currentCount = new AtomicLong();
- }
-
- public EvictionPolicy.Action evict(Event event) {
- if (getContext() == null) {
- //It is possible to get asked about eviction before we have a context, due to WindowManager
- // .compactWindow.
- //In this case we should hold on to all the events. When the first watermark is received,
- // the context will be set,
- //and the events will be reevaluated for eviction
- return Action.STOP;
- }
-
- Action action;
- if (event.getTimestamp() <= getContext().getReferenceTime() && processed < currentCount.get()) {
- action = doEvict(event);
- if (action == Action.PROCESS) {
- ++processed;
- }
- } else {
- action = Action.KEEP;
- }
- return action;
- }
-
- private Action doEvict(Event event) {
- /*
- * atomically decrement the count if its greater than threshold and
- * return if the event should be evicted
- */
- while (true) {
- long curVal = currentCount.get();
- if (curVal > threshold) {
- if (currentCount.compareAndSet(curVal, curVal - 1)) {
- return Action.EXPIRE;
- }
- } else {
- break;
- }
- }
- return Action.PROCESS;
- }
-
- @Override
- public void track(Event event) {
- // NOOP
- }
-
- @Override
- public EvictionContext getContext() {
- return context;
- }
-
- @Override
- public void setContext(EvictionContext context) {
- this.context = context;
- if (context.getCurrentCount() != null) {
- currentCount.set(context.getCurrentCount());
- } else {
- currentCount.set(processed + context.getSlidingCount());
- }
- processed = 0;
- }
-
- @Override
- public void reset() {
- processed = 0;
- }
-
- @Override
- public Pair getState() {
- return Pair.of(currentCount.get(), processed);
- }
-
- @Override
- public void restoreState(Pair state) {
- currentCount.set(state.getFirst());
- processed = state.getSecond();
- }
-
- @Override
- public String toString() {
- return "WatermarkCountEvictionPolicy{" + "} " + super.toString();
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/WatermarkTimeEvictionPolicy.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/WatermarkTimeEvictionPolicy.java
deleted file mode 100644
index af9d2c032813f..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/evictors/WatermarkTimeEvictionPolicy.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing.evictors;
-
-import org.apache.pulsar.functions.composition.windowing.Event;
-
-/**
- * An eviction policy that evicts events based on time duration taking
- * watermark time and event lag into account.
- */
-public class WatermarkTimeEvictionPolicy extends TimeEvictionPolicy {
- private final long lag;
-
- /**
- * Constructs a WatermarkTimeEvictionPolicy that evicts events older
- * than the given window length in millis.
- *
- * @param windowLength the window length in milliseconds
- */
- public WatermarkTimeEvictionPolicy(long windowLength) {
- this(windowLength, Long.MAX_VALUE);
- }
-
- /**
- * Constructs a WatermarkTimeEvictionPolicy that evicts events older
- * than the given window length in millis. The lag parameter
- * can be used in the case of event based ts to break the queue
- * scan early.
- *
- * @param windowLength the window length in milliseconds
- * @param lag the max event lag in milliseconds
- */
- public WatermarkTimeEvictionPolicy(long windowLength, long lag) {
- super(windowLength);
- this.lag = lag;
- }
-
- /**
- * {@inheritDoc}
- *
- * Keeps events with future ts in the queue for processing in the next
- * window. If the ts difference is more than the lag, stops scanning
- * the queue for the current window.
- */
- @Override
- public Action evict(Event event) {
- if (evictionContext == null) {
- //It is possible to get asked about eviction before we have a context, due to WindowManager
- // .compactWindow.
- //In this case we should hold on to all the events. When the first watermark is received,
- // the context will be set,
- //and the events will be reevaluated for eviction
- return Action.STOP;
- }
-
- long referenceTime = evictionContext.getReferenceTime();
- long diff = referenceTime - event.getTimestamp();
- if (diff < -lag) {
- return Action.STOP;
- } else if (diff < 0) {
- return Action.KEEP;
- } else {
- return super.evict(event);
- }
- }
-
- @Override
- public String toString() {
- return "WatermarkTimeEvictionPolicy{" + "lag=" + lag + "} " + super.toString();
- }
-
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/CountTriggerPolicy.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/CountTriggerPolicy.java
deleted file mode 100644
index 5c46069c8a955..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/CountTriggerPolicy.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing.triggers;
-
-import org.apache.pulsar.functions.composition.windowing.DefaultEvictionContext;
-import org.apache.pulsar.functions.composition.windowing.Event;
-import org.apache.pulsar.functions.composition.windowing.EvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.TriggerHandler;
-import org.apache.pulsar.functions.composition.windowing.TriggerPolicy;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * A trigger that tracks event counts and calls back {@link TriggerHandler#onTrigger()}
- * when the count threshold is hit.
- *
- * @param the type of event tracked by this policy.
- */
-public class CountTriggerPolicy implements TriggerPolicy {
- private final int count;
- private final AtomicInteger currentCount;
- private final TriggerHandler handler;
- private final EvictionPolicy evictionPolicy;
- private boolean started;
-
- public CountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy
- evictionPolicy) {
- this.count = count;
- this.currentCount = new AtomicInteger();
- this.handler = handler;
- this.evictionPolicy = evictionPolicy;
- this.started = false;
- }
-
- @Override
- public void track(Event event) {
- if (started && !event.isWatermark()) {
- if (currentCount.incrementAndGet() >= count) {
- evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
- handler.onTrigger();
- }
- }
- }
-
- @Override
- public void reset() {
- currentCount.set(0);
- }
-
- @Override
- public void start() {
- started = true;
- }
-
- @Override
- public void shutdown() {
- // NOOP
- }
-
- @Override
- public Integer getState() {
- return currentCount.get();
- }
-
- @Override
- public void restoreState(Integer state) {
- currentCount.set(state);
- }
-
- @Override
- public String toString() {
- return "CountTriggerPolicy{" + "count=" + count + ", currentCount=" + currentCount
- + ", started=" + started + '}';
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/TimeTriggerPolicy.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/TimeTriggerPolicy.java
deleted file mode 100644
index ee97d9f9e1716..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/TimeTriggerPolicy.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing.triggers;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.logging.log4j.ThreadContext;
-import org.apache.pulsar.functions.api.Context;
-import org.apache.pulsar.functions.composition.windowing.DefaultEvictionContext;
-import org.apache.pulsar.functions.composition.windowing.Event;
-import org.apache.pulsar.functions.composition.windowing.EvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.TriggerHandler;
-import org.apache.pulsar.functions.composition.windowing.TriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.WindowUtils;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-
-/**
- * Invokes {@link TriggerHandler#onTrigger()} after the duration.
- */
-
-@Slf4j
-public class TimeTriggerPolicy implements TriggerPolicy {
-
- private long duration;
- private final TriggerHandler handler;
- private final EvictionPolicy evictionPolicy;
- private ScheduledFuture> executorFuture;
- private final ScheduledExecutorService executor;
- private Context context;
-
- public TimeTriggerPolicy(long millis, TriggerHandler handler, EvictionPolicy
- evictionPolicy, Context context) {
- this.duration = millis;
- this.handler = handler;
- this.evictionPolicy = evictionPolicy;
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setNameFormat("time-trigger-policy-%d")
- .setDaemon(true)
- .build();
- this.executor = Executors.newSingleThreadScheduledExecutor(threadFactory);
- this.context = context;
- }
-
- @Override
- public void track(Event event) {
- checkFailures();
- }
-
- @Override
- public void reset() {
-
- }
-
- @Override
- public void start() {
- executorFuture = executor.scheduleAtFixedRate(newTriggerTask(), duration, duration, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void shutdown() {
- executor.shutdown();
- try {
- if (!executor.awaitTermination(2, TimeUnit.SECONDS)) {
- executor.shutdownNow();
- }
- } catch (InterruptedException ie) {
- executor.shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
-
- @Override
- public String toString() {
- return "TimeTriggerPolicy{" + "duration=" + duration + '}';
- }
-
- private Runnable newTriggerTask() {
- return new Runnable() {
- @Override
- public void run() {
- // initialize the thread context
- ThreadContext.put("function", WindowUtils.getFullyQualifiedName(
- context.getTenant(), context.getNamespace(), context.getFunctionName()));
- // do not process current timestamp since tuples might arrive while the trigger is executing
- long now = System.currentTimeMillis() - 1;
- try {
- /*
- * set the current timestamp as the reference time for the eviction policy
- * to evict the events
- */
- evictionPolicy.setContext(new DefaultEvictionContext(now, null, null, duration));
- handler.onTrigger();
- } catch (Throwable th) {
- log.error("handler.onTrigger failed ", th);
- /*
- * propagate it so that task gets canceled and the exception
- * can be retrieved from executorFuture.get()
- */
- throw th;
- }
- }
- };
- }
-
- private void checkFailures() {
- if (executorFuture != null && executorFuture.isDone()) {
- try {
- executorFuture.get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Got exception in timer trigger policy ", e);
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public Void getState() {
- return null;
- }
-
- @Override
- public void restoreState(Void state) {
-
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/WatermarkCountTriggerPolicy.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/WatermarkCountTriggerPolicy.java
deleted file mode 100644
index 2e390f8221d07..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/WatermarkCountTriggerPolicy.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing.triggers;
-
-import org.apache.pulsar.functions.composition.windowing.DefaultEvictionContext;
-import org.apache.pulsar.functions.composition.windowing.Event;
-import org.apache.pulsar.functions.composition.windowing.EvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.TriggerHandler;
-import org.apache.pulsar.functions.composition.windowing.TriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.WindowManager;
-
-import java.util.List;
-
-/**
- * A trigger policy that tracks event counts and sets the context for
- * eviction policy to evict based on latest watermark time.
- *
- * @param the type of event tracked by this policy.
- */
-public class WatermarkCountTriggerPolicy implements TriggerPolicy {
- private final int count;
- private final TriggerHandler handler;
- private final EvictionPolicy evictionPolicy;
- private final WindowManager windowManager;
- private volatile long lastProcessedTs;
- private boolean started;
-
- public WatermarkCountTriggerPolicy(int count, TriggerHandler handler, EvictionPolicy
- evictionPolicy, WindowManager windowManager) {
- this.count = count;
- this.handler = handler;
- this.evictionPolicy = evictionPolicy;
- this.windowManager = windowManager;
- this.started = false;
- }
-
- @Override
- public void track(Event event) {
- if (started && event.isWatermark()) {
- handleWaterMarkEvent(event);
- }
- }
-
- @Override
- public void reset() {
- // NOOP
- }
-
- @Override
- public void start() {
- started = true;
- }
-
- @Override
- public void shutdown() {
- // NOOP
- }
-
- /**
- * Triggers all the pending windows up to the waterMarkEvent timestamp
- * based on the sliding interval count.
- *
- * @param waterMarkEvent the watermark event
- */
- private void handleWaterMarkEvent(Event waterMarkEvent) {
- long watermarkTs = waterMarkEvent.getTimestamp();
- List eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs,
- count);
- for (long ts : eventTs) {
- evictionPolicy.setContext(new DefaultEvictionContext(ts, null, Long.valueOf(count)));
- handler.onTrigger();
- lastProcessedTs = ts;
- }
- }
-
- @Override
- public Long getState() {
- return lastProcessedTs;
- }
-
- @Override
- public void restoreState(Long state) {
- lastProcessedTs = state;
- }
-
- @Override
- public String toString() {
- return "WatermarkCountTriggerPolicy{" + "count=" + count + ", lastProcessedTs="
- + lastProcessedTs + ", started=" + started + '}';
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/WatermarkTimeTriggerPolicy.java b/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/WatermarkTimeTriggerPolicy.java
deleted file mode 100644
index 370688ff66f2c..0000000000000
--- a/pulsar-functions/api-composition-java/src/main/java/org/apache/pulsar/functions/composition/windowing/triggers/WatermarkTimeTriggerPolicy.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.composition.windowing.triggers;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.composition.windowing.DefaultEvictionContext;
-import org.apache.pulsar.functions.composition.windowing.Event;
-import org.apache.pulsar.functions.composition.windowing.EvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.TriggerHandler;
-import org.apache.pulsar.functions.composition.windowing.TriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.WindowManager;
-
-/**
- * Handles watermark events and triggers {@link TriggerHandler#onTrigger()} for each window
- * interval that has events to be processed up to the watermark ts.
- */
-@Slf4j
-public class WatermarkTimeTriggerPolicy implements TriggerPolicy {
- private final long slidingIntervalMs;
- private final TriggerHandler handler;
- private final EvictionPolicy evictionPolicy;
- private final WindowManager windowManager;
- private volatile long nextWindowEndTs;
- private boolean started;
-
- public WatermarkTimeTriggerPolicy(long slidingIntervalMs, TriggerHandler handler,
- EvictionPolicy evictionPolicy, WindowManager
- windowManager) {
- this.slidingIntervalMs = slidingIntervalMs;
- this.handler = handler;
- this.evictionPolicy = evictionPolicy;
- this.windowManager = windowManager;
- this.started = false;
- }
-
- @Override
- public void track(Event event) {
- if (started && event.isWatermark()) {
- handleWaterMarkEvent(event);
- }
- }
-
- @Override
- public void reset() {
- // NOOP
- }
-
- @Override
- public void start() {
- started = true;
- }
-
- @Override
- public void shutdown() {
- // NOOP
- }
-
- /**
- * Invokes the trigger all pending windows up to the
- * watermark timestamp. The end ts of the window is set
- * in the eviction policy context so that the events falling
- * within that window can be processed.
- */
- private void handleWaterMarkEvent(Event event) {
- long watermarkTs = event.getTimestamp();
- long windowEndTs = nextWindowEndTs;
- log.debug(String.format("Window end ts %d Watermark ts %d", windowEndTs, watermarkTs));
- while (windowEndTs <= watermarkTs) {
- long currentCount = windowManager.getEventCount(windowEndTs);
- evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
- if (handler.onTrigger()) {
- windowEndTs += slidingIntervalMs;
- } else {
- /*
- * No events were found in the previous window interval.
- * Scan through the events in the queue to find the next
- * window intervals based on event ts.
- */
- long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
- log.debug(String.format("Next aligned window end ts %d", ts));
- if (ts == Long.MAX_VALUE) {
- log.debug(String.format("No events to process between %d and watermark ts %d",
- windowEndTs, watermarkTs));
- break;
- }
- windowEndTs = ts;
- }
- }
- nextWindowEndTs = windowEndTs;
- }
-
- /**
- * Computes the next window by scanning the events in the window and
- * finds the next aligned window between the startTs and endTs. Return the end ts
- * of the next aligned window, i.e. the ts when the window should fire.
- *
- * @param startTs the start timestamp (excluding)
- * @param endTs the end timestamp (including)
- * @return the aligned window end ts for the next window or Long.MAX_VALUE if there
- * are no more events to be processed.
- */
- private long getNextAlignedWindowTs(long startTs, long endTs) {
- long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
- if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
- return nextTs;
- }
- return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
- }
-
- @Override
- public Long getState() {
- return nextWindowEndTs;
- }
-
- @Override
- public void restoreState(Long state) {
- nextWindowEndTs = state;
- }
-
- @Override
- public String toString() {
- return "WatermarkTimeTriggerPolicy{" + "slidingIntervalMs=" + slidingIntervalMs
- + ", nextWindowEndTs=" + nextWindowEndTs + ", started=" + started + '}';
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/test/java/org/apache/pulsar/functions/composition/windowing/WaterMarkEventGeneratorTest.java b/pulsar-functions/api-composition-java/src/test/java/org/apache/pulsar/functions/composition/windowing/WaterMarkEventGeneratorTest.java
deleted file mode 100644
index c58f7855dcef4..0000000000000
--- a/pulsar-functions/api-composition-java/src/test/java/org/apache/pulsar/functions/composition/windowing/WaterMarkEventGeneratorTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.functions.composition.windowing;
-
-import org.apache.pulsar.functions.api.Context;
-import org.mockito.Mockito;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertTrue;
-
-/**
- * Unit tests for {@link WaterMarkEventGenerator}
- */
-public class WaterMarkEventGeneratorTest {
- private WaterMarkEventGenerator waterMarkEventGenerator;
- private WindowManager windowManager;
- private List> eventList = new ArrayList<>();
- private Context context;
-
- @BeforeMethod
- public void setUp() {
- windowManager = new WindowManager(null, new LinkedList<>()) {
- @Override
- public void add(Event event) {
- eventList.add(event);
- }
- };
-
- context = Mockito.mock(Context.class);
- Mockito.doReturn("test-function").when(context).getFunctionName();
- Mockito.doReturn("test-namespace").when(context).getNamespace();
- Mockito.doReturn("test-tenant").when(context).getTenant();
- // set watermark interval to a high value and trigger manually to fix timing issues
- waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5L, 5, Collections
- .singleton("s1"), context);
-// waterMarkEventGenerator.start();
- }
-
- @AfterMethod
- public void tearDown() {
-// waterMarkEventGenerator.shutdown();
- eventList.clear();
- }
-
- @Test
- public void testTrackSingleStream() throws Exception {
- waterMarkEventGenerator.track("s1", 100);
- waterMarkEventGenerator.track("s1", 110);
- waterMarkEventGenerator.run();
- assertTrue(eventList.get(0).isWatermark());
- assertEquals(105, eventList.get(0).getTimestamp());
- }
-
- @Test
- public void testTrackSingleStreamOutOfOrder() throws Exception {
- waterMarkEventGenerator.track("s1", 100);
- waterMarkEventGenerator.track("s1", 110);
- waterMarkEventGenerator.track("s1", 104);
- waterMarkEventGenerator.run();
- assertTrue(eventList.get(0).isWatermark());
- assertEquals(105, eventList.get(0).getTimestamp());
- }
-
- @Test
- public void testTrackTwoStreams() throws Exception {
- Set streams = new HashSet<>();
- streams.add("s1");
- streams.add("s2");
- waterMarkEventGenerator = new WaterMarkEventGenerator<>(windowManager, 5L,
- 5, streams, context);
- waterMarkEventGenerator.start();
-
- waterMarkEventGenerator.track("s1", 100);
- waterMarkEventGenerator.track("s1", 110);
- waterMarkEventGenerator.run();
- assertTrue(eventList.isEmpty());
- waterMarkEventGenerator.track("s2", 95);
- waterMarkEventGenerator.track("s2", 98);
- waterMarkEventGenerator.run();
- assertTrue(eventList.get(0).isWatermark());
- assertEquals(93, eventList.get(0).getTimestamp());
- }
-
- @Test
- public void testNoEvents() throws Exception {
- waterMarkEventGenerator.run();
- assertTrue(eventList.isEmpty());
- }
-
- @Test
- public void testLateEvent() throws Exception {
- assertTrue(waterMarkEventGenerator.track("s1", 100));
- assertTrue(waterMarkEventGenerator.track("s1", 110));
- waterMarkEventGenerator.run();
- assertTrue(eventList.get(0).isWatermark());
- assertEquals(105, eventList.get(0).getTimestamp());
- eventList.clear();
- assertTrue(waterMarkEventGenerator.track("s1", 105));
- assertTrue(waterMarkEventGenerator.track("s1", 106));
- assertTrue(waterMarkEventGenerator.track("s1", 115));
- assertFalse(waterMarkEventGenerator.track("s1", 104));
- waterMarkEventGenerator.run();
- assertTrue(eventList.get(0).isWatermark());
- assertEquals(110, eventList.get(0).getTimestamp());
- }
-}
diff --git a/pulsar-functions/api-composition-java/src/test/java/org/apache/pulsar/functions/composition/windowing/WindowManagerTest.java b/pulsar-functions/api-composition-java/src/test/java/org/apache/pulsar/functions/composition/windowing/WindowManagerTest.java
deleted file mode 100644
index d52e8784993d4..0000000000000
--- a/pulsar-functions/api-composition-java/src/test/java/org/apache/pulsar/functions/composition/windowing/WindowManagerTest.java
+++ /dev/null
@@ -1,839 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.functions.composition.windowing;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.functions.composition.windowing.evictors.CountEvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.evictors.TimeEvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.evictors.WatermarkCountEvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.evictors.WatermarkTimeEvictionPolicy;
-import org.apache.pulsar.functions.composition.windowing.triggers.CountTriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.triggers.TimeTriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.triggers.WatermarkCountTriggerPolicy;
-import org.apache.pulsar.functions.composition.windowing.triggers.WatermarkTimeTriggerPolicy;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-
-
-/**
- * Unit tests for {@link WindowManager}
- */
-@Slf4j
-public class WindowManagerTest {
- private WindowManager windowManager;
- private Listener listener;
-
- private static final long TIMESTAMP = 1516776194873L;
- private static final String TOPIC = "test-topic";
-
- private static class Listener implements WindowLifecycleListener> {
- private List> onExpiryEvents = Collections.emptyList();
- private List> onActivationEvents = Collections.emptyList();
- private List> onActivationNewEvents = Collections.emptyList();
- private List> onActivationExpiredEvents = Collections.emptyList();
-
- // all events since last clear
- private List>> allOnExpiryEvents = new ArrayList<>();
- private List>> allOnActivationEvents = new ArrayList<>();
- private List>> allOnActivationNewEvents = new ArrayList<>();
- private List>> allOnActivationExpiredEvents = new ArrayList<>();
-
- @Override
- public void onExpiry(List> events) {
- onExpiryEvents = events;
- allOnExpiryEvents.add(events);
- }
-
- @Override
- public void onActivation(List> events, List> newEvents, List>
- expired, Long timestamp) {
- onActivationEvents = events;
- allOnActivationEvents.add(events);
- onActivationNewEvents = newEvents;
- allOnActivationNewEvents.add(newEvents);
- onActivationExpiredEvents = expired;
- allOnActivationExpiredEvents.add(expired);
- }
-
- void clear() {
- onExpiryEvents = Collections.emptyList();
- onActivationEvents = Collections.emptyList();
- onActivationNewEvents = Collections.emptyList();
- onActivationExpiredEvents = Collections.emptyList();
-
- allOnExpiryEvents.clear();
- allOnActivationEvents.clear();
- allOnActivationNewEvents.clear();
- allOnActivationExpiredEvents.clear();
- }
- }
-
- @BeforeMethod
- public void setUp() {
- listener = new Listener();
- windowManager = new WindowManager<>(listener, new LinkedList<>());
- }
-
- @AfterMethod
- public void tearDown() {
- windowManager.shutdown();
- }
-
- @Test
- public void testCountBasedWindow() throws Exception {
- EvictionPolicy evictionPolicy = new CountEvictionPolicy(5);
- TriggerPolicy triggerPolicy = new CountTriggerPolicy(2, windowManager, evictionPolicy);
- triggerPolicy.start();
- windowManager.setEvictionPolicy(evictionPolicy);
- windowManager.setTriggerPolicy(triggerPolicy);
- windowManager.add(new EventImpl<>(1, TIMESTAMP, null, TOPIC));
- windowManager.add(new EventImpl<>(2, TIMESTAMP, null, TOPIC));
- // nothing expired yet
- assertTrue(listener.onExpiryEvents.isEmpty());
- assertEquals(seq(1, 2), listener.onActivationEvents);
- assertEquals(seq(1, 2), listener.onActivationNewEvents);
- assertTrue(listener.onActivationExpiredEvents.isEmpty());
- windowManager.add(new EventImpl<>(3, TIMESTAMP, null, TOPIC));
- windowManager.add(new EventImpl<>(4, TIMESTAMP, null, TOPIC));
- // nothing expired yet
- assertTrue(listener.onExpiryEvents.isEmpty());
- assertEquals(seq(1, 4), listener.onActivationEvents);
- assertEquals(seq(3, 4), listener.onActivationNewEvents);
- assertTrue(listener.onActivationExpiredEvents.isEmpty());
- windowManager.add(new EventImpl<>(5, TIMESTAMP, null, TOPIC));
- windowManager.add(new EventImpl<>(6, TIMESTAMP, null, TOPIC));
- // 1 expired
- assertEquals(seq(1), listener.onExpiryEvents);
- assertEquals(seq(2, 6), listener.onActivationEvents);
- assertEquals(seq(5, 6), listener.onActivationNewEvents);
- assertEquals(seq(1), listener.onActivationExpiredEvents);
- listener.clear();
- windowManager.add(new EventImpl<>(7, TIMESTAMP, null, TOPIC));
- // nothing expires until threshold is hit
- assertTrue(listener.onExpiryEvents.isEmpty());
- windowManager.add(new EventImpl<>(8, TIMESTAMP, null, TOPIC));
- // 1 expired
- assertEquals(seq(2, 3), listener.onExpiryEvents);
- assertEquals(seq(4, 8), listener.onActivationEvents);
- assertEquals(seq(7, 8), listener.onActivationNewEvents);
- assertEquals(seq(2, 3), listener.onActivationExpiredEvents);
- }
-
- @Test
- public void testExpireThreshold() throws Exception {
- int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD;
- int windowLength = 5;
- CountEvictionPolicy countEvictionPolicy = new CountEvictionPolicy(5);
- windowManager.setEvictionPolicy(countEvictionPolicy);
- TriggerPolicy triggerPolicy = new TimeTriggerPolicy(Duration.ofHours(1)
- .toMillis(), windowManager, countEvictionPolicy, null);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
- for (Event i : seq(1, 5)) {
- windowManager.add(i);
- }
- // nothing expired yet
- assertTrue(listener.onExpiryEvents.isEmpty());
- for (Event i : seq(6, 10)) {
- windowManager.add(i);
- }
- for (Event i : seq(11, threshold)) {
- windowManager.add(i);
- }
- // window should be compacted and events should be expired.
- assertEquals(seq(1, threshold - windowLength), listener.onExpiryEvents);
- }
-
- private void testEvictBeforeWatermarkForWatermarkEvictionPolicy(EvictionPolicy
- watermarkEvictionPolicy,
- int windowLength) throws
- Exception {
- /**
- * The watermark eviction policy must not evict tuples until the first watermark has been
- * received.
- * The policies can't make a meaningful decision prior to the first watermark, so the safe
- * decision
- * is to postpone eviction.
- */
- int threshold = WindowManager.EXPIRE_EVENTS_THRESHOLD;
- windowManager.setEvictionPolicy(watermarkEvictionPolicy);
- WatermarkCountTriggerPolicy triggerPolicy = new WatermarkCountTriggerPolicy(windowLength, windowManager,
- watermarkEvictionPolicy, windowManager);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
- for (Event i : seqThreshold(1, threshold)) {
- windowManager.add(i);
- }
- assertTrue(listener.onExpiryEvents.isEmpty(), "The watermark eviction policies should never evict events " +
- "before the first "
- + "watermark is received");
- windowManager.add(new WaterMarkEvent<>(threshold));
- // The events should be put in a window when the first watermark is received
- assertEquals(seqThreshold(1, threshold), listener.onActivationEvents);
- //Now add some more events and a new watermark, and check that the previous events are expired
- for (Event i : seqThreshold(threshold + 1, threshold * 2)) {
- windowManager.add(i);
- }
- windowManager.add(new WaterMarkEvent<>(threshold + windowLength + 1));
- //All the events should be expired when the next watermark is received
- assertEquals(listener
- .onExpiryEvents, seqThreshold(1, threshold), "All the events should be expired after the second " +
- "watermark");
- }
-
- @Test
- @SuppressWarnings("rawtypes")
- public void testExpireThresholdWithWatermarkCountEvictionPolicy() throws Exception {
- int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD;
- EvictionPolicy watermarkCountEvictionPolicy = new WatermarkCountEvictionPolicy(windowLength);
- testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkCountEvictionPolicy, windowLength);
- }
-
- @Test
- @SuppressWarnings("rawtypes")
- public void testExpireThresholdWithWatermarkTimeEvictionPolicy() throws Exception {
- int windowLength = WindowManager.EXPIRE_EVENTS_THRESHOLD;
- EvictionPolicy watermarkTimeEvictionPolicy = new WatermarkTimeEvictionPolicy(windowLength);
- testEvictBeforeWatermarkForWatermarkEvictionPolicy(watermarkTimeEvictionPolicy, windowLength);
- }
-
- @Test
- public void testTimeBasedWindow() throws Exception {
- EvictionPolicy evictionPolicy = new TimeEvictionPolicy(Duration
- .ofSeconds(1).toMillis());
- windowManager.setEvictionPolicy(evictionPolicy);
- /*
- * Don't wait for Timetrigger to fire since this could lead to timing issues in unit tests.
- * Set it to a large value and trigger manually.
- */
- TriggerPolicy triggerPolicy = new TimeTriggerPolicy(Duration.ofDays(1)
- .toMillis(), windowManager, evictionPolicy, null);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
- long now = System.currentTimeMillis();
-
- // add with past ts
- for (Event i : seq(1, 50, now - 1000)) {
- windowManager.add(i);
- }
-
- // add with current ts
- for (Event i : seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD, now)) {
- windowManager.add(i);
- }
- // first 50 should have expired due to expire events threshold
- assertEquals(50, listener.onExpiryEvents.size());
-
- // add more events with past ts
- for (Event i : seq(
- WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100, now - 1000)) {
- windowManager.add(i);
- }
- // simulate the time trigger by setting the reference time and invoking onTrigger() manually
- evictionPolicy.setContext(new DefaultEvictionContext(now + 100));
- windowManager.onTrigger();
-
- // 100 events with past ts should expire
- assertEquals(100, listener.onExpiryEvents.size());
- assertEquals(seq(
- WindowManager.EXPIRE_EVENTS_THRESHOLD + 1,
- WindowManager.EXPIRE_EVENTS_THRESHOLD + 100, now - 1000), listener.onExpiryEvents);
- List> activationsEvents = seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD, now);
- assertEquals(seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD, now), listener.onActivationEvents);
- assertEquals(seq(51, WindowManager.EXPIRE_EVENTS_THRESHOLD, now), listener.onActivationNewEvents);
- // activation expired list should contain even the ones expired due to EXPIRE_EVENTS_THRESHOLD
- List> expiredList = seq(1, 50, now - 1000);
- expiredList.addAll(seq(
- WindowManager.EXPIRE_EVENTS_THRESHOLD + 1, WindowManager.EXPIRE_EVENTS_THRESHOLD + 100, now - 1000));
- assertEquals(expiredList, listener.onActivationExpiredEvents);
-
- listener.clear();
- // add more events with current ts
- List> newEvents = seq(
- WindowManager.EXPIRE_EVENTS_THRESHOLD + 101, WindowManager.EXPIRE_EVENTS_THRESHOLD + 200, now);
- for (Event i : newEvents) {
- windowManager.add(i);
- }
- activationsEvents.addAll(newEvents);
- // simulate the time trigger by setting the reference time and invoking onTrigger() manually
- evictionPolicy.setContext(new DefaultEvictionContext(now + 200));
- windowManager.onTrigger();
- assertTrue(listener.onExpiryEvents.isEmpty());
- assertEquals(activationsEvents, listener.onActivationEvents);
- assertEquals(newEvents, listener.onActivationNewEvents);
-
- }
-
-
- @Test
- public void testTimeBasedWindowExpiry() throws Exception {
- EvictionPolicy evictionPolicy =
- new TimeEvictionPolicy(Duration.ofMillis(100).toMillis());
- windowManager.setEvictionPolicy(evictionPolicy);
- /*
- * Don't wait for Timetrigger to fire since this could lead to timing issues in unit tests.
- * Set it to a large value and trigger manually.
- */
- TriggerPolicy triggerPolicy = new TimeTriggerPolicy(Duration.ofDays(1)
- .toMillis(), windowManager, evictionPolicy, null);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
- long now = TIMESTAMP;
- // add 10 events
- for (Event i : seq(1, 10)) {
- windowManager.add(i);
- }
- // simulate the time trigger by setting the reference time and invoking onTrigger() manually
- evictionPolicy.setContext(new DefaultEvictionContext(now + 60));
- windowManager.onTrigger();
-
- assertEquals(seq(1, 10), listener.onActivationEvents);
- assertTrue(listener.onActivationExpiredEvents.isEmpty());
- listener.clear();
- // wait so all events expire
- evictionPolicy.setContext(new DefaultEvictionContext(now + 120));
- windowManager.onTrigger();
-
- assertEquals(seq(1, 10), listener.onExpiryEvents);
- assertTrue(listener.onActivationEvents.isEmpty());
- listener.clear();
- evictionPolicy.setContext(new DefaultEvictionContext(now + 180));
- windowManager.onTrigger();
- assertTrue(listener.onActivationExpiredEvents.isEmpty());
- assertTrue(listener.onActivationEvents.isEmpty());
-
- }
-
- @Test
- public void testTumblingWindow() throws Exception {
- EvictionPolicy evictionPolicy = new CountEvictionPolicy(3);
- windowManager.setEvictionPolicy(evictionPolicy);
- TriggerPolicy triggerPolicy = new CountTriggerPolicy(3, windowManager, evictionPolicy);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
- windowManager.add(new EventImpl<>(1, TIMESTAMP, null, TOPIC));
- windowManager.add(new EventImpl<>(2, TIMESTAMP, null, TOPIC));
- // nothing expired yet
- assertTrue(listener.onExpiryEvents.isEmpty());
- windowManager.add(new EventImpl<>(3, TIMESTAMP, null, TOPIC));
- assertTrue(listener.onExpiryEvents.isEmpty());
- assertEquals(seq(1, 3), listener.onActivationEvents);
- assertTrue(listener.onActivationExpiredEvents.isEmpty());
- assertEquals(seq(1, 3), listener.onActivationNewEvents);
-
- listener.clear();
- windowManager.add(new EventImpl<>(4, TIMESTAMP, null, TOPIC));
- windowManager.add(new EventImpl<>(5, TIMESTAMP, null, TOPIC));
- windowManager.add(new EventImpl<>(6, TIMESTAMP, null, TOPIC));
-
- assertEquals(seq(1, 3), listener.onExpiryEvents);
- assertEquals(seq(4, 6), listener.onActivationEvents);
- assertEquals(seq(1, 3), listener.onActivationExpiredEvents);
- assertEquals(seq(4, 6), listener.onActivationNewEvents);
-
- }
-
-
- @Test
- public void testEventTimeBasedWindow() throws Exception {
- EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy<>(20);
- windowManager.setEvictionPolicy(evictionPolicy);
- TriggerPolicy triggerPolicy = new WatermarkTimeTriggerPolicy(10,
- windowManager, evictionPolicy, windowManager);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
-
- windowManager.add(new EventImpl<>(1, 603, null, TOPIC));
- windowManager.add(new EventImpl<>(2, 605, null, TOPIC));
- windowManager.add(new EventImpl<>(3, 607, null, TOPIC));
-
- // This should trigger the scan to find
- // the next aligned window end ts, but not produce any activations
- windowManager.add(new WaterMarkEvent(609));
- assertEquals(Collections.emptyList(), listener.allOnActivationEvents);
-
- windowManager.add(new EventImpl<>(4, 618, null, TOPIC));
- windowManager.add(new EventImpl<>(5, 626, null, TOPIC));
- windowManager.add(new EventImpl<>(6, 636, null, TOPIC));
- // send a watermark event, which should trigger three windows.
- windowManager.add(new WaterMarkEvent(631));
-
- assertEquals(3, listener.allOnActivationEvents.size());
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC)
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC),
- new EventImpl<>(4, 618, null, TOPIC)
- }), listener.allOnActivationEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 626, null, TOPIC)
- }), listener.allOnActivationEvents.get(2));
-
- assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(0));
- assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC)
- }), listener.allOnActivationExpiredEvents.get(2));
-
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC)
- }), listener.allOnActivationNewEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC)
- }), listener.allOnActivationNewEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(5, 626, null, TOPIC)
- }), listener.allOnActivationNewEvents.get(2));
-
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC)
- }), listener.allOnExpiryEvents.get(0));
-
- // add more events with a gap in ts
- windowManager.add(new EventImpl<>(7, 825, null, TOPIC));
- windowManager.add(new EventImpl<>(8, 826, null, TOPIC));
- windowManager.add(new EventImpl<>(9, 827, null, TOPIC));
- windowManager.add(new EventImpl<>(10, 839, null, TOPIC));
-
- listener.clear();
- windowManager.add(new WaterMarkEvent(834));
-
- assertEquals(3, listener.allOnActivationEvents.size());
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(5, 626, null, TOPIC),
- new EventImpl<>(6, 636, null, TOPIC)
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(6, 636, null, TOPIC)
- }), listener.allOnActivationEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(7, 825, null, TOPIC),
- new EventImpl<>(8, 826, null, TOPIC),
- new EventImpl<>(9, 827, null, TOPIC)
- }), listener.allOnActivationEvents.get(2));
-
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC)
- }), listener.allOnActivationExpiredEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(5, 626, null, TOPIC)
- }), listener.allOnActivationExpiredEvents.get(1));
- assertEquals(Collections.emptyList(), listener.allOnActivationExpiredEvents.get(2));
-
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(6, 636, null, TOPIC)
- }), listener.allOnActivationNewEvents.get(0));
- assertEquals(Collections.emptyList(), listener.allOnActivationNewEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(7, 825, null, TOPIC),
- new EventImpl<>(8, 826, null, TOPIC),
- new EventImpl<>(9, 827, null, TOPIC)
- }), listener.allOnActivationNewEvents.get(2));
-
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC)
- }), listener.allOnExpiryEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(5, 626, null, TOPIC)
- }), listener.allOnExpiryEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(6, 636, null, TOPIC)
- }), listener.allOnExpiryEvents.get(2));
- }
-
- @Test
- public void testCountBasedWindowWithEventTs() throws Exception {
- EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(3);
- windowManager.setEvictionPolicy(evictionPolicy);
- TriggerPolicy triggerPolicy
- = new WatermarkTimeTriggerPolicy(10, windowManager, evictionPolicy, windowManager);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
-
- windowManager.add(new EventImpl<>(1, 603, null, TOPIC));
- windowManager.add(new EventImpl<>(2, 605, null, TOPIC));
- windowManager.add(new EventImpl<>(3, 607, null, TOPIC));
- windowManager.add(new EventImpl<>(4, 618, null, TOPIC));
- windowManager.add(new EventImpl<>(5, 626, null, TOPIC));
- windowManager.add(new EventImpl<>(6, 636, null, TOPIC));
- // send a watermark event, which should trigger three windows.
- windowManager.add(new WaterMarkEvent(631));
-
- assertEquals(3, listener.allOnActivationEvents.size());
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC)
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC),
- new EventImpl<>(4, 618, null, TOPIC)
- }), listener.allOnActivationEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(3, 607, null, TOPIC),
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 626, null, TOPIC)
- }), listener.allOnActivationEvents.get(2));
-
- // add more events with a gap in ts
- windowManager.add(new EventImpl<>(7, 665, null, TOPIC));
- windowManager.add(new EventImpl<>(8, 666, null, TOPIC));
- windowManager.add(new EventImpl<>(9, 667, null, TOPIC));
- windowManager.add(new EventImpl<>(10, 679, null, TOPIC));
-
- listener.clear();
- windowManager.add(new WaterMarkEvent(674));
- assertEquals(4, listener.allOnActivationEvents.size());
- // same set of events part of three windows
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 626, null, TOPIC),
- new EventImpl<>(6, 636, null, TOPIC)
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 626, null, TOPIC),
- new EventImpl<>(6, 636, null, TOPIC)
- }), listener.allOnActivationEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 626, null, TOPIC),
- new EventImpl<>(6, 636, null, TOPIC)
- }), listener.allOnActivationEvents.get(2));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(7, 665, null, TOPIC),
- new EventImpl<>(8, 666, null, TOPIC),
- new EventImpl<>(9, 667, null, TOPIC)
- }), listener.allOnActivationEvents.get(3));
- }
-
- @Test
- public void testCountBasedTriggerWithEventTs() throws Exception {
- EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy(20);
- windowManager.setEvictionPolicy(evictionPolicy);
- TriggerPolicy triggerPolicy
- = new WatermarkCountTriggerPolicy(3, windowManager, evictionPolicy, windowManager);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
-
- windowManager.add(new EventImpl<>(1, 603, null, TOPIC));
- windowManager.add(new EventImpl<>(2, 605, null, TOPIC));
- windowManager.add(new EventImpl<>(3, 607, null, TOPIC));
- windowManager.add(new EventImpl<>(4, 618, null, TOPIC));
- windowManager.add(new EventImpl<>(5, 625, null, TOPIC));
- windowManager.add(new EventImpl<>(6, 626, null, TOPIC));
- windowManager.add(new EventImpl<>(7, 629, null, TOPIC));
- windowManager.add(new EventImpl<>(8, 636, null, TOPIC));
- // send a watermark event, which should trigger three windows.
- windowManager.add(new WaterMarkEvent(631));
-
- assertEquals(2, listener.allOnActivationEvents.size());
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC)
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(3, 607, null, TOPIC),
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 625, null, TOPIC),
- new EventImpl<>(6, 626, null, TOPIC)
-
- }), listener.allOnActivationEvents.get(1));
-
- // add more events with a gap in ts
- windowManager.add(new EventImpl<>(9, 665, null, TOPIC));
- windowManager.add(new EventImpl<>(10, 666, null, TOPIC));
- windowManager.add(new EventImpl<>(11, 667, null, TOPIC));
- windowManager.add(new EventImpl<>(12, 669, null, TOPIC));
- windowManager.add(new EventImpl<>(12, 679, null, TOPIC));
-
- listener.clear();
- windowManager.add(new WaterMarkEvent(674));
- assertEquals(2, listener.allOnActivationEvents.size());
- // same set of events part of three windows
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(9, 665, null, TOPIC)
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(9, 665, null, TOPIC),
- new EventImpl<>(10, 666, null, TOPIC),
- new EventImpl<>(11, 667, null, TOPIC),
- new EventImpl<>(12, 669, null, TOPIC),
- }), listener.allOnActivationEvents.get(1));
- }
-
- @Test
- public void testCountBasedTumblingWithSameEventTs() throws Exception {
- EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(2);
- windowManager.setEvictionPolicy(evictionPolicy);
- TriggerPolicy triggerPolicy
- = new WatermarkCountTriggerPolicy(2, windowManager, evictionPolicy, windowManager);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
-
- windowManager.add(new EventImpl<>(1, 10, null, TOPIC));
- windowManager.add(new EventImpl<>(2, 10, null, TOPIC));
- windowManager.add(new EventImpl<>(3, 11, null, TOPIC));
- windowManager.add(new EventImpl<>(4, 12, null, TOPIC));
- windowManager.add(new EventImpl<>(5, 12, null, TOPIC));
- windowManager.add(new EventImpl<>(6, 12, null, TOPIC));
- windowManager.add(new EventImpl<>(7, 12, null, TOPIC));
- windowManager.add(new EventImpl<>(8, 13, null, TOPIC));
- windowManager.add(new EventImpl<>(9, 14, null, TOPIC));
- windowManager.add(new EventImpl<>(10, 15, null, TOPIC));
-
- windowManager.add(new WaterMarkEvent(20));
- assertEquals(5, listener.allOnActivationEvents.size());
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 10, null, TOPIC),
- new EventImpl<>(2, 10, null, TOPIC)
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(3, 11, null, TOPIC),
- new EventImpl<>(4, 12, null, TOPIC)
- }), listener.allOnActivationEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(5, 12, null, TOPIC),
- new EventImpl<>(6, 12, null, TOPIC)
- }), listener.allOnActivationEvents.get(2));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(7, 12, null, TOPIC),
- new EventImpl<>(8, 13, null, TOPIC)
- }), listener.allOnActivationEvents.get(3));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(9, 14, null, TOPIC),
- new EventImpl<>(10, 15, null, TOPIC)
- }), listener.allOnActivationEvents.get(4));
- }
-
- @Test
- public void testCountBasedSlidingWithSameEventTs() throws Exception {
- EvictionPolicy evictionPolicy = new WatermarkCountEvictionPolicy<>(5);
- windowManager.setEvictionPolicy(evictionPolicy);
- TriggerPolicy triggerPolicy
- = new WatermarkCountTriggerPolicy(2, windowManager, evictionPolicy, windowManager);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
-
- windowManager.add(new EventImpl<>(1, 10, null, TOPIC));
- windowManager.add(new EventImpl<>(2, 10, null, TOPIC));
- windowManager.add(new EventImpl<>(3, 11, null, TOPIC));
- windowManager.add(new EventImpl<>(4, 12, null, TOPIC));
- windowManager.add(new EventImpl<>(5, 12, null, TOPIC));
- windowManager.add(new EventImpl<>(6, 12, null, TOPIC));
- windowManager.add(new EventImpl<>(7, 12, null, TOPIC));
- windowManager.add(new EventImpl<>(8, 13, null, TOPIC));
- windowManager.add(new EventImpl<>(9, 14, null, TOPIC));
- windowManager.add(new EventImpl<>(10, 15, null, TOPIC));
-
- windowManager.add(new WaterMarkEvent(20));
- assertEquals(5, listener.allOnActivationEvents.size());
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 10, null, TOPIC),
- new EventImpl<>(2, 10, null, TOPIC)
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 10, null, TOPIC),
- new EventImpl<>(2, 10, null, TOPIC),
- new EventImpl<>(3, 11, null, TOPIC),
- new EventImpl<>(4, 12, null, TOPIC)
- }), listener.allOnActivationEvents.get(1));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(2, 10, null, TOPIC),
- new EventImpl<>(3, 11, null, TOPIC),
- new EventImpl<>(4, 12, null, TOPIC),
- new EventImpl<>(5, 12, null, TOPIC),
- new EventImpl<>(6, 12, null, TOPIC)
- }), listener.allOnActivationEvents.get(2));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 12, null, TOPIC),
- new EventImpl<>(5, 12, null, TOPIC),
- new EventImpl<>(6, 12, null, TOPIC),
- new EventImpl<>(7, 12, null, TOPIC),
- new EventImpl<>(8, 13, null, TOPIC)
- }), listener.allOnActivationEvents.get(3));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(6, 12, null, TOPIC),
- new EventImpl<>(7, 12, null, TOPIC),
- new EventImpl<>(8, 13, null, TOPIC),
- new EventImpl<>(9, 14, null, TOPIC),
- new EventImpl<>(10, 15, null, TOPIC),
- }), listener.allOnActivationEvents.get(4));
- }
-
- @Test
- public void testEventTimeLag() throws Exception {
- EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy<>(20, 5);
- windowManager.setEvictionPolicy(evictionPolicy);
- TriggerPolicy triggerPolicy
- = new WatermarkTimeTriggerPolicy(10, windowManager, evictionPolicy, windowManager);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
-
- windowManager.add(new EventImpl<>(1, 603, null, TOPIC));
- windowManager.add(new EventImpl<>(2, 605, null, TOPIC));
- windowManager.add(new EventImpl<>(3, 607, null, TOPIC));
- windowManager.add(new EventImpl<>(4, 618, null, TOPIC));
- windowManager.add(new EventImpl<>(5, 626, null, TOPIC));
- windowManager.add(new EventImpl<>(6, 632, null, TOPIC));
- windowManager.add(new EventImpl<>(7, 629, null, TOPIC));
- windowManager.add(new EventImpl<>(8, 636, null, TOPIC));
- // send a watermark event, which should trigger three windows.
- windowManager.add(new WaterMarkEvent(631));
- assertEquals(3, listener.allOnActivationEvents.size());
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC),
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC),
- new EventImpl<>(4, 618, null, TOPIC),
- }), listener.allOnActivationEvents.get(1));
- // out of order events should be processed upto the lag
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 626, null, TOPIC),
- new EventImpl<>(7, 629, null, TOPIC)
- }), listener.allOnActivationEvents.get(2));
- }
-
- @Test
- public void testScanStop() throws Exception {
- final Set> eventsScanned = new HashSet<>();
- EvictionPolicy evictionPolicy = new WatermarkTimeEvictionPolicy(20, 5) {
-
- @Override
- public Action evict(Event event) {
- eventsScanned.add(event);
- return super.evict(event);
- }
-
- };
- windowManager.setEvictionPolicy(evictionPolicy);
- TriggerPolicy triggerPolicy
- = new WatermarkTimeTriggerPolicy(10, windowManager, evictionPolicy, windowManager);
- triggerPolicy.start();
- windowManager.setTriggerPolicy(triggerPolicy);
-
- windowManager.add(new EventImpl<>(1, 603, null, TOPIC));
- windowManager.add(new EventImpl<>(2, 605, null, TOPIC));
- windowManager.add(new EventImpl<>(3, 607, null, TOPIC));
- windowManager.add(new EventImpl<>(4, 618, null, TOPIC));
- windowManager.add(new EventImpl<>(5, 626, null, TOPIC));
- windowManager.add(new EventImpl<>(6, 629, null, TOPIC));
- windowManager.add(new EventImpl<>(7, 636, null, TOPIC));
- windowManager.add(new EventImpl<>(8, 637, null, TOPIC));
- windowManager.add(new EventImpl<>(9, 638, null, TOPIC));
- windowManager.add(new EventImpl<>(10, 639, null, TOPIC));
-
- // send a watermark event, which should trigger three windows.
- windowManager.add(new WaterMarkEvent(631));
-
- assertEquals(3, listener.allOnActivationEvents.size());
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC),
- }), listener.allOnActivationEvents.get(0));
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC),
- new EventImpl<>(4, 618, null, TOPIC),
- }), listener.allOnActivationEvents.get(1));
-
- // out of order events should be processed upto the lag
- assertEquals(Arrays.asList(new Event[]{
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 626, null, TOPIC),
- new EventImpl<>(6, 629, null, TOPIC)
- }), listener.allOnActivationEvents.get(2));
-
- // events 8, 9, 10 should not be scanned at all since TimeEvictionPolicy lag 5s should break
- // the WindowManager scan loop early.
- assertEquals(new HashSet<>(Arrays.asList(new Event[]{
- new EventImpl<>(1, 603, null, TOPIC),
- new EventImpl<>(2, 605, null, TOPIC),
- new EventImpl<>(3, 607, null, TOPIC),
- new EventImpl<>(4, 618, null, TOPIC),
- new EventImpl<>(5, 626, null, TOPIC),
- new EventImpl<>(6, 629, null, TOPIC),
- new EventImpl<>(7, 636, null, TOPIC)
- })), eventsScanned);
- }
-
- private List> seq(int start) {
- return seq(start, start);
- }
-
- private List> seq(int start, int stop) {
- return seq(start, stop, null);
- }
-
- private List> seq(int start, int stop, Long ts) {
- long timestamp = TIMESTAMP;
- if (ts != null) {
- timestamp = ts;
- }
- List> ints = new ArrayList<>();
- for (int i = start; i <= stop; i++) {
- ints.add(new EventImpl<>(i, timestamp, null, TOPIC));
- }
- return ints;
- }
-
- private List> seqThreshold(int start, int stop) {
- List