Skip to content

Commit

Permalink
[FLINK-33905][core] Unify the provision of metadata in SourceReaderCo…
Browse files Browse the repository at this point in the history
…ntext
  • Loading branch information
WencongLiu authored and xintongsong committed Jan 11, 2024
1 parent 0f3470d commit b9e9997
Show file tree
Hide file tree
Showing 13 changed files with 244 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public InputStatus pollNext(ReaderOutput output) throws Exception {
// trap END_OF_INPUT unless all sources have finished
LOG.info(
"End of input subtask={} sourceIndex={} {}",
readerContext.getIndexOfSubtask(),
readerContext.getTaskInfo().getIndexOfThisSubtask(),
currentSourceIndex,
currentReader);
// Signal the coordinator that this reader has consumed all input and the
Expand Down Expand Up @@ -140,7 +140,7 @@ public CompletableFuture<Void> isAvailable() {
public void addSplits(List<HybridSourceSplit> splits) {
LOG.info(
"Adding splits subtask={} sourceIndex={} currentReader={} {}",
readerContext.getIndexOfSubtask(),
readerContext.getTaskInfo().getIndexOfThisSubtask(),
currentSourceIndex,
currentReader,
splits);
Expand Down Expand Up @@ -168,7 +168,7 @@ public void notifyNoMoreSplits() {
}
LOG.debug(
"No more splits for subtask={} sourceIndex={} currentReader={}",
readerContext.getIndexOfSubtask(),
readerContext.getTaskInfo().getIndexOfThisSubtask(),
currentSourceIndex,
currentReader);
}
Expand All @@ -179,7 +179,7 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent;
LOG.info(
"Switch source event: subtask={} sourceIndex={} source={}",
readerContext.getIndexOfSubtask(),
readerContext.getTaskInfo().getIndexOfThisSubtask(),
sse.sourceIndex(),
sse.source());
switchedSources.put(sse.sourceIndex(), sse.source());
Expand All @@ -197,7 +197,7 @@ public void close() throws Exception {
}
LOG.debug(
"Reader closed: subtask={} sourceIndex={} currentReader={}",
readerContext.getIndexOfSubtask(),
readerContext.getTaskInfo().getIndexOfThisSubtask(),
currentSourceIndex,
currentReader);
}
Expand All @@ -212,7 +212,7 @@ private void setCurrentReader(int index) {
}
LOG.debug(
"Reader closed: subtask={} sourceIndex={} currentReader={}",
readerContext.getIndexOfSubtask(),
readerContext.getTaskInfo().getIndexOfThisSubtask(),
currentSourceIndex,
currentReader);
}
Expand All @@ -230,7 +230,7 @@ private void setCurrentReader(int index) {
availabilityFuture.complete(null);
LOG.debug(
"Reader started: subtask={} sourceIndex={} {}",
readerContext.getIndexOfSubtask(),
readerContext.getTaskInfo().getIndexOfThisSubtask(),
currentSourceIndex,
reader);
// add restored splits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.datagen.source;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.ReaderOutput;
Expand All @@ -29,6 +30,7 @@
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testutils.source.TestingTaskInfo;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
Expand Down Expand Up @@ -182,11 +184,6 @@ public String getLocalHostName() {
return "localhost";
}

@Override
public int getIndexOfSubtask() {
return 0;
}

@Override
public void sendSplitRequest() {}

Expand All @@ -199,8 +196,14 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}

@Override
public int currentParallelism() {
return 1;
public TaskInfo getTaskInfo() {
return new TestingTaskInfo.Builder()
.setTaskName("DummyTask")
.setMaxNumberOfParallelSubtasks(1)
.setIndexOfThisSubtask(0)
.setNumberOfParallelSubtasks(1)
.setAttemptNumber(0)
.build();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public GeneratorSourceReaderFactory(
@Override
public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit> createReader(
SourceReaderContext readerContext) {
int parallelism = readerContext.currentParallelism();
int parallelism = readerContext.getTaskInfo().getNumberOfParallelSubtasks();
RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(parallelism);
return new RateLimitedSourceReader<>(
new GeneratingIteratorSourceReader<>(readerContext, generatorFunction),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.file.src;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -30,6 +31,7 @@
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
import org.apache.flink.connector.testutils.source.TestingTaskInfo;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputStatus;
Expand Down Expand Up @@ -195,6 +197,15 @@ public int read(byte[] b, int off, int len) throws IOException {

private static final class NoOpReaderContext implements SourceReaderContext {

private final TaskInfo taskInfo =
new TestingTaskInfo.Builder()
.setTaskName("NoOpTask")
.setMaxNumberOfParallelSubtasks(1)
.setIndexOfThisSubtask(0)
.setAttemptNumber(0)
.setNumberOfParallelSubtasks(1)
.build();

@Override
public SourceReaderMetricGroup metricGroup() {
return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
Expand All @@ -210,11 +221,6 @@ public String getLocalHostName() {
return "localhost";
}

@Override
public int getIndexOfSubtask() {
return 0;
}

@Override
public void sendSplitRequest() {}

Expand All @@ -227,8 +233,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}

@Override
public int currentParallelism() {
return 1;
public TaskInfo getTaskInfo() {
return taskInfo;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.api.connector.source;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
Expand All @@ -39,8 +41,19 @@ public interface SourceReaderContext {
*/
String getLocalHostName();

/** @return The index of this subtask. */
int getIndexOfSubtask();
/**
* Get the index of this subtask.
*
* @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
* provided uniformly by {@link #getTaskInfo()}.
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
* FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
*/
@Deprecated
default int getIndexOfSubtask() {
return getTaskInfo().getIndexOfThisSubtask();
}

/**
* Sends a split request to the source's {@link SplitEnumerator}. This will result in a call to
Expand Down Expand Up @@ -68,8 +81,22 @@ public interface SourceReaderContext {
* Get the current parallelism of this Source.
*
* @return the parallelism of the Source.
* @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
* provided uniformly by {@link #getTaskInfo()}.
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
* FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
*/
@Deprecated
default int currentParallelism() {
throw new UnsupportedOperationException();
return getTaskInfo().getNumberOfParallelSubtasks();
}

/**
* Get the meta information of current task.
*
* @return the task meta information.
*/
@PublicEvolving
TaskInfo getTaskInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.api.connector.source.lib;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
Expand Down Expand Up @@ -128,11 +130,6 @@ public String getLocalHostName() {
return "localhost";
}

@Override
public int getIndexOfSubtask() {
return 0;
}

@Override
public void sendSplitRequest() {}

Expand All @@ -145,8 +142,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}

@Override
public int currentParallelism() {
return 1;
public TaskInfo getTaskInfo() {
return new TaskInfoImpl("DummyTask", 1, 0, 1, 0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ public EventsGeneratorFunction(double errorProbability) {

@Override
public void open(SourceReaderContext readerContext) throws Exception {
final int range = Integer.MAX_VALUE / readerContext.currentParallelism();
min = range * readerContext.getIndexOfSubtask();
final int range =
Integer.MAX_VALUE / readerContext.getTaskInfo().getNumberOfParallelSubtasks();
min = range * readerContext.getTaskInfo().getIndexOfThisSubtask();
max = min + range;
generator = new EventsGenerator(errorProbability);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public SimpleVersionedSerializer<DummyCheckpoint> getEnumeratorCheckpointSeriali
public SourceReader<RowData, DummySplit> createReader(SourceReaderContext readerContext)
throws Exception {
Preconditions.checkState(
readerContext.currentParallelism() == 1,
readerContext.getTaskInfo().getNumberOfParallelSubtasks() == 1,
"SocketSource can only work with a parallelism of 1.");
deserializer.open(
new DeserializationSchema.InitializationContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
Expand Down Expand Up @@ -254,8 +255,6 @@ public void initReader() throws Exception {
return;
}

final int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();

final SourceReaderContext context =
new SourceReaderContext() {
@Override
Expand All @@ -273,11 +272,6 @@ public String getLocalHostName() {
return localHostname;
}

@Override
public int getIndexOfSubtask() {
return subtaskIndex;
}

@Override
public void sendSplitRequest() {
operatorEventGateway.sendEventToCoordinator(
Expand Down Expand Up @@ -308,8 +302,8 @@ public void registerReleaseHookIfAbsent(
}

@Override
public int currentParallelism() {
return getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
public TaskInfo getTaskInfo() {
return getRuntimeContext().getTaskInfo();
}
};

Expand Down
Loading

0 comments on commit b9e9997

Please sign in to comment.