Skip to content

Commit

Permalink
[FLINK-25055][network] Support listen and notify mechanism for remote…
Browse files Browse the repository at this point in the history
… partition request
  • Loading branch information
KarmaGYZ authored Oct 31, 2023
1 parent 81d559a commit 9c3b18b
Show file tree
Hide file tree
Showing 46 changed files with 2,046 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,23 @@
<td>String</td>
<td>The Netty transport type, either "nio" or "epoll". The "auto" means selecting the property mode automatically based on the platform. Note that the "epoll" mode can get better performance, less GC and have more advanced features which are only available on modern Linux.</td>
</tr>
<tr>
<td><h5>taskmanager.network.partition-request-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>Timeout for an individual partition request of remote input channels. The partition request will finally fail if the total wait time exceeds twice the value of <code class="highlighter-rouge">taskmanager.network.request-backoff.max</code>.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.initial</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Minimum backoff in milliseconds for partition requests of input channels.</td>
<td>Minimum backoff in milliseconds for partition requests of local input channels.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.max</h5></td>
<td style="word-wrap: break-word;">10000</td>
<td>Integer</td>
<td>Maximum backoff in milliseconds for partition requests of input channels.</td>
<td>Maximum backoff in milliseconds for partition requests of local input channels.</td>
</tr>
<tr>
<td><h5>taskmanager.network.retries</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,17 +170,23 @@
<td>String</td>
<td>The Netty transport type, either "nio" or "epoll". The "auto" means selecting the property mode automatically based on the platform. Note that the "epoll" mode can get better performance, less GC and have more advanced features which are only available on modern Linux.</td>
</tr>
<tr>
<td><h5>taskmanager.network.partition-request-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>Timeout for an individual partition request of remote input channels. The partition request will finally fail if the total wait time exceeds twice the value of <code class="highlighter-rouge">taskmanager.network.request-backoff.max</code>.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.initial</h5></td>
<td style="word-wrap: break-word;">100</td>
<td>Integer</td>
<td>Minimum backoff in milliseconds for partition requests of input channels.</td>
<td>Minimum backoff in milliseconds for partition requests of local input channels.</td>
</tr>
<tr>
<td><h5>taskmanager.network.request-backoff.max</h5></td>
<td style="word-wrap: break-word;">10000</td>
<td>Integer</td>
<td>Maximum backoff in milliseconds for partition requests of input channels.</td>
<td>Maximum backoff in milliseconds for partition requests of local input channels.</td>
</tr>
<tr>
<td><h5>taskmanager.network.retries</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;

import java.time.Duration;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.TextElement.code;

/** The set of configuration options relating to network stack. */
@PublicEvolving
Expand Down Expand Up @@ -585,7 +589,7 @@ public class NettyShuffleEnvironmentOptions {
.defaultValue(100)
.withDeprecatedKeys("taskmanager.net.request-backoff.initial")
.withDescription(
"Minimum backoff in milliseconds for partition requests of input channels.");
"Minimum backoff in milliseconds for partition requests of local input channels.");

/** Maximum backoff for partition requests of input channels. */
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
Expand All @@ -595,7 +599,22 @@ public class NettyShuffleEnvironmentOptions {
.defaultValue(10000)
.withDeprecatedKeys("taskmanager.net.request-backoff.max")
.withDescription(
"Maximum backoff in milliseconds for partition requests of input channels.");
"Maximum backoff in milliseconds for partition requests of local input channels.");

/** The timeout for partition request listener in result partition manager. */
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
public static final ConfigOption<Duration> NETWORK_PARTITION_REQUEST_TIMEOUT =
key("taskmanager.network.partition-request-timeout")
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription(
Description.builder()
.text(
"Timeout for an individual partition request of remote input channels. "
+ "The partition request will finally fail if the total wait time exceeds "
+ "twice the value of %s.",
code(NETWORK_REQUEST_BACKOFF_MAX.key()))
.build());

// ------------------------------------------------------------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,6 +87,7 @@ public NettyShuffleEnvironment createShuffleEnvironment(
shuffleEnvironmentContext.getEventPublisher(),
shuffleEnvironmentContext.getParentMetricGroup(),
shuffleEnvironmentContext.getIoExecutor(),
shuffleEnvironmentContext.getScheduledExecutor(),
shuffleEnvironmentContext.getNumberOfSlots(),
shuffleEnvironmentContext.getTmpDirPaths());
}
Expand All @@ -97,13 +99,15 @@ static NettyShuffleEnvironment createNettyShuffleEnvironment(
TaskEventPublisher taskEventPublisher,
MetricGroup metricGroup,
Executor ioExecutor,
ScheduledExecutor scheduledExecutor,
int numberOfSlots,
String[] tmpDirPaths) {
return createNettyShuffleEnvironment(
config,
taskExecutorResourceId,
taskEventPublisher,
new ResultPartitionManager(),
new ResultPartitionManager(
config.getPartitionRequestListenerTimeout(), scheduledExecutor),
metricGroup,
ioExecutor,
numberOfSlots,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.apache.flink.runtime.io.network;

import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
Expand All @@ -34,12 +38,36 @@
*/
public interface NetworkSequenceViewReader {

void requestSubpartitionView(
/**
* When the netty server receives the downstream task's partition request and the upstream task
* has registered its partition, it will process the partition request immediately, otherwise it
* will create a {@link PartitionRequestListener} for given {@link ResultPartitionID} in {@link
* ResultPartitionManager} and notify the listener when the upstream task registers its
* partition.
*
* @param partitionProvider the result partition provider
* @param resultPartitionId the result partition id
* @param subPartitionIndex the sub partition index
* @throws IOException the thrown exception
*/
void requestSubpartitionViewOrRegisterListener(
ResultPartitionProvider partitionProvider,
ResultPartitionID resultPartitionId,
int subPartitionIndex)
throws IOException;

/**
* When the {@link ResultPartitionManager} registers {@link ResultPartition}, it will get the
* {@link PartitionRequestListener} via given {@link ResultPartitionID}, and create subpartition
* view reader for downstream task.
*
* @param partition the result partition
* @param subPartitionIndex the sub partition index
* @throws IOException the thrown exception
*/
void notifySubpartitionCreated(ResultPartition partition, int subPartitionIndex)
throws IOException;

@Nullable
BufferAndAvailability getNextBuffer() throws IOException;

Expand Down Expand Up @@ -91,4 +119,12 @@ void requestSubpartitionView(
InputChannelID getReceiverId();

void notifyNewBufferSize(int newBufferSize);

/**
* When the partition request from the given downstream task is timeout, it should notify the
* reader in netty server and send {@link PartitionNotFoundException} to the task.
*
* @param partitionRequestListener the timeout message of given {@link PartitionRequestListener}
*/
void notifyPartitionRequestTimeout(PartitionRequestListener partitionRequestListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionRequestListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
Expand All @@ -33,8 +35,10 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkState;

/**
* Simple wrapper for the subpartition view used in the new network credit-based mode.
Expand All @@ -55,6 +59,8 @@ class CreditBasedSequenceNumberingViewReader

private volatile ResultSubpartitionView subpartitionView;

private volatile PartitionRequestListener partitionRequestListener;

/**
* The status indicating whether this reader is already enqueued in the pipeline for
* transferring data or not.
Expand All @@ -78,27 +84,47 @@ class CreditBasedSequenceNumberingViewReader
}

@Override
public void requestSubpartitionView(
public void requestSubpartitionViewOrRegisterListener(
ResultPartitionProvider partitionProvider,
ResultPartitionID resultPartitionId,
int subPartitionIndex)
throws IOException {

synchronized (requestLock) {
if (subpartitionView == null) {
// This call can trigger a notification we have to
// schedule a separate task at the event loop that will
// start consuming this. Otherwise the reference to the
// view cannot be available in getNextBuffer().
this.subpartitionView =
partitionProvider.createSubpartitionView(
resultPartitionId, subPartitionIndex, this);
checkState(subpartitionView == null, "Subpartition already requested");
checkState(
partitionRequestListener == null, "Partition request listener already created");
partitionRequestListener =
new NettyPartitionRequestListener(
partitionProvider, this, subPartitionIndex, resultPartitionId);
// The partition provider will create subpartitionView if resultPartition is
// registered, otherwise it will register a listener of partition request to the result
// partition manager.
Optional<ResultSubpartitionView> subpartitionViewOptional =
partitionProvider.createSubpartitionViewOrRegisterListener(
resultPartitionId, subPartitionIndex, this, partitionRequestListener);
if (subpartitionViewOptional.isPresent()) {
this.subpartitionView = subpartitionViewOptional.get();
} else {
throw new IllegalStateException("Subpartition already requested");
// If the subpartitionView is not exist, it means that the requested partition is
// not registered.
return;
}
}

notifyDataAvailable();
requestQueue.notifyReaderCreated(this);
}

@Override
public void notifySubpartitionCreated(ResultPartition partition, int subPartitionIndex)
throws IOException {
synchronized (requestLock) {
checkState(subpartitionView == null, "Subpartition already requested");
subpartitionView = partition.createSubpartitionView(subPartitionIndex, this);
}

notifyDataAvailable();
requestQueue.notifyReaderCreated(this);
}

@Override
Expand Down Expand Up @@ -182,6 +208,12 @@ public void notifyNewBufferSize(int newBufferSize) {
subpartitionView.notifyNewBufferSize(newBufferSize);
}

@Override
public void notifyPartitionRequestTimeout(PartitionRequestListener partitionRequestListener) {
requestQueue.notifyPartitionRequestTimeout(partitionRequestListener);
this.partitionRequestListener = null;
}

@VisibleForTesting
int getNumCreditsAvailable() {
return numCreditsAvailable;
Expand Down Expand Up @@ -226,6 +258,9 @@ public Throwable getFailureCause() {

@Override
public void releaseAllResources() throws IOException {
if (partitionRequestListener != null) {
partitionRequestListener.releaseListener();
}
subpartitionView.releaseAllResources();
}

Expand Down
Loading

0 comments on commit 9c3b18b

Please sign in to comment.