Skip to content

Commit

Permalink
[FLINK-15031][runtime] Netty Shuffle Service supports to announce fin…
Browse files Browse the repository at this point in the history
…e grained network buffer requirement

This closes apache#16173.
  • Loading branch information
jinxing64 authored and zhuzhurk committed Jul 6, 2021
1 parent a42d267 commit 2c52816
Show file tree
Hide file tree
Showing 17 changed files with 564 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class NettyShuffleServiceFactory

/**
 * Returns the {@link NettyShuffleMaster} provided by this factory.
 *
 * <p>NOTE(review): this span is a rendered diff hunk without +/- markers, so two return
 * statements appear back to back. Presumably the {@code INSTANCE} return is the removed line
 * and the constructor call is its replacement -- confirm against the commit.
 *
 * @param configuration configuration handed to the {@link NettyShuffleMaster} constructor
 * @return the shuffle master
 */
@Override
public NettyShuffleMaster createShuffleMaster(Configuration configuration) {
return NettyShuffleMaster.INSTANCE;
// Builds a master from the given configuration rather than returning a shared instance.
return new NettyShuffleMaster(configuration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolFactory;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ProcessorArchitecture;
import org.apache.flink.util.function.SupplierWithException;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -265,23 +267,17 @@ private static void releasePartitionsQuietly(ResultSubpartition[] partitions, in
/**
 * Returns a supplier that creates the {@link BufferPool} of a result partition when invoked.
 *
 * <p>NOTE(review): this span is a rendered diff hunk without +/- markers. It shows both an
 * inline computation of {@code maxNumberOfMemorySegments} / {@code numRequiredBuffers} and a
 * call to {@code NettyShuffleUtils.getMinMaxNetworkBuffersPerResultPartition}; presumably the
 * inline version is the removed code and the helper call replaces it -- confirm against the
 * commit.
 *
 * @param numberOfSubpartitions number of subpartitions of the result partition
 * @param type type of the result partition
 * @return supplier that calls {@code bufferPoolFactory.createBufferPool(...)} when invoked
 */
SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
int numberOfSubpartitions, ResultPartitionType type) {
return () -> {
int maxNumberOfMemorySegments =
type.isBounded()
? numberOfSubpartitions * networkBuffersPerChannel
+ floatingNetworkBuffersPerGate
: Integer.MAX_VALUE;
int numRequiredBuffers =
!type.isPipelined() && numberOfSubpartitions >= sortShuffleMinParallelism
? sortShuffleMinBuffers
: numberOfSubpartitions + 1;

// If the partition type is back pressure-free, we register with the buffer pool for
// callbacks to release memory.
// The pair's left value is passed below as the required number of buffers and its right
// value as the maximum number of buffers.
Pair<Integer, Integer> pair =
NettyShuffleUtils.getMinMaxNetworkBuffersPerResultPartition(
networkBuffersPerChannel,
floatingNetworkBuffersPerGate,
sortShuffleMinParallelism,
sortShuffleMinBuffers,
numberOfSubpartitions,
type);

return bufferPoolFactory.createBufferPool(
numRequiredBuffers,
maxNumberOfMemorySegments,
numberOfSubpartitions,
maxBuffersPerChannel);
pair.getLeft(), pair.getRight(), numberOfSubpartitions, maxBuffersPerChannel);
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ public void requestPartitions() {
}

@VisibleForTesting
void convertRecoveredInputChannels() {
public void convertRecoveredInputChannels() {
LOG.debug("Converting recovered input channels ({} channels)", getNumberOfInputChannels());
for (Map.Entry<IntermediateResultPartitionID, InputChannel> entry :
inputChannels.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.function.SupplierWithException;

import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -238,9 +240,10 @@ static SupplierWithException<BufferPool, IOException> createBufferPoolFactory(
int floatingNetworkBuffersPerGate,
int size,
ResultPartitionType type) {
// Note that we should guarantee at-least one floating buffer for local channel state
// recovery.
return () -> bufferPoolFactory.createBufferPool(1, floatingNetworkBuffersPerGate);
Pair<Integer, Integer> pair =
NettyShuffleUtils.getMinMaxFloatingBuffersPerInputGate(
floatingNetworkBuffersPerGate);
return () -> bufferPoolFactory.createBufferPool(pair.getLeft(), pair.getRight());
}

/** Statistics of input channels. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,45 @@

package org.apache.flink.runtime.shuffle;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.PartitionConnectionInfo;
import org.apache.flink.runtime.util.ConfigurationParserUtils;

import java.util.concurrent.CompletableFuture;

import static org.apache.flink.util.Preconditions.checkNotNull;

/** Default {@link ShuffleMaster} for netty and local file based shuffle implementation. */
public enum NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {
INSTANCE;
public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {

private final int buffersPerInputChannel;

private final int buffersPerInputGate;

private final int sortShuffleMinParallelism;

private final int sortShuffleMinBuffers;

private final int networkBufferSize;

/**
 * Creates a shuffle master that captures the network settings it later needs to compute the
 * shuffle memory of a task.
 *
 * @param conf configuration to read the network options from; must not be {@code null}
 */
public NettyShuffleMaster(Configuration conf) {
    checkNotNull(conf);

    // Buffer counts per input channel and per input gate.
    this.buffersPerInputChannel =
            conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
    this.buffersPerInputGate =
            conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE);

    // Sort-shuffle settings.
    this.sortShuffleMinParallelism =
            conf.getInteger(
                    NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM);
    this.sortShuffleMinBuffers =
            conf.getInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);

    // Size of a single network buffer, used to turn buffer counts into a memory size.
    this.networkBufferSize = ConfigurationParserUtils.getPageSize(conf);
}

@Override
public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(
Expand Down Expand Up @@ -58,4 +87,35 @@ private static PartitionConnectionInfo createConnectionInfo(
producerDescriptor, connectionIndex)
: LocalExecutionPartitionConnectionInfo.INSTANCE;
}

/**
 * Computes the network memory that the JM announces for a task.
 *
 * <p>The result depends on the I/O layout of the vertex as well as on the network
 * configuration, e.g. {@link NettyShuffleEnvironmentOptions#NETWORK_BUFFERS_PER_CHANNEL} and
 * {@link NettyShuffleEnvironmentOptions#NETWORK_EXTRA_BUFFERS_PER_GATE}. In fine-grained
 * resource management these options must therefore be kept consistent between JM, RM and TM,
 * so that the announced memory and the memory that is actually allocated agree with each other.
 *
 * @param desc inputs and outputs of the task; must not be {@code null}
 * @return number of announced network buffers multiplied by the network buffer size
 */
@Override
public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor desc) {
    checkNotNull(desc);

    // Add up the channels of all inputs.
    int channelCount = 0;
    for (int channelsOfInput : desc.getInputChannelNums().values()) {
        channelCount += channelsOfInput;
    }
    // The number of map entries is used as the number of input gates.
    final int gateCount = desc.getInputChannelNums().size();

    final int buffersToAnnounce =
            NettyShuffleUtils.computeNetworkBuffersForAnnouncing(
                    buffersPerInputChannel,
                    buffersPerInputGate,
                    sortShuffleMinParallelism,
                    sortShuffleMinBuffers,
                    channelCount,
                    gateCount,
                    desc.getSubpartitionNums(),
                    desc.getPartitionTypes());

    // Widen to long before multiplying so the byte count is computed in 64 bits.
    final long bytesToAnnounce = (long) networkBufferSize * buffersToAnnounce;
    return new MemorySize(bytesToAnnounce);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.shuffle;

import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;

import org.apache.commons.lang3.tuple.Pair;

import java.util.Map;

import static org.apache.flink.util.Preconditions.checkArgument;

/**
 * Utils to calculate network memory requirement of a vertex from network configuration and details
 * of input and output. The methods help to decide the volume of buffer pools when initializing
 * shuffle environment and also guide network memory announcing in fine-grained resource management.
 */
public final class NettyShuffleUtils {

    /**
     * Returns the minimum and maximum number of floating buffers of an input gate.
     *
     * @param numFloatingBuffersPerGate configured number of floating buffers per input gate
     * @return pair of (min, max), where min is always 1 and max is the configured value
     */
    public static Pair<Integer, Integer> getMinMaxFloatingBuffersPerInputGate(
            final int numFloatingBuffersPerGate) {
        // We should guarantee at-least one floating buffer for local channel state recovery.
        return Pair.of(1, numFloatingBuffersPerGate);
    }

    /**
     * Returns the minimum and maximum number of network buffers of a result partition.
     *
     * <p>The minimum is {@code sortShuffleMinBuffers} for blocking partitions whose number of
     * subpartitions reaches {@code sortShuffleMinParallelism}, and one buffer per subpartition
     * plus one otherwise. The maximum is {@link Integer#MAX_VALUE} for unbounded partition types
     * and {@code numSubpartitions * numBuffersPerChannel + numFloatingBuffersPerGate} for bounded
     * ones, but never less than the minimum.
     *
     * @param numBuffersPerChannel configured number of buffers per channel
     * @param numFloatingBuffersPerGate configured number of floating buffers per gate
     * @param sortShuffleMinParallelism subpartition count from which blocking partitions use
     *     {@code sortShuffleMinBuffers} as their minimum
     * @param sortShuffleMinBuffers minimum number of buffers in that case
     * @param numSubpartitions number of subpartitions of the result partition
     * @param type type of the result partition
     * @return pair of (min, max) with {@code min <= max}
     */
    public static Pair<Integer, Integer> getMinMaxNetworkBuffersPerResultPartition(
            final int numBuffersPerChannel,
            final int numFloatingBuffersPerGate,
            final int sortShuffleMinParallelism,
            final int sortShuffleMinBuffers,
            final int numSubpartitions,
            final ResultPartitionType type) {
        final boolean useSortShuffleMin =
                type.isBlocking() && numSubpartitions >= sortShuffleMinParallelism;
        final int min = useSortShuffleMin ? sortShuffleMinBuffers : numSubpartitions + 1;
        final int max =
                type.isBounded()
                        ? numSubpartitions * numBuffersPerChannel + numFloatingBuffersPerGate
                        : Integer.MAX_VALUE;
        // With few buffers per channel and few floating buffers the bounded maximum can drop
        // below the minimum (e.g. 1 buffer per channel, 0 floating buffers). A pool can never
        // be smaller than its required size, so the maximum is raised to the minimum then.
        return Pair.of(min, Math.max(min, max));
    }

    /**
     * Computes the number of network buffers to announce for a task.
     *
     * @param numBuffersPerChannel configured number of exclusive buffers per input channel
     * @param numFloatingBuffersPerGate configured number of floating buffers per input gate
     * @param sortShuffleMinParallelism subpartition count from which blocking partitions use
     *     {@code sortShuffleMinBuffers} as their minimum
     * @param sortShuffleMinBuffers minimum number of buffers in that case
     * @param numTotalInputChannels number of input channels over all input gates
     * @param numTotalInputGates number of input gates
     * @param subpartitionNums number of subpartitions per produced data set
     * @param partitionTypes partition type per produced data set; must contain every key of
     *     {@code subpartitionNums}
     * @return sum of the buffers announced for all inputs and all outputs
     * @throws IllegalArgumentException if a data set has no partition type, or if the
     *     announcement for a result partition would be {@link Integer#MAX_VALUE}
     */
    public static int computeNetworkBuffersForAnnouncing(
            final int numBuffersPerChannel,
            final int numFloatingBuffersPerGate,
            final int sortShuffleMinParallelism,
            final int sortShuffleMinBuffers,
            final int numTotalInputChannels,
            final int numTotalInputGates,
            final Map<IntermediateDataSetID, Integer> subpartitionNums,
            final Map<IntermediateDataSetID, ResultPartitionType> partitionTypes) {

        // Each input channel will retain N exclusive network buffers, N = numBuffersPerChannel.
        // Each input gate is guaranteed to have a number of floating buffers.
        final int requirementForInputs =
                numBuffersPerChannel * numTotalInputChannels
                        + getMinMaxFloatingBuffersPerInputGate(numFloatingBuffersPerGate).getRight()
                                * numTotalInputGates;

        int requirementForOutputs = 0;
        for (Map.Entry<IntermediateDataSetID, Integer> entry : subpartitionNums.entrySet()) {
            final IntermediateDataSetID dataSetId = entry.getKey();
            final int numSubs = entry.getValue();
            checkArgument(
                    partitionTypes.containsKey(dataSetId),
                    "Missing result partition type for data set %s",
                    dataSetId);
            final ResultPartitionType partitionType = partitionTypes.get(dataSetId);

            requirementForOutputs +=
                    getNumBuffersToAnnounceForResultPartition(
                            partitionType,
                            numBuffersPerChannel,
                            numFloatingBuffersPerGate,
                            sortShuffleMinParallelism,
                            sortShuffleMinBuffers,
                            numSubs);
        }

        return requirementForInputs + requirementForOutputs;
    }

    /**
     * Picks the number of buffers to announce for a single result partition: the maximum for
     * pipelined partitions and the minimum for all other types.
     */
    private static int getNumBuffersToAnnounceForResultPartition(
            ResultPartitionType type,
            int numBuffersPerChannel,
            int floatingBuffersPerGate,
            int sortShuffleMinParallelism,
            int sortShuffleMinBuffers,
            int numSubpartitions) {

        Pair<Integer, Integer> minAndMax =
                getMinMaxNetworkBuffersPerResultPartition(
                        numBuffersPerChannel,
                        floatingBuffersPerGate,
                        sortShuffleMinParallelism,
                        sortShuffleMinBuffers,
                        numSubpartitions,
                        type);

        // In order to avoid network buffer request timeout (see FLINK-12852), we announce
        // network buffer requirement by below:
        // 1. For pipelined shuffle, the floating buffers may not be returned in time due to back
        // pressure so we need to include all the floating buffers in the announcement, i.e. we
        // should take the max value;
        // 2. For blocking shuffle, it is back pressure free and floating buffers can be recycled
        // in time, so that the minimum required buffers would be enough.
        int ret = type.isPipelined() ? minAndMax.getRight() : minAndMax.getLeft();

        if (ret == Integer.MAX_VALUE) {
            // Should never reach this branch. Result partition will allocate an unbounded
            // buffer pool only when type is ResultPartitionType.PIPELINED. But fine-grained
            // resource management is disabled in such case.
            throw new IllegalArgumentException(
                    "Illegal to announce network memory requirement as Integer.MAX_VALUE, partition type: "
                            + type);
        }
        return ret;
    }

    /** Private default constructor to avoid being instantiated. */
    private NettyShuffleUtils() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.shuffle;

import org.apache.flink.configuration.MemorySize;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -61,4 +63,16 @@ CompletableFuture<T> registerPartitionWithProducer(
* @param shuffleDescriptor shuffle descriptor of the result partition to release externally.
*/
void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor);

/**
* Compute shuffle memory size for a task with the given {@link TaskInputsOutputsDescriptor}.
*
* <p>This default implementation returns {@link MemorySize#ZERO}; implementations that need
* shuffle memory for a task override it.
*
* @param taskInputsOutputsDescriptor describes task inputs and outputs information for shuffle
* memory calculation.
* @return shuffle memory size for a task with the given {@link TaskInputsOutputsDescriptor};
* {@link MemorySize#ZERO} unless overridden.
*/
default MemorySize computeShuffleMemorySizeForTask(
TaskInputsOutputsDescriptor taskInputsOutputsDescriptor) {
return MemorySize.ZERO;
}
}
Loading

0 comments on commit 2c52816

Please sign in to comment.