Skip to content

Commit

Permalink
[FLINK-34237] Revert the breaking changes to SourceReaderContext
Browse files Browse the repository at this point in the history
This closes apache#24197
  • Loading branch information
WencongLiu authored and xintongsong committed Jan 26, 2024
1 parent 84444d5 commit d0829ba
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public InputStatus pollNext(ReaderOutput output) throws Exception {
// trap END_OF_INPUT unless all sources have finished
LOG.info(
"End of input subtask={} sourceIndex={} {}",
readerContext.getTaskInfo().getIndexOfThisSubtask(),
readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader);
// Signal the coordinator that this reader has consumed all input and the
Expand Down Expand Up @@ -140,7 +140,7 @@ public CompletableFuture<Void> isAvailable() {
public void addSplits(List<HybridSourceSplit> splits) {
LOG.info(
"Adding splits subtask={} sourceIndex={} currentReader={} {}",
readerContext.getTaskInfo().getIndexOfThisSubtask(),
readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader,
splits);
Expand Down Expand Up @@ -168,7 +168,7 @@ public void notifyNoMoreSplits() {
}
LOG.debug(
"No more splits for subtask={} sourceIndex={} currentReader={}",
readerContext.getTaskInfo().getIndexOfThisSubtask(),
readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader);
}
Expand All @@ -179,7 +179,7 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
SwitchSourceEvent sse = (SwitchSourceEvent) sourceEvent;
LOG.info(
"Switch source event: subtask={} sourceIndex={} source={}",
readerContext.getTaskInfo().getIndexOfThisSubtask(),
readerContext.getIndexOfSubtask(),
sse.sourceIndex(),
sse.source());
switchedSources.put(sse.sourceIndex(), sse.source());
Expand All @@ -197,7 +197,7 @@ public void close() throws Exception {
}
LOG.debug(
"Reader closed: subtask={} sourceIndex={} currentReader={}",
readerContext.getTaskInfo().getIndexOfThisSubtask(),
readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader);
}
Expand All @@ -212,7 +212,7 @@ private void setCurrentReader(int index) {
}
LOG.debug(
"Reader closed: subtask={} sourceIndex={} currentReader={}",
readerContext.getTaskInfo().getIndexOfThisSubtask(),
readerContext.getIndexOfSubtask(),
currentSourceIndex,
currentReader);
}
Expand All @@ -230,7 +230,7 @@ private void setCurrentReader(int index) {
availabilityFuture.complete(null);
LOG.debug(
"Reader started: subtask={} sourceIndex={} {}",
readerContext.getTaskInfo().getIndexOfThisSubtask(),
readerContext.getIndexOfSubtask(),
currentSourceIndex,
reader);
// add restored splits
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.connector.datagen.source;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.ReaderOutput;
Expand All @@ -30,7 +29,6 @@
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testutils.source.TestingTaskInfo;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
Expand Down Expand Up @@ -184,6 +182,11 @@ public String getLocalHostName() {
return "localhost";
}

@Override
public int getIndexOfSubtask() {
return 0;
}

@Override
public void sendSplitRequest() {}

Expand All @@ -196,14 +199,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}

@Override
public TaskInfo getTaskInfo() {
return new TestingTaskInfo.Builder()
.setTaskName("DummyTask")
.setMaxNumberOfParallelSubtasks(1)
.setIndexOfThisSubtask(0)
.setNumberOfParallelSubtasks(1)
.setAttemptNumber(0)
.build();
public int currentParallelism() {
return 1;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public GeneratorSourceReaderFactory(
@Override
public SourceReader<OUT, NumberSequenceSource.NumberSequenceSplit> createReader(
SourceReaderContext readerContext) {
int parallelism = readerContext.getTaskInfo().getNumberOfParallelSubtasks();
int parallelism = readerContext.currentParallelism();
RateLimiter rateLimiter = rateLimiterStrategy.createRateLimiter(parallelism);
return new RateLimitedSourceReader<>(
new GeneratingIteratorSourceReader<>(readerContext, generatorFunction),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.connector.file.src;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand All @@ -31,7 +30,6 @@
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.testutils.TestingFileSystem;
import org.apache.flink.connector.testutils.source.TestingTaskInfo;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.InputStatus;
Expand Down Expand Up @@ -197,15 +195,6 @@ public int read(byte[] b, int off, int len) throws IOException {

private static final class NoOpReaderContext implements SourceReaderContext {

private final TaskInfo taskInfo =
new TestingTaskInfo.Builder()
.setTaskName("NoOpTask")
.setMaxNumberOfParallelSubtasks(1)
.setIndexOfThisSubtask(0)
.setAttemptNumber(0)
.setNumberOfParallelSubtasks(1)
.build();

@Override
public SourceReaderMetricGroup metricGroup() {
return UnregisteredMetricsGroup.createSourceReaderMetricGroup();
Expand All @@ -221,6 +210,11 @@ public String getLocalHostName() {
return "localhost";
}

@Override
public int getIndexOfSubtask() {
return 0;
}

@Override
public void sendSplitRequest() {}

Expand All @@ -233,8 +227,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}

@Override
public TaskInfo getTaskInfo() {
return taskInfo;
public int currentParallelism() {
return 1;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.flink.api.connector.source;

import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.util.UserCodeClassLoader;
Expand All @@ -41,19 +39,8 @@ public interface SourceReaderContext {
*/
String getLocalHostName();

/**
* Get the index of this subtask.
*
* @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
* provided uniformly by {@link #getTaskInfo()}.
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
* FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
*/
@Deprecated
default int getIndexOfSubtask() {
return getTaskInfo().getIndexOfThisSubtask();
}
/** @return The index of this subtask. */
int getIndexOfSubtask();

/**
* Sends a split request to the source's {@link SplitEnumerator}. This will result in a call to
Expand Down Expand Up @@ -81,22 +68,8 @@ default int getIndexOfSubtask() {
* Get the current parallelism of this Source.
*
* @return the parallelism of the Source.
* @deprecated This method is deprecated since Flink 1.19. All metadata about the task should be
* provided uniformly by {@link #getTaskInfo()}.
* @see <a
* href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-382%3A+Unify+the+Provision+of+Diverse+Metadata+for+Context-like+APIs">
* FLIP-382: Unify the Provision of Diverse Metadata for Context-like APIs </a>
*/
@Deprecated
default int currentParallelism() {
return getTaskInfo().getNumberOfParallelSubtasks();
throw new UnsupportedOperationException();
}

/**
* Get the meta information of current task.
*
* @return the task meta information.
*/
@PublicEvolving
TaskInfo getTaskInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

package org.apache.flink.api.connector.source.lib;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
Expand Down Expand Up @@ -130,6 +128,11 @@ public String getLocalHostName() {
return "localhost";
}

@Override
public int getIndexOfSubtask() {
return 0;
}

@Override
public void sendSplitRequest() {}

Expand All @@ -142,8 +145,8 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}

@Override
public TaskInfo getTaskInfo() {
return new TaskInfoImpl("DummyTask", 1, 0, 1, 0);
public int currentParallelism() {
return 1;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ public EventsGeneratorFunction(double errorProbability) {

@Override
public void open(SourceReaderContext readerContext) throws Exception {
final int range =
Integer.MAX_VALUE / readerContext.getTaskInfo().getNumberOfParallelSubtasks();
min = range * readerContext.getTaskInfo().getIndexOfThisSubtask();
final int range = Integer.MAX_VALUE / readerContext.currentParallelism();
min = range * readerContext.getIndexOfSubtask();
max = min + range;
generator = new EventsGenerator(errorProbability);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public SimpleVersionedSerializer<DummyCheckpoint> getEnumeratorCheckpointSeriali
public SourceReader<RowData, DummySplit> createReader(SourceReaderContext readerContext)
throws Exception {
Preconditions.checkState(
readerContext.getTaskInfo().getNumberOfParallelSubtasks() == 1,
readerContext.currentParallelism() == 1,
"SocketSource can only work with a parallelism of 1.");
deserializer.open(
new DeserializationSchema.InitializationContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.eventtime.WatermarkAlignmentParams;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
Expand Down Expand Up @@ -255,6 +254,8 @@ public void initReader() throws Exception {
return;
}

final int subtaskIndex = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();

final SourceReaderContext context =
new SourceReaderContext() {
@Override
Expand All @@ -272,6 +273,11 @@ public String getLocalHostName() {
return localHostname;
}

@Override
public int getIndexOfSubtask() {
return subtaskIndex;
}

@Override
public void sendSplitRequest() {
operatorEventGateway.sendEventToCoordinator(
Expand Down Expand Up @@ -302,8 +308,8 @@ public void registerReleaseHookIfAbsent(
}

@Override
public TaskInfo getTaskInfo() {
return getRuntimeContext().getTaskInfo();
public int currentParallelism() {
return getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@

package org.apache.flink.connector.testutils.source.reader;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.testutils.source.TestingTaskInfo;
import org.apache.flink.metrics.groups.SourceReaderMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.util.SimpleUserCodeClassLoader;
Expand Down Expand Up @@ -68,6 +66,11 @@ public String getLocalHostName() {
return "localhost";
}

@Override
public int getIndexOfSubtask() {
return 0;
}

@Override
public void sendSplitRequest() {
numSplitRequests++;
Expand All @@ -83,6 +86,11 @@ public UserCodeClassLoader getUserCodeClassLoader() {
return SimpleUserCodeClassLoader.create(getClass().getClassLoader());
}

@Override
public int currentParallelism() {
return 1;
}

// ------------------------------------------------------------------------

public int getNumSplitRequests() {
Expand All @@ -96,15 +104,4 @@ public List<SourceEvent> getSentEvents() {
public void clearSentEvents() {
sentEvents.clear();
}

@Override
public TaskInfo getTaskInfo() {
return new TestingTaskInfo.Builder()
.setTaskName("TestTask")
.setMaxNumberOfParallelSubtasks(1)
.setIndexOfThisSubtask(0)
.setAttemptNumber(0)
.setNumberOfParallelSubtasks(1)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public Boundedness getBoundedness() {
@Override
public SourceReader<Long, LongSplit> createReader(SourceReaderContext readerContext) {
return new LongSourceReader(
readerContext.getTaskInfo().getIndexOfThisSubtask(),
readerContext.getIndexOfSubtask(),
minCheckpoints,
expectedRestarts,
checkpointingInterval,
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2320,7 +2320,6 @@ under the License.
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection)</exclude>
<exclude>org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#fromData(java.util.Collection,org.apache.flink.api.common.typeinfo.TypeInformation)</exclude>
<exclude>org.apache.flink.api.common.functions.RuntimeContext</exclude>
<exclude>org.apache.flink.api.connector.source.SourceReaderContext</exclude>
<!-- MARKER: end exclusions -->
</excludes>
<accessModifier>public</accessModifier>
Expand Down

0 comments on commit d0829ba

Please sign in to comment.