Skip to content

Commit

Permalink
[hotfix] Deduplicate JavaDocs in SourceFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Sep 7, 2021
1 parent ef52106 commit 0128976
Showing 1 changed file with 10 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.watermark.Watermark;

Expand Down Expand Up @@ -98,53 +99,17 @@
public interface SourceFunction<T> extends Function, Serializable {

/**
* Starts the source. Implementations can use the {@link SourceContext} emit elements.
* Starts the source. Implementations use the {@link SourceContext} to emit elements. Sources
* that checkpoint their state for fault tolerance should use the {@link
* SourceContext#getCheckpointLock()} checkpoint lock} to ensure consistency between the
* bookkeeping and emitting the elements.
*
* <p>Sources that implement {@link
* org.apache.flink.streaming.api.checkpoint.CheckpointedFunction} must lock on the checkpoint
* lock (using a synchronized block) before updating internal state and emitting elements, to
* make both an atomic operation:
* <p>Sources that implement {@link CheckpointedFunction} must lock on the {@link
* SourceContext#getCheckpointLock()} checkpoint lock} checkpoint lock (using a synchronized
* block) before updating internal state and emitting elements, to make both an atomic
* operation.
*
* <pre>{@code
* public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
* private long count = 0L;
* private volatile boolean isRunning = true;
*
* private transient ListState<Long> checkpointedCount;
*
* public void run(SourceContext<T> ctx) {
* while (isRunning && count < 1000) {
* // this synchronized block ensures that state checkpointing,
* // internal state updates and emission of elements are an atomic operation
* synchronized (ctx.getCheckpointLock()) {
* ctx.collect(count);
* count++;
* }
* }
* }
*
* public void cancel() {
* isRunning = false;
* }
*
* public void initializeState(FunctionInitializationContext context) {
* this.checkpointedCount = context
* .getOperatorStateStore()
* .getListState(new ListStateDescriptor<>("count", Long.class));
*
* if (context.isRestored()) {
* for (Long count : this.checkpointedCount.get()) {
* this.count = count;
* }
* }
* }
*
* public void snapshotState(FunctionSnapshotContext context) {
* this.checkpointedCount.clear();
* this.checkpointedCount.add(count);
* }
* }
* }</pre>
* <p>Refer to the {@link SourceFunction top-level class docs} for an example.
*
* @param ctx The context to emit elements to and for accessing locks.
*/
Expand Down

0 comments on commit 0128976

Please sign in to comment.