Skip to content

Commit

Permalink
[FLINK-16438][yarn] YarnResourceManager starts workers with resources…
Browse files Browse the repository at this point in the history
… requested by SlotManager.

This means YarnResourceManager no longer:
- is aware of the default task executor resources
- assumes all workers are identical
  • Loading branch information
xintongsong authored and tillrohrmann committed Apr 25, 2020
1 parent a91057f commit f019356
Show file tree
Hide file tree
Showing 2 changed files with 304 additions and 71 deletions.
162 changes: 101 additions & 61 deletions flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
Expand Down Expand Up @@ -73,11 +74,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

/**
* The yarn implementation of the resource manager. Used when the system is started
Expand Down Expand Up @@ -117,11 +118,6 @@ public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode>
/** Client to communicate with the Node manager and launch TaskExecutor processes. */
private NMClientAsync nodeManagerClient;

/** The number of containers requested, but not yet granted. */
private int numPendingContainerRequests;

private final Resource resource;

private final WorkerSpecContainerResourceAdapter workerSpecContainerResourceAdapter;

public YarnResourceManager(
Expand Down Expand Up @@ -167,10 +163,8 @@ public YarnResourceManager(
}
yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
containerRequestHeartbeatIntervalMillis = flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
numPendingContainerRequests = 0;

this.webInterfaceUrl = webInterfaceUrl;
this.resource = Resource.newInstance(defaultMemoryMB, defaultTaskExecutorProcessSpec.getCpuCores().getValue().intValue());

this.workerSpecContainerResourceAdapter = new WorkerSpecContainerResourceAdapter(
flinkConfig,
Expand Down Expand Up @@ -312,16 +306,12 @@ protected void internalDeregisterApplication(

@Override
public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
Preconditions.checkArgument(Objects.equals(
workerResourceSpec,
WorkerResourceSpec.fromTaskExecutorProcessSpec(defaultTaskExecutorProcessSpec)));
requestYarnContainer();
return true;
return requestYarnContainer(workerResourceSpec);
}

@VisibleForTesting
Resource getContainerResource() {
return resource;
Optional<Resource> getContainerResource(WorkerResourceSpec workerResourceSpec) {
return workerSpecContainerResourceAdapter.tryComputeContainerResource(workerResourceSpec);
}

@Override
Expand Down Expand Up @@ -372,31 +362,66 @@ public void onContainersCompleted(final List<ContainerStatus> statuses) {
@Override
public void onContainersAllocated(List<Container> containers) {
runAsync(() -> {
log.info("Received {} containers with {} pending container requests.", containers.size(), numPendingContainerRequests);
final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

// number of allocated containers can be larger than the number of pending container requests
final int numAcceptedContainers = Math.min(containers.size(), numPendingContainerRequests);
final List<Container> requiredContainers = containers.subList(0, numAcceptedContainers);
final List<Container> excessContainers = containers.subList(numAcceptedContainers, containers.size());
log.info("Received {} containers.", containers.size());

for (int i = 0; i < requiredContainers.size(); i++) {
removeContainerRequest(pendingRequestsIterator.next());
for (Map.Entry<Resource, List<Container>> entry : groupContainerByResource(containers).entrySet()) {
onContainersOfResourceAllocated(entry.getKey(), entry.getValue());
}

excessContainers.forEach(this::returnExcessContainer);
requiredContainers.forEach(this::startTaskExecutorInContainer);

// if we are waiting for no further containers, we can go to the
// regular heartbeat interval
if (numPendingContainerRequests <= 0) {
if (getNumPendingWorkers() <= 0) {
resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
}
});
}

private void startTaskExecutorInContainer(Container container) {
private Map<Resource, List<Container>> groupContainerByResource(List<Container> containers) {
return containers.stream().collect(Collectors.groupingBy(Container::getResource));
}

private void onContainersOfResourceAllocated(Resource resource, List<Container> containers) {
final List<WorkerResourceSpec> pendingWorkerResourceSpecs =
workerSpecContainerResourceAdapter.getWorkerSpecs(resource).stream()
.flatMap(spec -> Collections.nCopies(getNumPendingWorkersFor(spec), spec).stream())
.collect(Collectors.toList());

int numPending = pendingWorkerResourceSpecs.size();
log.info("Received {} containers with resource {}, {} pending container requests.",
containers.size(),
resource,
numPending);

final Iterator<Container> containerIterator = containers.iterator();
final Iterator<WorkerResourceSpec> pendingWorkerSpecIterator = pendingWorkerResourceSpecs.iterator();
final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator =
getPendingRequestsAndCheckConsistency(resource, pendingWorkerResourceSpecs.size()).iterator();

int numAccepted = 0;
while (containerIterator.hasNext() && pendingWorkerSpecIterator.hasNext()) {
final WorkerResourceSpec workerResourceSpec = pendingWorkerSpecIterator.next();
final Container container = containerIterator.next();
final AMRMClient.ContainerRequest pendingRequest = pendingRequestsIterator.next();

notifyNewWorkerAllocated(workerResourceSpec);
startTaskExecutorInContainer(container, workerResourceSpec);
removeContainerRequest(pendingRequest, workerResourceSpec);

numAccepted++;
}
numPending -= numAccepted;

int numExcess = 0;
while (containerIterator.hasNext()) {
returnExcessContainer(containerIterator.next());
numExcess++;
}

log.info("Accepted {} requested containers, returned {} excess containers, {} pending container requests of resource {}.",
numAccepted, numExcess, numPending, resource);
}

private void startTaskExecutorInContainer(Container container, WorkerResourceSpec workerResourceSpec) {
final String containerIdStr = container.getId().toString();
final ResourceID resourceId = new ResourceID(containerIdStr);

Expand All @@ -406,7 +431,8 @@ private void startTaskExecutorInContainer(Container container) {
// Context information used to start a TaskExecutor Java process
ContainerLaunchContext taskExecutorLaunchContext = createTaskExecutorLaunchContext(
containerIdStr,
container.getNodeId().getHost());
container.getNodeId().getHost(),
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec));

nodeManagerClient.startContainerAsync(container, taskExecutorLaunchContext);
} catch (Throwable t) {
Expand All @@ -421,7 +447,7 @@ private void releaseFailedContainerAndRequestNewContainerIfRequired(ContainerId

final ResourceID resourceId = new ResourceID(containerId.toString());
// release the failed container
workerNodeMap.remove(resourceId);
YarnWorkerNode yarnWorkerNode = workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(containerId);
// and ask for a new one
requestYarnContainerIfRequired();
Expand All @@ -432,19 +458,20 @@ private void returnExcessContainer(Container excessContainer) {
resourceManagerClient.releaseAssignedContainer(excessContainer.getId());
}

private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
numPendingContainerRequests--;

log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests);

private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest, WorkerResourceSpec workerResourceSpec) {
log.info("Removing container request {}.", pendingContainerRequest);
resourceManagerClient.removeContainerRequest(pendingContainerRequest);
}

private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = resourceManagerClient.getMatchingRequests(
RM_REQUEST_PRIORITY,
ResourceRequest.ANY,
getContainerResource());
private Collection<AMRMClient.ContainerRequest> getPendingRequestsAndCheckConsistency(Resource resource, int expectedNum) {
final Collection<Resource> equivalentResources = workerSpecContainerResourceAdapter.getEquivalentContainerResource(resource);
final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests =
equivalentResources.stream()
.flatMap(equivalentResource -> resourceManagerClient.getMatchingRequests(
RM_REQUEST_PRIORITY,
ResourceRequest.ANY,
equivalentResource).stream())
.collect(Collectors.toList());

final Collection<AMRMClient.ContainerRequest> matchingContainerRequests;

Expand All @@ -456,8 +483,10 @@ private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
}

Preconditions.checkState(
matchingContainerRequests.size() == numPendingContainerRequests,
"The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", matchingContainerRequests.size(), numPendingContainerRequests);
matchingContainerRequests.size() == expectedNum,
"The RMClient's and YarnResourceManagers internal state about the number of pending container requests for resource %s has diverged. " +
"Number client's pending container requests %s != Number RM's pending container requests %s.",
resource, matchingContainerRequests.size(), expectedNum);

return matchingContainerRequests;
}
Expand Down Expand Up @@ -550,49 +579,60 @@ private static Tuple2<String, Integer> parseHostPort(String address) {
* Request new container if pending containers cannot satisfy pending slot requests.
*/
private void requestYarnContainerIfRequired() {
final int requiredTaskManagers = getNumberRequiredTaskManagers();

while (requiredTaskManagers > numPendingContainerRequests) {
requestYarnContainer();
for (Map.Entry<WorkerResourceSpec, Integer> requiredWorkersPerResourceSpec : getRequiredResources().entrySet()) {
final WorkerResourceSpec workerResourceSpec = requiredWorkersPerResourceSpec.getKey();
while (requiredWorkersPerResourceSpec.getValue() > getNumPendingWorkersFor(workerResourceSpec)) {
final boolean requestContainerSuccess = requestYarnContainer(workerResourceSpec);
Preconditions.checkState(requestContainerSuccess,
"Cannot request container for worker resource spec {}.", workerResourceSpec);
}
}
}

private void requestYarnContainer() {
resourceManagerClient.addContainerRequest(getContainerRequest());
private boolean requestYarnContainer(WorkerResourceSpec workerResourceSpec) {
Optional<Resource> containerResourceOptional = getContainerResource(workerResourceSpec);

// make sure we transmit the request fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
numPendingContainerRequests++;
if (containerResourceOptional.isPresent()) {
resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));

log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
resource,
numPendingContainerRequests);
// make sure we transmit the request fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
int numPendingWorkers = notifyNewWorkerRequested(workerResourceSpec);

log.info("Requesting new TaskExecutor container with resource {}. Number pending workers of this resource is {}.",
workerResourceSpec,
numPendingWorkers);
return true;
} else {
return false;
}
}

@Nonnull
@VisibleForTesting
AMRMClient.ContainerRequest getContainerRequest() {
AMRMClient.ContainerRequest getContainerRequest(Resource containerResource) {
return new AMRMClient.ContainerRequest(
getContainerResource(),
containerResource,
null,
null,
RM_REQUEST_PRIORITY);
}

private ContainerLaunchContext createTaskExecutorLaunchContext(
String containerId,
String host) throws Exception {
String host,
TaskExecutorProcessSpec taskExecutorProcessSpec) throws Exception {

// init the ContainerLaunchContext
final String currDir = env.get(ApplicationConstants.Environment.PWD.key());

final ContaineredTaskManagerParameters taskManagerParameters =
ContaineredTaskManagerParameters.create(flinkConfig, defaultTaskExecutorProcessSpec);
ContaineredTaskManagerParameters.create(flinkConfig, taskExecutorProcessSpec);

log.info("TaskExecutor {} will be started on {} with {}.",
containerId,
host,
defaultTaskExecutorProcessSpec);
taskExecutorProcessSpec);

final Configuration taskManagerConfig = BootstrapTools.cloneConfiguration(flinkConfig);

Expand Down
Loading

0 comments on commit f019356

Please sign in to comment.