Skip to content

Commit

Permalink
[FLINK-19833] Rename Sink API Writer interface to SinkWriter
Browse files Browse the repository at this point in the history
This makes it more consistent with SourceReader.

This closes apache#13887.
  • Loading branch information
guoweiM authored and kl0u committed Nov 3, 2020
1 parent 0d7d8c8 commit 34d0a9c
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,27 @@
/**
* This interface lets the sink developer build a simple sink topology, which could guarantee the exactly once
* semantics in both batch and stream execution mode if there is a {@link Committer} or {@link GlobalCommitter}.
* 1. The {@link Writer} is responsible for producing the committable.
* 1. The {@link SinkWriter} is responsible for producing the committable.
* 2. The {@link Committer} is responsible for committing a single committable.
* 3. The {@link GlobalCommitter} is responsible for committing an aggregated committable, which we call the global
* committable. The {@link GlobalCommitter} is always executed with a parallelism of 1.
* Note: Developers need to ensure the idempotence of {@link Committer} and {@link GlobalCommitter}.
*
* @param <InputT> The type of the sink's input
* @param <CommT> The type of information needed to commit data staged by the sink
* @param <WriterStateT> The type of the writer's state
* @param <WriterStateT> The type of the sink writer's state
* @param <GlobalCommT> The type of the aggregated committable
*/
@Experimental
public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializable {

/**
* Create a {@link Writer}.
* Create a {@link SinkWriter}.
* @param context the runtime context.
* @param states the writer's state.
* @return A sink writer.
*/
Writer<InputT, CommT, WriterStateT> createWriter(InitContext context, List<WriterStateT> states);
SinkWriter<InputT, CommT, WriterStateT> createWriter(InitContext context, List<WriterStateT> states);

/**
* Creates a {@link Committer}.
Expand Down Expand Up @@ -78,7 +78,7 @@ public interface Sink<InputT, CommT, WriterStateT, GlobalCommT> extends Serializ
Optional<SimpleVersionedSerializer<WriterStateT>> getWriterStateSerializer();

/**
* The interface exposes some runtime info for creating a {@link Writer}.
* The interface exposes some runtime info for creating a {@link SinkWriter}.
*/
interface InitContext {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@
import java.util.List;

/**
* The {@code Writer} is responsible for writing data and handling any potential tmp area used to write yet un-staged
* The {@code SinkWriter} is responsible for writing data and handling any potential tmp area used to write yet un-staged
* data, e.g. in-progress files. The data (or metadata pointing to where the actual data is staged) ready to commit is
* returned to the system by the {@link #prepareCommit(boolean)}.
*
* @param <InputT> The type of the writer's input
* @param <InputT> The type of the sink writer's input
* @param <CommT> The type of information needed to commit data staged by the sink
* @param <WriterStateT> The type of the writer's state
*/
@Experimental
public interface Writer<InputT, CommT, WriterStateT> extends AutoCloseable {
public interface SinkWriter<InputT, CommT, WriterStateT> extends AutoCloseable {

/**
* Add an element to the writer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
Expand All @@ -37,9 +38,9 @@ Licensed to the Apache Software Foundation (ASF) under one
/**
* A {@link Transformation} for {@link Sink}.
*
* @param <InputT> The input type of the {@link org.apache.flink.api.connector.sink.Writer}
* @param <CommT> The committable type of the {@link org.apache.flink.api.connector.sink.Writer}
* @param <WriterStateT> The state type of the {@link org.apache.flink.api.connector.sink.Writer}
* @param <InputT> The input type of the {@link SinkWriter}
* @param <CommT> The committable type of the {@link SinkWriter}
* @param <WriterStateT> The state type of the {@link SinkWriter}
* @param <GlobalCommT> The global committable type of the {@link org.apache.flink.api.connector.sink.GlobalCommitter}
*/
@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
Expand All @@ -32,16 +32,16 @@
import java.util.List;

/**
* Abstract base class for operators that work with a {@link Writer}.
* Abstract base class for operators that work with a {@link SinkWriter}.
*
* <p>Sub-classes are responsible for creating the specific {@link Writer} by implementing {@link
* <p>Sub-classes are responsible for creating the specific {@link SinkWriter} by implementing {@link
* #createWriter()}.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
* @param <InputT> The input type of the {@link SinkWriter}.
* @param <CommT> The committable type of the {@link SinkWriter}.
*/
@Internal
abstract class AbstractWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
abstract class AbstractSinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<CommT>
implements OneInputStreamOperator<InputT, CommT>, BoundedOneInput {

private static final long serialVersionUID = 1L;
Expand All @@ -55,9 +55,9 @@ abstract class AbstractWriterOperator<InputT, CommT> extends AbstractStreamOpera
private Long currentWatermark;

/** The sink writer that does most of the work. */
protected Writer<InputT, CommT, ?> writer;
protected SinkWriter<InputT, CommT, ?> sinkWriter;

AbstractWriterOperator() {
AbstractSinkWriterOperator() {
this.context = new Context<>();
}

Expand All @@ -67,19 +67,19 @@ public void open() throws Exception {

this.currentWatermark = Long.MIN_VALUE;

writer = createWriter();
sinkWriter = createWriter();
}

@Override
public void processElement(StreamRecord<InputT> element) throws Exception {
context.element = element;
writer.write(element.getValue(), context);
sinkWriter.write(element.getValue(), context);
}

@Override
public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
super.prepareSnapshotPreBarrier(checkpointId);
sendCommittables(writer.prepareCommit(false));
sendCommittables(sinkWriter.prepareCommit(false));
}

@Override
Expand All @@ -90,13 +90,13 @@ public void processWatermark(Watermark mark) throws Exception {

@Override
public void endInput() throws Exception {
sendCommittables(writer.prepareCommit(true));
sendCommittables(sinkWriter.prepareCommit(true));
}

@Override
public void close() throws Exception {
super.close();
writer.close();
sinkWriter.close();
}

protected Sink.InitContext createInitContext() {
Expand All @@ -114,19 +114,19 @@ public MetricGroup metricGroup() {
}

/**
* Creates and returns a {@link Writer}.
* Creates and returns a {@link SinkWriter}.
*
* @throws Exception If creating {@link Writer} fail
* @throws Exception If creating {@link SinkWriter} fail
*/
abstract Writer<InputT, CommT, ?> createWriter() throws Exception;
abstract SinkWriter<InputT, CommT, ?> createWriter() throws Exception;

private void sendCommittables(final List<CommT> committables) {
for (CommT committable : committables) {
output.collect(new StreamRecord<>(committable));
}
}

private class Context<IN> implements Writer.Context {
private class Context<IN> implements SinkWriter.Context {

private StreamRecord<IN> element;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,28 @@

package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;

/**
* Base {@link OneInputStreamOperatorFactory} for subclasses of {@link AbstractWriterOperator}.
* Base {@link OneInputStreamOperatorFactory} for subclasses of {@link AbstractSinkWriterOperator}.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
* @param <InputT> The input type of the {@link SinkWriter}.
* @param <CommT> The committable type of the {@link SinkWriter}.
*/
abstract class AbstractWriterOperatorFactory<InputT, CommT> extends AbstractStreamOperatorFactory<CommT>
abstract class AbstractSinkWriterOperatorFactory<InputT, CommT> extends AbstractStreamOperatorFactory<CommT>
implements OneInputStreamOperatorFactory<InputT, CommT> {

@Override
@SuppressWarnings("unchecked")
public <T extends StreamOperator<CommT>> T createStreamOperator(StreamOperatorParameters<CommT> parameters) {
final AbstractWriterOperator<InputT, CommT> writerOperator = createWriterOperator();
final AbstractSinkWriterOperator<InputT, CommT> writerOperator = createWriterOperator();
writerOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
return (T) writerOperator;
}

abstract AbstractWriterOperator<InputT, CommT> createWriterOperator();
abstract AbstractSinkWriterOperator<InputT, CommT> createWriterOperator();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
Expand All @@ -34,20 +34,20 @@

/**
* Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link
* Writer Writers} that have state.
* SinkWriter Writers} that have state.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
* @param <WriterStateT> The type of the {@link Writer Writer's} state.
* @param <InputT> The input type of the {@link SinkWriter}.
* @param <CommT> The committable type of the {@link SinkWriter}.
* @param <WriterStateT> The type of the {@link SinkWriter Writer's} state.
*/
@Internal
final class StatefulWriterOperator<InputT, CommT, WriterStateT> extends AbstractWriterOperator<InputT, CommT> {
final class StatefulSinkWriterOperator<InputT, CommT, WriterStateT> extends AbstractSinkWriterOperator<InputT, CommT> {

/** The operator's state descriptor. */
private static final ListStateDescriptor<byte[]> WRITER_RAW_STATES_DESC =
new ListStateDescriptor<>("writer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
new ListStateDescriptor<>("writer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

/** Used to create the stateful {@link Writer}. */
/** Used to create the stateful {@link SinkWriter}. */
private final Sink<InputT, CommT, WriterStateT, ?> sink;

/** The writer operator's state serializer. */
Expand All @@ -58,9 +58,9 @@ final class StatefulWriterOperator<InputT, CommT, WriterStateT> extends Abstract
/** The operator's state. */
private ListState<WriterStateT> writerState;

StatefulWriterOperator(
final Sink<InputT, CommT, WriterStateT, ?> sink,
final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer) {
StatefulSinkWriterOperator(
final Sink<InputT, CommT, WriterStateT, ?> sink,
final SimpleVersionedSerializer<WriterStateT> writerStateSimpleVersionedSerializer) {
this.sink = sink;
this.writerStateSimpleVersionedSerializer = writerStateSimpleVersionedSerializer;
}
Expand All @@ -76,11 +76,11 @@ public void initializeState(StateInitializationContext context) throws Exception
@SuppressWarnings("unchecked")
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
writerState.update((List<WriterStateT>) writer.snapshotState());
writerState.update((List<WriterStateT>) sinkWriter.snapshotState());
}

@Override
Writer<InputT, CommT, WriterStateT> createWriter() throws Exception {
SinkWriter<InputT, CommT, WriterStateT> createWriter() throws Exception {
final List<WriterStateT> committables = CollectionUtil.iterableToList(writerState.get());
return sink.createWriter(createInitContext(), committables);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,32 @@
package org.apache.flink.streaming.runtime.operators.sink;

import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.streaming.api.operators.StreamOperator;

/**
* A {@link org.apache.flink.streaming.api.operators.StreamOperatorFactory} for {@link
* StatefulWriterOperator}.
* StatefulSinkWriterOperator}.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
* @param <WriterStateT> The type of the {@link Writer Writer's} state.
* @param <InputT> The input type of the {@link SinkWriter}.
* @param <CommT> The committable type of the {@link SinkWriter}.
* @param <WriterStateT> The type of the {@link SinkWriter Writer's} state.
*/
public final class StatefulWriterOperatorFactory<InputT, CommT, WriterStateT> extends AbstractWriterOperatorFactory<InputT, CommT> {
public final class StatefulSinkWriterOperatorFactory<InputT, CommT, WriterStateT> extends AbstractSinkWriterOperatorFactory<InputT, CommT> {

private final Sink<InputT, CommT, WriterStateT, ?> sink;

public StatefulWriterOperatorFactory(Sink<InputT, CommT, WriterStateT, ?> sink) {
public StatefulSinkWriterOperatorFactory(Sink<InputT, CommT, WriterStateT, ?> sink) {
this.sink = sink;
}

@Override
AbstractWriterOperator<InputT, CommT> createWriterOperator() {
return new StatefulWriterOperator<>(sink, sink.getWriterStateSerializer().get());
AbstractSinkWriterOperator<InputT, CommT> createWriterOperator() {
return new StatefulSinkWriterOperator<>(sink, sink.getWriterStateSerializer().get());
}

@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return StatefulWriterOperator.class;
return StatefulSinkWriterOperator.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,29 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.Writer;
import org.apache.flink.api.connector.sink.SinkWriter;

import java.util.Collections;

/**
* Runtime {@link org.apache.flink.streaming.api.operators.StreamOperator} for executing {@link
* Writer Writers} that don't have state.
* SinkWriter Writers} that don't have state.
*
* @param <InputT> The input type of the {@link Writer}.
* @param <CommT> The committable type of the {@link Writer}.
* @param <InputT> The input type of the {@link SinkWriter}.
* @param <CommT> The committable type of the {@link SinkWriter}.
*/
@Internal
final class StatelessWriterOperator<InputT, CommT> extends AbstractWriterOperator<InputT, CommT>{
final class StatelessSinkWriterOperator<InputT, CommT> extends AbstractSinkWriterOperator<InputT, CommT> {

/** Used to create the stateless {@link Writer}. */
/** Used to create the stateless {@link SinkWriter}. */
private final Sink<InputT, CommT, ?, ?> sink;

StatelessWriterOperator(final Sink<InputT, CommT, ?, ?> sink) {
StatelessSinkWriterOperator(final Sink<InputT, CommT, ?, ?> sink) {
this.sink = sink;
}

@Override
Writer<InputT, CommT, ?> createWriter() {
SinkWriter<InputT, CommT, ?> createWriter() {
return sink.createWriter(createInitContext(), Collections.emptyList());
}
}
Loading

0 comments on commit 34d0a9c

Please sign in to comment.