Skip to content

Commit

Permalink
[hotfix] Fix JavaDocs for SourceFunction
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Mar 3, 2016
1 parent 072da7d commit 7fbfab6
Showing 1 changed file with 119 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,33 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.TimestampAssigner;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.io.Serializable;

/**
* Base interface for all stream data sources in Flink. The contract of a stream source
* is the following: When the source should start emitting elements the {@link #run} method
* is called with a {@link org.apache.flink.util.Collector} that can be used for emitting elements.
* is the following: When the source should start emitting elements, the {@link #run} method
* is called with a {@link SourceContext} that can be used for emitting elements.
* The run method can run for as long as necessary. The source must, however, react to an
* invocation of {@link #cancel} by breaking out of its main loop.
* invocation of {@link #cancel()} by breaking out of its main loop.
*
* <p>
* <b>Note about checkpointed sources</b> <br>
* <h3>Checkpointed Sources</h3>
*
* Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
* <p>Sources that also implement the {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
* interface must ensure that state checkpointing, updating of internal state and emission of
* elements are not done concurrently. This is achieved by using the provided checkpointing lock
* object to protect update of state and emission of elements in a synchronized block.
* </p>
*
* <p>
* This is the basic pattern one should follow when implementing a (checkpointed) source:
* </p>
* <p>This is the basic pattern one should follow when implementing a (checkpointed) source:
*
* <pre>{@code
* public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
* private long count = 0L;
* private volatile boolean isRunning = true;
*
* {@literal @}Override
* public void run(SourceContext<T> ctx) {
* while (isRunning && count < 1000) {
* synchronized (ctx.getCheckpointLock()) {
Expand All @@ -60,108 +57,177 @@
* }
* }
*
* {@literal @}Override
* public void cancel() {
* isRunning = false;
* }
*
* {@literal @}Override
* public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
*
* {@literal @}Override
* public void restoreState(Long state) { this.count = state; }
* }
* }</pre>
*
*
* <p>
* <b>Note about element timestamps and watermarks:</b> <br>
* Sources must only manually emit watermarks when they implement
* {@link EventTimeSourceFunction }.
* Otherwise, elements automatically get the current timestamp assigned at ingress
* and the system automatically emits watermarks.
* <h3>Timestamps and watermarks:</h3>
*
* Sources may assign timestamps to elements and may manually emit watermarks.
* However, these are only interpreted if the streaming program runs on
* {@link TimeCharacteristic#EventTime}. On other time characteristics
* ({@link TimeCharacteristic#IngestionTime} and {@link TimeCharacteristic#ProcessingTime}),
* the watermarks from the source function are ignored.
*
* <h3>Gracefully Stopping Functions</h3>
*
* Functions may additionally implement the {@link org.apache.flink.api.common.functions.StoppableFunction}
* interface. "Stopping" a function, in contrast to "canceling" means a graceful exit that leaves the
* state and the emitted elements in a consistent state.
*
* <p>When a source is stopped, the executing thread is not interrupted, but expected to leave the
* {@link #run(SourceContext)} method in reasonable time on its own, preserving the atomicity
* of state updates and element emission.
*
* @param <T> The type of the elements produced by this source.
*
* @see org.apache.flink.api.common.functions.StoppableFunction
* @see org.apache.flink.streaming.api.TimeCharacteristic
*/
@Public
public interface SourceFunction<T> extends Function, Serializable {

/**
* Starts the source. You can use the {@link org.apache.flink.util.Collector} parameter to emit
* elements. Sources that implement
* {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} must lock on the
* checkpoint lock (using a synchronized block) before updating internal state and/or emitting
* elements. Also, the update of state and emission of elements must happen in the same
* synchronized block.
* Starts the source. Implementations can use the {@link SourceContext} emit
* elements.
*
* <p>Sources that implement {@link org.apache.flink.streaming.api.checkpoint.Checkpointed}
* must lock on the checkpoint lock (using a synchronized block) before updating internal
* state and emitting elements, to make both an atomic operation:
*
* @param ctx The context for interaction with the outside world.
* <pre>{@code
* public class ExampleSource<T> implements SourceFunction<T>, Checkpointed<Long> {
* private long count = 0L;
* private volatile boolean isRunning = true;
*
* public void run(SourceContext<T> ctx) {
* while (isRunning && count < 1000) {
* synchronized (ctx.getCheckpointLock()) {
* ctx.collect(count);
* count++;
* }
* }
* }
*
* public void cancel() {
* isRunning = false;
* }
*
* public Long snapshotState(long checkpointId, long checkpointTimestamp) { return count; }
*
* public void restoreState(Long state) { this.count = state; }
* }
* }</pre>
*
* @param ctx The context to emit elements to and for accessing locks.
*/
void run(SourceContext<T> ctx) throws Exception;

/**
* Cancels the source. Most sources will have a while loop inside the
* {@link #run} method. You need to ensure that the source will break out of this loop. This
* can be achieved by having a volatile field "isRunning" that is checked in the loop and that
* is set to false in this method.
* {@link #run(SourceContext)} method. The implementation needs to ensure that the
* source will break out of that loop after this method is called.
*
* <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
* {@code false} in this method. That flag is checked in the loop condition.
*
* <p>When a source is canceled, the executing thread will also be interrupted
* (via {@link Thread#interrupt()}). The interruption happens strictly after this
* method has been called, so any interruption handler can rely on the fact that
* this method has completed. It is good practice to make any flags altered by
* this method "volatile", in order to guarantee the visibility of the effects of
* this method to any interruption handler.
*/
void cancel();

// ------------------------------------------------------------------------
// source context
// ------------------------------------------------------------------------

/**
* Interface that source functions use to communicate with the outside world. Normally
* sources would just emit elements in a loop using {@link #collect}. If the source is a
* {@link org.apache.flink.streaming.api.checkpoint.Checkpointed} source it must retrieve
* the checkpoint lock object and use it to protect state updates and element emission as
* described in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
* Interface that source functions use to emit elements, and possibly watermarks.
*
* @param <T> The type of the elements produced by the source.
*/
@Public // Interface might be extended in the future with additional methods.
interface SourceContext<T> {

/**
* Emits one element from the source. The result of {@link System#currentTimeMillis()} is set as
* the timestamp of the emitted element.
* Emits one element from the source, without attaching a timestamp. In most cases,
* this is the default way of emitting elements.
*
* <p>The timestamp that the element will get assigned depends on the time characteristic of
* the streaming program:
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the element has no timestamp.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the element gets the system's
* current time as the timestamp.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the element will have no timestamp initially.
* It needs to get a timestamp (via a {@link TimestampAssigner}) before any time-dependent
* operation (like time windows).</li>
* </ul>
*
* @param element The element to emit
*/
void collect(T element);

/**
* Emits one element from the source with the given timestamp.
* Emits one element from the source, and attaches the given timestamp. This method
* is relevant for programs using {@link TimeCharacteristic#EventTime}, where the
* sources assign timestamps themselves, rather than relying on a {@link TimestampAssigner}
* on the stream.
*
* <p>On certain time characteristics, this timestamp may be ignored or overwritten.
* This allows programs to switch between the different time characteristics and behaviors
* without changing the code of the source functions.
* <ul>
* <li>On {@link TimeCharacteristic#ProcessingTime}, the timestamp will be ignored,
* because processing time never works with element timestamps.</li>
* <li>On {@link TimeCharacteristic#IngestionTime}, the timestamp is overwritten with the
* system's current time, to realize proper ingestion time semantics.</li>
* <li>On {@link TimeCharacteristic#EventTime}, the timestamp will be used.</li>
* </ul>
*
* @param element The element to emit
* @param timestamp The timestamp in milliseconds
* @param timestamp The timestamp in milliseconds since the Epoch
*/
@PublicEvolving
void collectWithTimestamp(T element, long timestamp);

/**
* Emits the given {@link org.apache.flink.streaming.api.watermark.Watermark}.
*
* <p>
* <b>Important:</b>
* Sources must only manually emit watermarks when they implement
* {@link EventTimeSourceFunction}.
* Otherwise, elements automatically get the current timestamp assigned at ingress
* and the system automatically emits watermarks.
* Emits the given {@link Watermark}. A Watermark of value {@code t} declares that no
* elements with a timestamp {@code t' <= t} will occur any more. If further such
* elements will be emitted, those elements are considered <i>late</i>.
*
* <p>This method is only relevant when running on {@link TimeCharacteristic#EventTime}.
* On {@link TimeCharacteristic#ProcessingTime},Watermarks will be ignored. On
* {@link TimeCharacteristic#IngestionTime}, the Watermarks will be replaced by the
* automatic ingestion time watermarks.
*
* @param mark The {@link Watermark} to emit
* @param mark The Watermark to emit
*/
@PublicEvolving
void emitWatermark(Watermark mark);


/**
* Returns the checkpoint lock. Please refer to the explanation about checkpointed sources
* in {@link org.apache.flink.streaming.api.functions.source.SourceFunction}.
* Returns the checkpoint lock. Please refer to the class-level comment in
* {@link SourceFunction} for details about how to write a consistent checkpointed
* source.
*
* @return The object to use as the lock.
* @return The object to use as the lock
*/
Object getCheckpointLock();

/**
* This must be called when closing the source operator to allow the {@link SourceContext}
* to clean up internal state.
* This method is called by the system to shut down the context.
*/
void close();
}
Expand Down

0 comments on commit 7fbfab6

Please sign in to comment.