Skip to content

Commit

Permalink
[FLINK-5716] [streaming] Make StreamSourceContexts aware of source id…
Browse files Browse the repository at this point in the history
…leness

This closes apache#3347.
  • Loading branch information
tzulitai committed Feb 22, 2017
1 parent 646490c commit b0f0f37
Show file tree
Hide file tree
Showing 29 changed files with 814 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,12 @@ public void emitWatermark(Watermark mark) {
block();
}

@Override
@Override
public void markAsTemporarilyIdle() {
throw new UnsupportedOperationException();
}

@Override
public Object getCheckpointLock() {
return new Object();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ public void emitWatermark(Watermark mark) {
block();
}

@Override
public void markAsTemporarilyIdle() {
throw new UnsupportedOperationException();
}

@Override
public Object getCheckpointLock() {
return new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ public void emitWatermark(Watermark mark) {
}
}


@Override
public void markAsTemporarilyIdle() {
throw new UnsupportedOperationException();
}

@Override
public Object getCheckpointLock() {
return checkpointLock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,12 @@ public void collectWithTimestamp(java.lang.String element, long timestamp) {

@Override
public void emitWatermark(Watermark mark) {
throw new UnsupportedOperationException();
}

@Override
public void markAsTemporarilyIdle() {
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ public void collectWithTimestamp(Tuple1<Integer> element, long timestamp) {

@Override
public void emitWatermark(Watermark mark) {
// ignore it
throw new UnsupportedOperationException();
}

@Override
public void markAsTemporarilyIdle() {
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,10 @@ public void collectWithTimestamp(TimestampedFileInputSplit element, long timesta
public void emitWatermark(Watermark mark) {
}

@Override
public void markAsTemporarilyIdle() {
}

@Override
public Object getCheckpointLock() {
return lock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,13 @@ public void open() throws Exception {
final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
this.readerContext = StreamSourceContexts.getSourceContext(
timeCharacteristic, getProcessingTimeService(), checkpointLock, output, watermarkInterval);
timeCharacteristic,
getProcessingTimeService(),
checkpointLock,
getContainingTask().getStreamStatusMaintainer(),
output,
watermarkInterval,
-1);

// and initialize the split reading thread
this.reader = new SplitReader<>(format, serializer, readerContext, checkpointLock, restoredReaderState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,20 @@ interface SourceContext<T> {
@PublicEvolving
void emitWatermark(Watermark mark);

/**
* Marks the source to be temporarily idle. This tells the system that this source will
* temporarily stop emitting records and watermarks for an indefinite amount of time. This
* is only relevant when running on {@link TimeCharacteristic#IngestionTime} and
* {@link TimeCharacteristic#EventTime}, allowing downstream tasks to advance their
* watermarks without the need to wait for watermarks from this source while it is idle.
*
* <p>Source functions should make a best effort to call this method as soon as they
* acknowledge themselves to be idle. The system will consider the source to resume activity
* again once {@link SourceContext#collect(T)}, {@link SourceContext#collectWithTimestamp(T, long)},
* or {@link SourceContext#emitWatermark(Watermark)} is called to emit elements or watermarks from the source.
*/
@PublicEvolving
void markAsTemporarilyIdle();

/**
* Returns the checkpoint lock. Please refer to the class-level comment in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

Expand Down Expand Up @@ -51,12 +52,15 @@ public StreamSource(SRC sourceFunction) {
this.chainingStrategy = ChainingStrategy.HEAD;
}

public void run(final Object lockingObject) throws Exception {
run(lockingObject, output);
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer) throws Exception {
run(lockingObject, streamStatusMaintainer, output);
}


public void run(final Object lockingObject, final Output<StreamRecord<OUT>> collector) throws Exception {
public void run(final Object lockingObject,
final StreamStatusMaintainer streamStatusMaintainer,
final Output<StreamRecord<OUT>> collector) throws Exception {

final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();

LatencyMarksEmitter latencyEmitter = null;
Expand All @@ -68,11 +72,17 @@ public void run(final Object lockingObject, final Output<StreamRecord<OUT>> coll
getOperatorConfig().getVertexID(),
getRuntimeContext().getIndexOfThisSubtask());
}

final long watermarkInterval = getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();

this.ctx = StreamSourceContexts.getSourceContext(
timeCharacteristic, getProcessingTimeService(), lockingObject, collector, watermarkInterval);
timeCharacteristic,
getProcessingTimeService(),
lockingObject,
streamStatusMaintainer,
collector,
watermarkInterval,
-1);

try {
userFunction.run(ctx);
Expand Down Expand Up @@ -108,7 +118,7 @@ public void cancel() {
/**
* Marks this source as canceled or stopped.
*
* <p>This indicates that any exit of the {@link #run(Object, Output)} method
* <p>This indicates that any exit of the {@link #run(Object, StreamStatusMaintainer, Output)} method
* cannot be interpreted as the result of a finite source.
*/
protected void markCanceledOrStopped() {
Expand Down
Loading

0 comments on commit b0f0f37

Please sign in to comment.