Skip to content

Commit

Permalink
[FLINK-17899][runtime] Integrate FLIP-126 Watermarks with FLIP-27 Sou…
Browse files Browse the repository at this point in the history
…rces
  • Loading branch information
StephanEwen committed May 27, 2020
1 parent 4bec7fb commit 54f1a4c
Show file tree
Hide file tree
Showing 22 changed files with 1,637 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Licensed to the Apache Software Foundation (ASF) under one

package org.apache.flink.connector.base.source.reader;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
Expand Down Expand Up @@ -63,7 +64,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;

/** The state of the splits. */
private final Map<String, SplitStateT> splitStates;
private final Map<String, SplitContext<T, SplitStateT>> splitStates;

/** The record emitter to handle the records read by the SplitReaders. */
protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
Expand Down Expand Up @@ -111,7 +112,7 @@ public void start() {
}

@Override
public InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception {
public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
splitFetcherManager.checkErrors();
// poll from the queue if the last element was successfully handled. Otherwise
// just pass the last element again.
Expand All @@ -133,14 +134,19 @@ public InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception {
// Process one record.
if (splitIter.hasNext()) {
// emit the record.
E record = splitIter.next();
recordEmitter.emitRecord(record, sourceOutput, splitStates.get(splitIter.currentSplitId()));
final E record = splitIter.next();
final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId());
final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output);
recordEmitter.emitRecord(record, splitOutput, splitContext.state);
LOG.trace("Emitted record: {}", record);
}
// Do some cleanup if all the records in the current splitIter have been processed.
if (!splitIter.hasNext()) {
// First remove the state of the split.
splitIter.finishedSplitIds().forEach(splitStates::remove);
splitIter.finishedSplitIds().forEach((id) -> {
splitStates.remove(id);
output.releaseOutputForSplit(id);
});
// Handle the finished splits.
onSplitFinished(splitIter.finishedSplitIds());
// Prepare the return status based on the availability of the next element.
Expand Down Expand Up @@ -173,15 +179,15 @@ public CompletableFuture<Void> isAvailable() {
@Override
public List<SplitT> snapshotState() {
List<SplitT> splits = new ArrayList<>();
splitStates.forEach((id, state) -> splits.add(toSplitType(id, state)));
splitStates.forEach((id, context) -> splits.add(toSplitType(id, context.state)));
return splits;
}

@Override
public void addSplits(List<SplitT> splits) {
LOG.trace("Adding splits {}", splits);
// Initialize the state for each split.
splits.forEach(s -> splitStates.put(s.splitId(), initializedState(s)));
splits.forEach(s -> splitStates.put(s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
// Hand over the splits to the split fetcher to start fetch.
splitFetcherManager.addSplits(splits);
}
Expand All @@ -201,6 +207,8 @@ public void close() throws Exception {
splitFetcherManager.close(options.sourceReaderCloseTimeout);
}



// -------------------- Abstract method to allow different implementations ------------------
/**
* Handles the finished splits to clean the state if needed.
Expand Down Expand Up @@ -233,4 +241,25 @@ private InputStatus finishedOrAvailableLater() {
return InputStatus.NOTHING_AVAILABLE;
}
}

// ------------------ private helper classes ---------------------

/**
 * Per-split bookkeeping: bundles the split's ID and state with the split-local
 * {@link SourceOutput}, which is created on first use and then reused.
 */
private static final class SplitContext<T, SplitStateT> {

	/** ID of the split this context belongs to. */
	final String splitId;

	/** State object tracked for the split. */
	final SplitStateT state;

	/** Split-local output; stays {@code null} until the first call to {@link #getOrCreateSplitOutput}. */
	SourceOutput<T> sourceOutput;

	private SplitContext(String splitId, SplitStateT state) {
		this.splitId = splitId;
		this.state = state;
	}

	/**
	 * Returns the split-local output, asking {@code mainOutput} to create it the first
	 * time this method is called and handing back the cached instance afterwards.
	 *
	 * @param mainOutput the reader output used to create the split-local output if none exists yet.
	 * @return the output dedicated to this split.
	 */
	SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
		final SourceOutput<T> cached = sourceOutput;
		if (cached != null) {
			return cached;
		}

		final SourceOutput<T> created = mainOutput.createOutputForSplit(splitId);
		sourceOutput = created;
		return created;
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Licensed to the Apache Software Foundation (ASF) under one

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
Expand Down Expand Up @@ -170,7 +171,7 @@ private SourceReader<Integer, SplitT> consumeRecords(
/**
* A source output that validates the output.
*/
protected static class ValidatingSourceOutput implements SourceOutput<Integer> {
protected static class ValidatingSourceOutput implements ReaderOutput<Integer> {
private Set<Integer> consumedValues = new HashSet<>();
private int max = Integer.MIN_VALUE;
private int min = Integer.MAX_VALUE;
Expand Down Expand Up @@ -204,13 +205,17 @@ public int count() {
}

@Override
public void emitWatermark(Watermark watermark) {

}
public void emitWatermark(Watermark watermark) {}

@Override
public void markIdle() {
public void markIdle() {}

/**
 * Returns this validating output itself for every split ID, so no separate
 * per-split output object is created and the {@code splitId} argument is unused.
 */
@Override
public SourceOutput<Integer> createOutputForSplit(String splitId) {
return this;
}

/**
 * No-op: the body is empty, so releasing the output for a split changes nothing
 * in this validating output.
 */
@Override
public void releaseOutputForSplit(String splitId) {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package org.apache.flink.api.connector.source;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.eventtime.Watermark;

/**
 * The interface provided by the Flink task to the {@link SourceReader} to emit records
 * to downstream operators for message processing.
 *
 * <p>In addition to the emit methods re-declared from {@link SourceOutput}, this interface can
 * create and release split-local outputs via {@link #createOutputForSplit(String)} and
 * {@link #releaseOutputForSplit(String)}.
 *
 * @param <T> the type of the records emitted through this output.
 */
@PublicEvolving
public interface ReaderOutput<T> extends SourceOutput<T> {

/**
 * Emit a record without a timestamp. Use {@link #collect(Object, long)} to emit a record
 * together with a timestamp.
 *
 * @param record the record to emit.
 */
@Override
void collect(T record);

/**
 * Emit a record with timestamp.
 *
 * @param record the record to emit.
 * @param timestamp the timestamp of the record.
 */
@Override
void collect(T record, long timestamp);

/**
 * Emits the given watermark.
 *
 * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending
 * previously marked idleness.
 *
 * @param watermark the watermark to emit.
 */
@Override
void emitWatermark(Watermark watermark);

/**
 * Marks this output as idle, meaning that downstream operations do not
 * wait for watermarks from this output.
 *
 * <p>An output becomes active again as soon as the next watermark is emitted.
 */
@Override
void markIdle();

/**
 * Creates a {@code SourceOutput} for a specific Source Split. Use these outputs if you want to
 * run split-local logic, like watermark generation.
 *
 * <p>If a split-local output was already created for this split-ID, the method will return that instance,
 * so that only one split-local output exists per split-ID.
 *
 * <p><b>IMPORTANT:</b> After the split has been finished, it is crucial to release the created
 * output again. Otherwise it will continue to contribute to the watermark generation like a
 * perpetually stalling source split, and may hold back the watermark indefinitely.
 *
 * @param splitId the ID of the split for which the split-local output is requested.
 * @return the split-local output for the given split ID.
 * @see #releaseOutputForSplit(String)
 */
SourceOutput<T> createOutputForSplit(String splitId);

/**
 * Releases the {@code SourceOutput} created for the split with the given ID.
 *
 * @param splitId the ID of the split whose split-local output should be released.
 * @see #createOutputForSplit(String)
 */
void releaseOutputForSplit(String splitId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseab
*
* @return The InputStatus of the SourceReader after the method invocation.
*/
InputStatus pollNext(SourceOutput<T> sourceOutput) throws Exception;
InputStatus pollNext(ReaderOutput<T> output) throws Exception;

/**
* Checkpoint on the state of the source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ Licensed to the Apache Software Foundation (ASF) under one

package org.apache.flink.api.connector.source.mocks;

import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.core.io.InputStatus;

Expand Down Expand Up @@ -53,7 +53,7 @@ public void start() {
}

@Override
public InputStatus pollNext(SourceOutput<Integer> sourceOutput) throws Exception {
public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
boolean finished = true;
currentSplitIndex = 0;
// Find first splits with available records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
Expand All @@ -38,8 +39,10 @@
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.source.TimestampsAndWatermarks;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CollectionUtil;

import java.util.List;
Expand Down Expand Up @@ -81,30 +84,47 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
/** The event gateway through which this operator talks to its coordinator. */
private final OperatorEventGateway operatorEventGateway;

// ---- lazily initialized fields ----
/** The factory for timestamps and watermark generators. */
private final WatermarkStrategy<OUT> watermarkStrategy;

// ---- lazily initialized fields (these fields are the "hot" fields) ----

/** The source reader that does most of the work. */
private SourceReader<OUT, SplitT> sourceReader;

private ReaderOutput<OUT> currentMainOutput;

private DataOutput<OUT> lastInvokedOutput;

/** The state that holds the currently assigned splits. */
private ListState<SplitT> readerState;

/** The event time and watermarking logic. Ideally this would be eagerly passed into this operator,
* but we currently need to instantiate this lazily, because the metric groups exist only later. */
private TimestampsAndWatermarks<OUT> eventTimeLogic;

public SourceOperator(
Function<SourceReaderContext, SourceReader<OUT, SplitT>> readerFactory,
OperatorEventGateway operatorEventGateway,
SimpleVersionedSerializer<SplitT> splitSerializer) {
SimpleVersionedSerializer<SplitT> splitSerializer,
WatermarkStrategy<OUT> watermarkStrategy,
ProcessingTimeService timeService) {

this.readerFactory = checkNotNull(readerFactory);
this.operatorEventGateway = checkNotNull(operatorEventGateway);
this.splitSerializer = checkNotNull(splitSerializer);
this.watermarkStrategy = checkNotNull(watermarkStrategy);
this.processingTimeService = timeService;
}

@Override
public void open() throws Exception {
final MetricGroup metricGroup = getMetricGroup();

final SourceReaderContext context = new SourceReaderContext() {
@Override
public MetricGroup metricGroup() {
return getRuntimeContext().getMetricGroup();
return metricGroup;
}

@Override
Expand All @@ -113,6 +133,15 @@ public void sendSourceEventToCoordinator(SourceEvent event) {
}
};

// in the future when we support both batch and streaming modes for the source operator,
// and when this one is migrated to the "eager initialization" operator (StreamOperatorV2),
// then we should evaluate this during operator construction.
eventTimeLogic = TimestampsAndWatermarks.createStreamingEventTimeLogic(
watermarkStrategy,
metricGroup,
getProcessingTimeService(),
getExecutionConfig().getAutoWatermarkInterval());

sourceReader = readerFactory.apply(context);

// restore the state if necessary.
Expand All @@ -125,12 +154,31 @@ public void sendSourceEventToCoordinator(SourceEvent event) {
sourceReader.start();
// Register the reader to the coordinator.
registerReader();

eventTimeLogic.startPeriodicWatermarkEmits();
}

/**
 * Closes this operator: stops the periodic watermark emission and then runs the
 * superclass close logic.
 *
 * <p>{@code eventTimeLogic} is only assigned in {@link #open()}, so it is still {@code null}
 * if this method runs without {@code open()} having reached that assignment. The null check
 * prevents a {@code NullPointerException} in that case, which would otherwise hide the
 * original failure.
 *
 * @throws Exception if stopping the watermark emission or the superclass close logic fails.
 */
@Override
public void close() throws Exception {
	try {
		if (eventTimeLogic != null) {
			eventTimeLogic.stopPeriodicWatermarkEmits();
		}
	} finally {
		// always run the superclass cleanup, even if stopping the watermark emitter failed
		super.close();
	}
}

@Override
@SuppressWarnings("unchecked")
public InputStatus emitNext(DataOutput<OUT> output) throws Exception {
return sourceReader.pollNext((SourceOutput<OUT>) output);
// guarding an assumption we currently make due to the fact that certain classes
// assume a constant output
assert lastInvokedOutput == output || lastInvokedOutput == null;

// short circuit the common case (every invocation except the first)
if (currentMainOutput != null) {
return sourceReader.pollNext(currentMainOutput);
}

// this creates a batch or streaming output based on the runtime mode
currentMainOutput = eventTimeLogic.createMainOutput(output);
lastInvokedOutput = output;
return sourceReader.pollNext(currentMainOutput);
}

@Override
Expand Down
Loading

0 comments on commit 54f1a4c

Please sign in to comment.