diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractWriterOperator.java new file mode 100644 index 0000000000000..5706d78547e75 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractWriterOperator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.InternalTimerService; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.util.List; + +/** + * Abstract base class for operators that work with a {@link Writer}. + * + *

Sub-classes are responsible for creating the specific {@link Writer} by implementing {@link + * #createWriter()}. + * + * @param The input type of the {@link Writer}. + * @param The committable type of the {@link Writer}. + */ +@Internal +abstract class AbstractWriterOperator extends AbstractStreamOperator + implements OneInputStreamOperator, BoundedOneInput { + + private static final long serialVersionUID = 1L; + + /** The runtime information of the input element. */ + private final Context context; + + // ------------------------------- runtime fields --------------------------------------- + + /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */ + private Long currentWatermark; + + /** The sink writer that does most of the work. */ + protected Writer writer; + + AbstractWriterOperator() { + this.context = new Context<>(); + } + + @Override + public void open() throws Exception { + super.open(); + + this.currentWatermark = Long.MIN_VALUE; + + writer = createWriter(); + } + + @Override + public void processElement(StreamRecord element) throws Exception { + context.element = element; + writer.write(element.getValue(), context); + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + super.prepareSnapshotPreBarrier(checkpointId); + sendCommittables(writer.prepareCommit(false)); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + this.currentWatermark = mark.getTimestamp(); + } + + @Override + public void endInput() throws Exception { + sendCommittables(writer.prepareCommit(true)); + } + + @Override + public void close() throws Exception { + super.close(); + writer.close(); + } + + protected Sink.InitContext createInitContext() { + return new Sink.InitContext() { + @Override + public int getSubtaskId() { + return getRuntimeContext().getIndexOfThisSubtask(); + } + + @Override + public MetricGroup metricGroup() { + return getMetricGroup(); + } + }; + } + + /** + * Creates and returns a {@link Writer}. + * + * @throws Exception If creating {@link Writer} fail + */ + abstract Writer createWriter() throws Exception; + + private void sendCommittables(final List committables) { + for (CommT committable : committables) { + output.collect(new StreamRecord<>(committable)); + } + } + + private class Context implements Writer.Context { + + private StreamRecord element; + + @Override + public long currentWatermark() { + return currentWatermark; + } + + @Override + public Long timestamp() { + if (element.hasTimestamp()) { + return element.getTimestamp(); + } + return null; + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractWriterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractWriterOperatorFactory.java new file mode 100644 index 0000000000000..7493db6a40265 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/AbstractWriterOperatorFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; + +/** + * Base {@link OneInputStreamOperatorFactory} for subclasses of {@link AbstractWriterOperator}. + * + * @param The input type of the {@link Writer}. + * @param The committable type of the {@link Writer}. + */ +abstract class AbstractWriterOperatorFactory extends AbstractStreamOperatorFactory + implements OneInputStreamOperatorFactory { + + @Override + @SuppressWarnings("unchecked") + public > T createStreamOperator(StreamOperatorParameters parameters) { + final AbstractWriterOperator writerOperator = createWriterOperator(); + writerOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + return (T) writerOperator; + } + + abstract AbstractWriterOperator createWriterOperator(); +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulWriterOperator.java new file mode 100644 index 0000000000000..67bd7ba6a7e0b --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulWriterOperator.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; +import org.apache.flink.util.CollectionUtil; + +import java.util.List; + +/** + * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link + * Writer Writers} that have state. + * + * @param The input type of the {@link Writer}. + * @param The committable type of the {@link Writer}. + * @param The type of the {@link Writer Writer's} state. + */ +@Internal +final class StatefulWriterOperator extends AbstractWriterOperator { + + /** The operator's state descriptor. */ + private static final ListStateDescriptor WRITER_RAW_STATES_DESC = + new ListStateDescriptor<>("writer_raw_states", BytePrimitiveArraySerializer.INSTANCE); + + /** Used to create the stateful {@link Writer}. */ + private final Sink sink; + + /** The writer operator's state serializer. */ + private final SimpleVersionedSerializer writerStateSimpleVersionedSerializer; + + // ------------------------------- runtime fields --------------------------------------- + + /** The operator's state. */ + private ListState writerState; + + StatefulWriterOperator( + final Sink sink, + final SimpleVersionedSerializer writerStateSimpleVersionedSerializer) { + this.sink = sink; + this.writerStateSimpleVersionedSerializer = writerStateSimpleVersionedSerializer; + } + + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + final ListState rawState = context.getOperatorStateStore().getListState(WRITER_RAW_STATES_DESC); + writerState = new SimpleVersionedListState<>(rawState, writerStateSimpleVersionedSerializer); + } + + @SuppressWarnings("unchecked") + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + writerState.update((List) writer.snapshotState()); + } + + @Override + Writer createWriter() throws Exception { + final List committables = CollectionUtil.iterableToList(writerState.get()); + return sink.createWriter(createInitContext(), committables); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulWriterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulWriterOperatorFactory.java new file mode 100644 index 0000000000000..83b1b8b8ab246 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatefulWriterOperatorFactory.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.streaming.api.operators.StreamOperator; + +/** + * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link + * StatefulWriterOperator}. + * + * @param The input type of the {@link Writer}. + * @param The committable type of the {@link Writer}. + * @param The type of the {@link Writer Writer's} state. + */ +public final class StatefulWriterOperatorFactory extends AbstractWriterOperatorFactory { + + private final Sink sink; + + public StatefulWriterOperatorFactory(Sink sink) { + this.sink = sink; + } + + @Override + AbstractWriterOperator createWriterOperator() { + return new StatefulWriterOperator<>(sink, sink.getWriterStateSerializer().get()); + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return StatefulWriterOperator.class; + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessWriterOperator.java new file mode 100644 index 0000000000000..8e093bd442cd4 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessWriterOperator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.Writer; + +import java.util.Collections; + +/** + * Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link + * Writer Writers} that don't have state. + * + * @param The input type of the {@link Writer}. + * @param The committable type of the {@link Writer}. + */ +@Internal +final class StatelessWriterOperator extends AbstractWriterOperator{ + + /** Used to create the stateless {@link Writer}. */ + private final Sink sink; + + StatelessWriterOperator(final Sink sink) { + this.sink = sink; + } + + @Override + Writer createWriter() { + return sink.createWriter(createInitContext(), Collections.emptyList()); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessWriterOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessWriterOperatorFactory.java new file mode 100644 index 0000000000000..7204a6538fbb5 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/StatelessWriterOperatorFactory.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.streaming.api.operators.StreamOperator; + +/** + * A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link + * StatelessWriterOperator}. + * + * @param The input type of the {@link Writer}. + * @param The committable type of the {@link Writer}. + */ +public final class StatelessWriterOperatorFactory extends AbstractWriterOperatorFactory { + + private final Sink sink; + + public StatelessWriterOperatorFactory(Sink sink) { + this.sink = sink; + } + + @Override + AbstractWriterOperator createWriterOperator() { + return new StatelessWriterOperator<>(sink); + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return StatelessWriterOperator.class; + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WriterOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WriterOperatorTest.java new file mode 100644 index 0000000000000..adfd226a82611 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/sink/WriterOperatorTest.java @@ -0,0 +1,305 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators.sink; + +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.Writer; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.function.FunctionUtils; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.function.Consumer; + +/** + * Test the writer operator. + */ +public class WriterOperatorTest { + + @Test + public void testStatelessWriter() throws Exception { + final long initialTime = 0; + final ConcurrentLinkedQueue expectedPreCommitOutput = + new ConcurrentLinkedQueue<>( + Arrays.asList( + new Watermark(initialTime), + new StreamRecord<>(Tuple3.of(1, initialTime + 1, initialTime)), + new StreamRecord<>(Tuple3.of(2, initialTime + 2, initialTime)))); + + final ConcurrentLinkedQueue expectedEndOutput = new ConcurrentLinkedQueue<>(expectedPreCommitOutput); + expectedEndOutput.add(new StreamRecord<>(DummyWriter.LAST_ELEMENT)); + + final Consumer>> process = + FunctionUtils.uncheckedConsumer( + task -> { + task.processWatermark(initialTime); + task.processElement(1, initialTime + 1); + task.processElement(2, initialTime + 2); + }); + + final OperatorSubtaskState operatorSubtaskState = processElements( + null, + new StatelessWriterOperatorFactory<>(new StatelessWriterSink()), + process, + expectedPreCommitOutput, + expectedEndOutput); + + // test after restoring + processElements( + operatorSubtaskState, + new StatelessWriterOperatorFactory<>(new StatelessWriterSink()), + process, + expectedPreCommitOutput, + expectedEndOutput); + + } + + @Test + public void testStatefulWriter() throws Exception { + + final long initialTime = 0; + + final ConcurrentLinkedQueue expectedEndOutput1 = + new ConcurrentLinkedQueue<>( + Arrays.asList( + new StreamRecord<>(Tuple3.of(1, initialTime + 1, Long.MIN_VALUE)), + new StreamRecord<>(Tuple3.of(2, initialTime + 2, Long.MIN_VALUE)), + new StreamRecord<>(DummyWriter.LAST_ELEMENT))); + + final Consumer>> process1 = + FunctionUtils.uncheckedConsumer( + task -> { + task.processElement(new StreamRecord<>(1, initialTime + 1)); + task.processElement(new StreamRecord<>(2, initialTime + 2)); + }); + + final OperatorSubtaskState operatorSubtaskState = processElements(null, + new StatefulWriterOperatorFactory<>(new StatefulWriterSink()), + process1, + new ConcurrentLinkedQueue<>(), + expectedEndOutput1); + + final Consumer>> process2 = + FunctionUtils.uncheckedConsumer(task -> task.processElement(new StreamRecord<>(3, initialTime + 3))); + + final ConcurrentLinkedQueue expectedPreCommitOutput2 = + new ConcurrentLinkedQueue<>( + Arrays.asList( + new StreamRecord<>(Tuple3.of(1, initialTime + 1, Long.MIN_VALUE)), + new StreamRecord<>(Tuple3.of(2, initialTime + 2, Long.MIN_VALUE)), + new StreamRecord<>(Tuple3.of(3, initialTime + 3, Long.MIN_VALUE)))); + + final ConcurrentLinkedQueue expectedEndOutput2 = new ConcurrentLinkedQueue<>(expectedPreCommitOutput2); + expectedEndOutput2.add(new StreamRecord<>(DummyWriter.LAST_ELEMENT)); + + processElements(operatorSubtaskState, + new StatefulWriterOperatorFactory<>(new StatefulWriterSink()), + process2, + expectedPreCommitOutput2, + expectedEndOutput2); + } + + private OperatorSubtaskState processElements( + @Nullable OperatorSubtaskState restoredOperatorSubtaskState, + OneInputStreamOperatorFactory> factory, + Consumer>> process, + ConcurrentLinkedQueue expectedPreCommitOutput, + ConcurrentLinkedQueue expectedEndOutput) throws Exception { + + final OneInputStreamOperatorTestHarness> testHarness = + new OneInputStreamOperatorTestHarness<>(factory, IntSerializer.INSTANCE); + + if (restoredOperatorSubtaskState != null) { + testHarness.initializeState(restoredOperatorSubtaskState); + } + + testHarness.open(); + process.accept(testHarness); + //verify pre-commit output + testHarness.prepareSnapshotPreBarrier(1L); + final OperatorSubtaskState operatorSubtaskState = testHarness.snapshot(1L, 1L); + Assert.assertArrayEquals(expectedPreCommitOutput.toArray(), testHarness.getOutput().toArray()); + + //verify end-of-input output + testHarness.endInput(); + Assert.assertArrayEquals(expectedEndOutput.toArray(), testHarness.getOutput().toArray()); + + //verify the close + testHarness.close(); + AbstractWriterOperator> s = (AbstractWriterOperator>) testHarness.getOneInputOperator(); + DummyWriter writer = (DummyWriter) s.writer; + Assert.assertTrue(writer.isClosed()); + + return operatorSubtaskState; + } + + static final class StatelessWriterSink + implements TestSink, Tuple3, Void> { + + @Override + public Writer, Tuple3> createWriter( + InitContext context, List> states) { + return new DummyWriter(); + } + } + + static final class StatefulWriterSink + implements TestSink, Tuple3, Void> { + + @Override + public Writer, Tuple3> createWriter( + InitContext context, + List> states) { + return new DummyWriter(3, states); + } + + @Override + public Optional>> getWriterStateSerializer() { + return Optional.of(new WriterStateSerializer()); + } + } + + static final class DummyWriter + implements Writer, Tuple3> { + + static final Tuple3 LAST_ELEMENT = + Tuple3.of(Integer.MAX_VALUE, Long.MAX_VALUE, Long.MAX_VALUE); + + private boolean isClosed; + + private final int maxCacheElementNum; + + // element, timestamp, watermark + private List> elements; + + DummyWriter(int maxCacheElementNum, List> restoreElements) { + this.isClosed = false; + this.elements = new ArrayList<>(restoreElements); + this.maxCacheElementNum = maxCacheElementNum; + } + + DummyWriter() { + this(0, Collections.emptyList()); + } + + @Override + public void write(Integer element, Context context) { + elements.add(Tuple3.of(element, context.timestamp(), context.currentWatermark())); + } + + @Override + public List> prepareCommit(boolean flush) { + final List> r = elements; + if (flush) { + elements.add(LAST_ELEMENT); + return elements; + } else if (elements.size() >= maxCacheElementNum) { + elements = new ArrayList<>(); + return r; + } else { + return Collections.emptyList(); + } + } + + @Override + public List> snapshotState() { + return elements; + } + + @Override + public void close() { + isClosed = true; + } + + public boolean isClosed() { + return isClosed; + } + } + + static final class WriterStateSerializer implements SimpleVersionedSerializer> { + + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(Tuple3 tuple3) throws IOException { + return InstantiationUtil.serializeObject(tuple3); + + } + + @Override + public Tuple3 deserialize(int version, byte[] serialized) throws IOException { + try { + return InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to deserialize the writer's state.", e); + } + } + } + + interface TestSink extends Sink { + + @Override + default Optional> createCommitter() { + return Optional.empty(); + } + + @Override + default Optional> createGlobalCommitter() { + return Optional.empty(); + } + + @Override + default Optional> getCommittableSerializer() { + return Optional.empty(); + } + + @Override + default Optional> getGlobalCommittableSerializer() { + return Optional.empty(); + } + + @Override + default Optional> getWriterStateSerializer() { + return Optional.empty(); + } + } +}