Skip to content

Commit

Permalink
[FLINK-20270][refactor] Initialize reader in SourceOperator at an earlier point.
Browse files Browse the repository at this point in the history

That way we can access the reader during task setup and use it and its properties
during initialization of the SourceStreamTask.
  • Loading branch information
StephanEwen committed Nov 24, 2020
1 parent fc1bf51 commit 0de189c
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,27 @@ public SourceOperator(
this.emitProgressiveWatermarks = emitProgressiveWatermarks;
}

@Override
public void open() throws Exception {
/**
* Initializes the reader. Ideally, the code in this method would run in the
* constructor or even in the operator factory. It has to run here, at a slightly
* later stage, because of the lazy metric initialization.
*
* <p>Calling this method explicitly is an optional way to initialize the reader
* a bit earlier than in open(), as needed by the
* {@link org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask}.
*
* <p>This code should move to the constructor once the metric groups are available
* at task setup time.
*/
public void initReader() throws Exception {
if (sourceReader != null) {
return;
}

final MetricGroup metricGroup = getMetricGroup();
assert metricGroup != null;

final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();

final SourceReaderContext context = new SourceReaderContext() {
@Override
Expand All @@ -161,7 +179,7 @@ public String getLocalHostName() {

@Override
public int getIndexOfSubtask() {
return getRuntimeContext().getIndexOfThisSubtask();
return subtaskIndex;
}

@Override
Expand All @@ -175,22 +193,27 @@ public void sendSourceEventToCoordinator(SourceEvent event) {
}
};

sourceReader = readerFactory.apply(context);
}

@Override
public void open() throws Exception {
initReader();

// in the future, when this one is migrated to the "eager initialization" operator
// (StreamOperatorV2), we should evaluate this during operator construction.
if (emitProgressiveWatermarks) {
eventTimeLogic = TimestampsAndWatermarks.createProgressiveEventTimeLogic(
watermarkStrategy,
metricGroup,
getMetricGroup(),
getProcessingTimeService(),
getExecutionConfig().getAutoWatermarkInterval());
} else {
eventTimeLogic = TimestampsAndWatermarks.createNoOpEventTimeLogic(
watermarkStrategy,
metricGroup);
getMetricGroup());
}

sourceReader = readerFactory.apply(context);

// restore the state if necessary.
final List<SplitT> splits = CollectionUtil.iterableToList(readerState.get());
if (!splits.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,15 @@ public SourceOperatorStreamTask(Environment env) throws Exception {
}

@Override
public void init() {
final StreamTaskInput<T> input = new StreamTaskSourceInput<>(mainOperator, 0, 0);
public void init() throws Exception {
final SourceOperator<T, ?> sourceOperator = this.mainOperator;
// reader initialization, which cannot happen in the constructor due to the
// lazy metric group initialization. We do this here now, rather than
// later (in open()) so that we can access the reader when setting up the
// input processors
sourceOperator.initReader();

final StreamTaskInput<T> input = new StreamTaskSourceInput<>(sourceOperator, 0, 0);

// The SourceOperatorStreamTask doesn't have any inputs, so there is no need for
// a WatermarkGauge on the input.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ public void setup() throws Exception {
@After
public void cleanUp() throws Exception {
operator.close();
if (((TestingSourceOperator<Integer>) operator).isReaderCreated()) {
assertTrue(mockSourceReader.isClosed());
}
assertTrue(mockSourceReader.isClosed());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ public class TestingSourceOperator<T> extends SourceOperator<T, MockSourceSplit
private final int subtaskIndex;
private final int parallelism;

private volatile boolean readerCreated;

public TestingSourceOperator(
SourceReader<T, MockSourceSplit> reader,
WatermarkStrategy<T> watermarkStrategy,
Expand Down Expand Up @@ -85,13 +83,13 @@ public TestingSourceOperator(
this.subtaskIndex = subtaskIndex;
this.parallelism = parallelism;
this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
this.readerCreated = false;
}

@Override
public void open() throws Exception {
super.open();
readerCreated = true;
// unchecked wrapping is okay to keep tests simpler
try {
initReader();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
Expand All @@ -106,8 +104,4 @@ public ExecutionConfig getExecutionConfig() {
cfg.setAutoWatermarkInterval(100);
return cfg;
}

public boolean isReaderCreated() {
return readerCreated;
}
}

0 comments on commit 0de189c

Please sign in to comment.