diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java index 9f751616c7c46..da606a93ba0b7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java @@ -171,7 +171,7 @@ public boolean equals(Object o) { else if (o != null && getClass() == o.getClass()) { StreamRecord that = (StreamRecord) o; return this.hasTimestamp == that.hasTimestamp && - this.timestamp == that.timestamp && + (!this.hasTimestamp || this.timestamp == that.timestamp) && (this.value == null ? that.value == null : this.value.equals(that.value)); } else { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java new file mode 100644 index 0000000000000..a61d995420913 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -0,0 +1,366 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.operators.testutils.MockEnvironment; +import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.ClosableRegistry; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +/** + * Base class for {@code AbstractStreamOperator} test harnesses. + */ +public class AbstractStreamOperatorTestHarness { + + public static final int MAX_PARALLELISM = 10; + + final protected StreamOperator operator; + + final protected ConcurrentLinkedQueue outputList; + + final protected StreamConfig config; + + final protected ExecutionConfig executionConfig; + + final protected TestProcessingTimeService processingTimeService; + + final protected StreamTask mockTask; + + ClosableRegistry closableRegistry; + + // use this as default for tests + protected AbstractStateBackend stateBackend = new MemoryStateBackend(); + + private final Object checkpointLock; + + /** + * Whether setup() was called on the operator. This is reset when calling close(). + */ + private boolean setupCalled = false; + private boolean initializeCalled = false; + + private volatile boolean wasFailedExternally = false; + + public AbstractStreamOperatorTestHarness(StreamOperator operator) throws Exception { + this(operator, new ExecutionConfig()); + } + + public AbstractStreamOperatorTestHarness( + StreamOperator operator, + ExecutionConfig executionConfig) throws Exception { + this.operator = operator; + this.outputList = new ConcurrentLinkedQueue<>(); + Configuration underlyingConfig = new Configuration(); + this.config = new StreamConfig(underlyingConfig); + this.config.setCheckpointingEnabled(true); + this.executionConfig = executionConfig; + this.closableRegistry = new ClosableRegistry(); + this.checkpointLock = new Object(); + + final Environment env = new MockEnvironment( + "MockTask", + 3 * 1024 * 1024, + new MockInputSplitProvider(), + 1024, + underlyingConfig, + executionConfig, + MAX_PARALLELISM, + 1, 0); + + mockTask = mock(StreamTask.class); + processingTimeService = new TestProcessingTimeService(); + processingTimeService.setCurrentTime(0); + + when(mockTask.getName()).thenReturn("Mock Task"); + when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); + when(mockTask.getConfiguration()).thenReturn(config); + when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); + when(mockTask.getEnvironment()).thenReturn(env); + when(mockTask.getExecutionConfig()).thenReturn(executionConfig); + when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader()); + when(mockTask.getCancelables()).thenReturn(this.closableRegistry); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + wasFailedExternally = true; + return null; + } + }).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class)); + + try { + doAnswer(new Answer() { + @Override + public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable { + + final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0]; + return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName()); + } + }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class)); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + + try { + doAnswer(new Answer() { + @Override + public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0]; + final Collection stateHandles = (Collection) invocationOnMock.getArguments()[1]; + OperatorStateBackend osb; + if (null == stateHandles) { + osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName()); + } else { + osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles); + } + mockTask.getCancelables().registerClosable(osb); + return osb; + } + }).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class)); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + + doAnswer(new Answer() { + @Override + public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable { + return processingTimeService; + } + }).when(mockTask).getProcessingTimeService(); + } + + public void setStateBackend(AbstractStateBackend stateBackend) { + this.stateBackend = stateBackend; + } + + public Object getCheckpointLock() { + return mockTask.getCheckpointLock(); + } + + public Environment getEnvironment() { + return this.mockTask.getEnvironment(); + } + + /** + * Get all the output from the task. This contains StreamRecords and Events interleaved. + */ + public ConcurrentLinkedQueue getOutput() { + return outputList; + } + + /** + * Get all the output from the task and clear the output buffer. + * This contains only StreamRecords. + */ + @SuppressWarnings("unchecked") + public List> extractOutputStreamRecords() { + List> resultElements = new LinkedList<>(); + for (Object e: getOutput()) { + if (e instanceof StreamRecord) { + resultElements.add((StreamRecord) e); + } + } + return resultElements; + } + + /** + * Calls + * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} + */ + public void setup() throws Exception { + operator.setup(mockTask, config, new MockOutput()); + setupCalled = true; + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} + * if it was not called before. + */ + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { + if (!setupCalled) { + setup(); + } + operator.initializeState(operatorStateHandles); + initializeCalled = true; + } + + + /** + * Calls {@link StreamOperator#open()}. This also + * calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} + * if it was not called before. + */ + public void open() throws Exception { + if (!initializeCalled) { + initializeState(null); + } + operator.open(); + } + + /** + * Calls {@link StreamOperator#snapshotState(long, long, CheckpointStreamFactory)}. + */ + public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception { + + CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory( + new JobID(), + "test_op"); + + return operator.snapshotState(checkpointId, timestamp, streamFactory); + } + + /** + * Calls {@link StreamCheckpointedOperator#snapshotState(FSDataOutputStream, long, long)} if + * the operator implements this interface. + */ + @Deprecated + public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception { + + CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( + new JobID(), + "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); + if(operator instanceof StreamCheckpointedOperator) { + ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); + return outStream.closeAndGetHandle(); + } else { + throw new RuntimeException("Operator is not StreamCheckpointedOperator"); + } + } + + /** + * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()} + */ + public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { + operator.notifyOfCompletedCheckpoint(checkpointId); + } + + /** + * Calls {@link StreamCheckpointedOperator#restoreState(FSDataInputStream)} if + * the operator implements this interface. + */ @Deprecated + public void restore(StreamStateHandle snapshot) throws Exception { + if(operator instanceof StreamCheckpointedOperator) { + try (FSDataInputStream in = snapshot.openInputStream()) { + ((StreamCheckpointedOperator) operator).restoreState(in); + } + } else { + throw new RuntimeException("Operator is not StreamCheckpointedOperator"); + } + } + + /** + * Calls close and dispose on the operator. + */ + public void close() throws Exception { + operator.close(); + operator.dispose(); + if (processingTimeService != null) { + processingTimeService.shutdownService(); + } + setupCalled = false; + } + + public void setProcessingTime(long time) throws Exception { + processingTimeService.setCurrentTime(time); + } + + public long getProcessingTime() { + return processingTimeService.getCurrentProcessingTime(); + } + + public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { + this.config.setTimeCharacteristic(timeCharacteristic); + } + + public TimeCharacteristic getTimeCharacteristic() { + return this.config.getTimeCharacteristic(); + } + + public boolean wasFailedExternally() { + return wasFailedExternally; + } + + private class MockOutput implements Output> { + + private TypeSerializer outputSerializer; + + @Override + public void emitWatermark(Watermark mark) { + outputList.add(mark); + } + + @Override + public void emitLatencyMarker(LatencyMarker latencyMarker) { + outputList.add(latencyMarker); + } + + @Override + public void collect(StreamRecord element) { + if (outputSerializer == null) { + outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); + } + if (element.hasTimestamp()) { + outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),element.getTimestamp())); + } else { + outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()))); + } + } + + @Override + public void close() { + // ignore + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 9c9d11b263b31..99527e702bfb0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -50,7 +50,6 @@ /** * Extension of {@link OneInputStreamOperatorTestHarness} that allows the operator to get * a {@link KeyedStateBackend}. - * */ public class KeyedOneInputStreamOperatorTestHarness extends OneInputStreamOperatorTestHarness { @@ -171,7 +170,7 @@ public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throw } /** - * + * */ @Override public void restore(StreamStateHandle snapshot) throws Exception { @@ -189,21 +188,12 @@ public void restore(StreamStateHandle snapshot) throws Exception { } } - /** - * Calls close and dispose on the operator. - */ - public void close() throws Exception { - super.close(); - if (keyedStateBackend != null) { - keyedStateBackend.dispose(); - } - } - @Override public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { - if (null != operatorStateHandles) { - this.restoredKeyedState = operatorStateHandles.getManagedKeyedState(); + if (operatorStateHandles != null) { + restoredKeyedState = operatorStateHandles.getManagedKeyedState(); } + super.initializeState(operatorStateHandles); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java new file mode 100644 index 0000000000000..2e9885c0f6584 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.ClosureCleaner; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.util.Collection; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doAnswer; + +/** + * Extension of {@link TwoInputStreamOperatorTestHarness} that allows the operator to get + * a {@link KeyedStateBackend}. + */ +public class KeyedTwoInputStreamOperatorTestHarness + extends TwoInputStreamOperatorTestHarness { + + // in case the operator creates one we store it here so that we + // can snapshot its state + private AbstractKeyedStateBackend keyedStateBackend = null; + + // when we restore we keep the state here so that we can call restore + // when the operator requests the keyed state backend + private Collection restoredKeyedState = null; + + public KeyedTwoInputStreamOperatorTestHarness( + TwoInputStreamOperator operator, + final KeySelector keySelector1, + final KeySelector keySelector2, + TypeInformation keyType) throws Exception { + super(operator); + + ClosureCleaner.clean(keySelector1, false); + ClosureCleaner.clean(keySelector2, false); + config.setStatePartitioner(0, keySelector1); + config.setStatePartitioner(1, keySelector2); + config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.setNumberOfKeyGroups(MAX_PARALLELISM); + + setupMockTaskCreateKeyedBackend(); + } + + public KeyedTwoInputStreamOperatorTestHarness( + TwoInputStreamOperator operator, + ExecutionConfig executionConfig, + KeySelector keySelector1, + KeySelector keySelector2, + TypeInformation keyType) throws Exception { + super(operator, executionConfig); + + ClosureCleaner.clean(keySelector1, false); + ClosureCleaner.clean(keySelector2, false); + config.setStatePartitioner(0, keySelector1); + config.setStatePartitioner(1, keySelector2); + config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + config.setNumberOfKeyGroups(MAX_PARALLELISM); + + setupMockTaskCreateKeyedBackend(); + } + + private void setupMockTaskCreateKeyedBackend() { + + try { + doAnswer(new Answer() { + @Override + public KeyedStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { + + final TypeSerializer keySerializer = (TypeSerializer) invocationOnMock.getArguments()[0]; + final int numberOfKeyGroups = (Integer) invocationOnMock.getArguments()[1]; + final KeyGroupRange keyGroupRange = (KeyGroupRange) invocationOnMock.getArguments()[2]; + + if(keyedStateBackend != null) { + keyedStateBackend.close(); + } + + if (restoredKeyedState == null) { + keyedStateBackend = stateBackend.createKeyedStateBackend( + mockTask.getEnvironment(), + new JobID(), + "test_op", + keySerializer, + numberOfKeyGroups, + keyGroupRange, + mockTask.getEnvironment().getTaskKvStateRegistry()); + return keyedStateBackend; + } else { + keyedStateBackend = stateBackend.restoreKeyedStateBackend( + mockTask.getEnvironment(), + new JobID(), + "test_op", + keySerializer, + numberOfKeyGroups, + keyGroupRange, + restoredKeyedState, + mockTask.getEnvironment().getTaskKvStateRegistry()); + restoredKeyedState = null; + return keyedStateBackend; + } + } + }).when(mockTask).createKeyedStateBackend(any(TypeSerializer.class), anyInt(), any(KeyGroupRange.class)); + } catch (Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { + if (restoredKeyedState != null) { + restoredKeyedState = operatorStateHandles.getManagedKeyedState(); + } + + super.initializeState(operatorStateHandles); + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 5b277bf5e5563..a3e095a0863e1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -18,89 +18,23 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.CheckpointStreamFactory; -import org.apache.flink.runtime.state.ClosableRegistry; -import org.apache.flink.runtime.state.OperatorStateBackend; -import org.apache.flink.runtime.state.OperatorStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.TimeCharacteristic; -import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.streaming.api.operators.OperatorSnapshotResult; -import org.apache.flink.streaming.api.operators.Output; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; -import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.util.InstantiationUtil; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.RunnableFuture; - -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * A test harness for testing a {@link OneInputStreamOperator}. * - *

- * This mock task provides the operator with a basic runtime context and allows pushing elements + *

This mock task provides the operator with a basic runtime context and allows pushing elements * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements * and watermarks can be retrieved. You are free to modify these. */ -public class OneInputStreamOperatorTestHarness { - - public static final int MAX_PARALLELISM = 10; - - final OneInputStreamOperator operator; - - final ConcurrentLinkedQueue outputList; - - final StreamConfig config; - - final ExecutionConfig executionConfig; - - final TestProcessingTimeService processingTimeService; - - StreamTask mockTask; - - ClosableRegistry closableRegistry; - - // use this as default for tests - AbstractStateBackend stateBackend = new MemoryStateBackend(); +public class OneInputStreamOperatorTestHarness + extends AbstractStreamOperatorTestHarness { - private final Object checkpointLock; - - /** - * Whether setup() was called on the operator. This is reset when calling close(). - */ - private boolean setupCalled = false; - private boolean initializeCalled = false; - - private volatile boolean wasFailedExternally = false; + private final OneInputStreamOperator oneInputOperator; public OneInputStreamOperatorTestHarness(OneInputStreamOperator operator) throws Exception { this(operator, new ExecutionConfig()); @@ -109,268 +43,24 @@ public OneInputStreamOperatorTestHarness(OneInputStreamOperator operato public OneInputStreamOperatorTestHarness( OneInputStreamOperator operator, ExecutionConfig executionConfig) throws Exception { - this.operator = operator; - this.outputList = new ConcurrentLinkedQueue<>(); - Configuration underlyingConfig = new Configuration(); - this.config = new StreamConfig(underlyingConfig); - this.config.setCheckpointingEnabled(true); - this.executionConfig = executionConfig; - this.closableRegistry = new ClosableRegistry(); - - this.checkpointLock = new Object(); - - final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0); - mockTask = mock(StreamTask.class); - processingTimeService = new TestProcessingTimeService(); - processingTimeService.setCurrentTime(0); - - when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); - when(mockTask.getConfiguration()).thenReturn(config); - when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); - when(mockTask.getEnvironment()).thenReturn(env); - when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader()); - when(mockTask.getCancelables()).thenReturn(this.closableRegistry); - - doAnswer(new Answer() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - wasFailedExternally = true; - return null; - } - }).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class)); - - try { - doAnswer(new Answer() { - @Override - public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable { - - final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0]; - return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName()); - } - }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class)); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - - try { - doAnswer(new Answer() { - @Override - public OperatorStateBackend answer(InvocationOnMock invocationOnMock) throws Throwable { - final StreamOperator operator = (StreamOperator) invocationOnMock.getArguments()[0]; - final Collection stateHandles = (Collection) invocationOnMock.getArguments()[1]; - OperatorStateBackend osb; - if (null == stateHandles) { - osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName()); - } else { - osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles); - } - mockTask.getCancelables().registerClosable(osb); - return osb; - } - }).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), any(Collection.class)); - } catch (Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - - doAnswer(new Answer() { - @Override - public ProcessingTimeService answer(InvocationOnMock invocation) throws Throwable { - return processingTimeService; - } - }).when(mockTask).getProcessingTimeService(); - } - - public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { - this.config.setTimeCharacteristic(timeCharacteristic); - } - - public TimeCharacteristic getTimeCharacteristic() { - return this.config.getTimeCharacteristic(); - } - - public boolean wasFailedExternally() { - return wasFailedExternally; - } - - public void setStateBackend(AbstractStateBackend stateBackend) { - this.stateBackend = stateBackend; - } - - public Object getCheckpointLock() { - return mockTask.getCheckpointLock(); - } - - public Environment getEnvironment() { - return this.mockTask.getEnvironment(); - } - - /** - * Get all the output from the task. This contains StreamRecords and Events interleaved. - */ - public ConcurrentLinkedQueue getOutput() { - return outputList; - } - - /** - * Get all the output from the task and clear the output buffer. - * This contains only StreamRecords. - */ - @SuppressWarnings("unchecked") - public List> extractOutputStreamRecords() { - List> resultElements = new LinkedList<>(); - for (Object e: getOutput()) { - if (e instanceof StreamRecord) { - resultElements.add((StreamRecord) e); - } - } - return resultElements; - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} ()} - */ - public void setup() throws Exception { - operator.setup(mockTask, config, new MockOutput()); - setupCalled = true; - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, StreamConfig, Output)} - * if it was not called before. - */ - public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { - if (!setupCalled) { - setup(); - } - operator.initializeState(operatorStateHandles); - initializeCalled = true; - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)} if it - * was not called before. - */ - public void open() throws Exception { - if (!initializeCalled) { - initializeState(null); - } - operator.open(); - } - - /** - * - */ - public OperatorSnapshotResult snapshot(long checkpointId, long timestamp) throws Exception { - - CheckpointStreamFactory streamFactory = stateBackend.createStreamFactory( - new JobID(), - "test_op"); - - return operator.snapshotState(checkpointId, timestamp, streamFactory); - } - - /** - * - */ - @Deprecated - public StreamStateHandle snapshotLegacy(long checkpointId, long timestamp) throws Exception { - - CheckpointStreamFactory.CheckpointStateOutputStream outStream = stateBackend.createStreamFactory( - new JobID(), - "test_op").createCheckpointStateOutputStream(checkpointId, timestamp); - if(operator instanceof StreamCheckpointedOperator) { - ((StreamCheckpointedOperator) operator).snapshotState(outStream, checkpointId, timestamp); - return outStream.closeAndGetHandle(); - } else { - throw new RuntimeException("Operator is not StreamCheckpointedOperator"); - } - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)} ()} - */ - public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception { - operator.notifyOfCompletedCheckpoint(checkpointId); - } + super(operator, executionConfig); - /** - * - */ - @Deprecated - public void restore(StreamStateHandle snapshot) throws Exception { - if(operator instanceof StreamCheckpointedOperator) { - try (FSDataInputStream in = snapshot.openInputStream()) { - ((StreamCheckpointedOperator) operator).restoreState(in); - } - } else { - throw new RuntimeException("Operator is not StreamCheckpointedOperator"); - } - } - - /** - * Calls close and dispose on the operator. - */ - public void close() throws Exception { - operator.close(); - operator.dispose(); - if (processingTimeService != null) { - processingTimeService.shutdownService(); - } - setupCalled = false; + this.oneInputOperator = operator; } public void processElement(StreamRecord element) throws Exception { operator.setKeyContextElement1(element); - operator.processElement(element); + oneInputOperator.processElement(element); } public void processElements(Collection> elements) throws Exception { for (StreamRecord element: elements) { operator.setKeyContextElement1(element); - operator.processElement(element); - } - } - - public void setProcessingTime(long time) throws Exception { - synchronized (checkpointLock) { - processingTimeService.setCurrentTime(time); + oneInputOperator.processElement(element); } } public void processWatermark(Watermark mark) throws Exception { - operator.processWatermark(mark); - } - - private class MockOutput implements Output> { - - private TypeSerializer outputSerializer; - - @Override - public void emitWatermark(Watermark mark) { - outputList.add(mark); - } - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - outputList.add(latencyMarker); - } - - @Override - public void collect(StreamRecord element) { - if (outputSerializer == null) { - outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); - } - outputList.add(new StreamRecord(outputSerializer.copy(element.getValue()), - element.getTimestamp())); - } - - @Override - public void close() { - // ignore - } + oneInputOperator.processWatermark(mark); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index 7df68483a5694..95eea98e5b5b5 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -19,26 +19,9 @@ package org.apache.flink.streaming.util; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.MockEnvironment; -import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider; -import org.apache.flink.runtime.state.ClosableRegistry; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; -import org.apache.flink.streaming.runtime.tasks.StreamTask; - -import java.util.concurrent.ConcurrentLinkedQueue; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * A test harness for testing a {@link TwoInputStreamOperator}. @@ -48,122 +31,35 @@ * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements * and watermarks can be retrieved. you are free to modify these. */ -public class TwoInputStreamOperatorTestHarness { - - TwoInputStreamOperator operator; - - final ConcurrentLinkedQueue outputList; - - final ExecutionConfig executionConfig; - - final Object checkpointLock; +public class TwoInputStreamOperatorTestHarnessextends AbstractStreamOperatorTestHarness { - final ClosableRegistry closableRegistry; + private final TwoInputStreamOperator twoInputOperator; - boolean initializeCalled = false; - - public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator operator) { - this(operator, new StreamConfig(new Configuration())); + public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator operator) throws Exception { + this(operator, new ExecutionConfig()); } - public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator operator, StreamConfig config) { - this.operator = operator; - this.outputList = new ConcurrentLinkedQueue(); - this.executionConfig = new ExecutionConfig(); - this.checkpointLock = new Object(); - this.closableRegistry = new ClosableRegistry(); - - Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024); - StreamTask mockTask = mock(StreamTask.class); - when(mockTask.getName()).thenReturn("Mock Task"); - when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); - when(mockTask.getConfiguration()).thenReturn(config); - when(mockTask.getEnvironment()).thenReturn(env); - when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getCancelables()).thenReturn(this.closableRegistry); - - operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput()); - } - - /** - * Get all the output from the task. This contains StreamRecords and Events interleaved. Use - * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)} - * to extract only the StreamRecords. - */ - public ConcurrentLinkedQueue getOutput() { - return outputList; - } + public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator operator, ExecutionConfig executionConfig) throws Exception { + super(operator, executionConfig); - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}. - */ - public void initializeState(OperatorStateHandles operatorStateHandles) throws Exception { - operator.initializeState(operatorStateHandles); - initializeCalled = true; - } - - /** - * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}. - */ - public void open() throws Exception { - if(!initializeCalled) { - initializeState(mock(OperatorStateHandles.class)); - } - - operator.open(); - } - - /** - * Calls close on the operator. - */ - public void close() throws Exception { - operator.close(); + this.twoInputOperator = operator; } public void processElement1(StreamRecord element) throws Exception { - operator.processElement1(element); + twoInputOperator.setKeyContextElement1(element); + twoInputOperator.processElement1(element); } public void processElement2(StreamRecord element) throws Exception { - operator.processElement2(element); + twoInputOperator.setKeyContextElement2(element); + twoInputOperator.processElement2(element); } public void processWatermark1(Watermark mark) throws Exception { - operator.processWatermark1(mark); + twoInputOperator.processWatermark1(mark); } public void processWatermark2(Watermark mark) throws Exception { - operator.processWatermark2(mark); - } - - private class MockOutput implements Output> { - - private TypeSerializer outputSerializer; - - @Override - @SuppressWarnings("unchecked") - public void emitWatermark(Watermark mark) { - outputList.add(mark); - } - - @Override - public void emitLatencyMarker(LatencyMarker latencyMarker) { - outputList.add(latencyMarker); - } - - @Override - @SuppressWarnings("unchecked") - public void collect(StreamRecord element) { - if (outputSerializer == null) { - outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig); - } - outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()), - element.getTimestamp())); - } - - @Override - public void close() { - // ignore - } + twoInputOperator.processWatermark2(mark); } }