Skip to content

Commit

Permalink
[FLINK-4139][yarn] adjust task slots to parallelism correctly
Browse files Browse the repository at this point in the history
- user specifies no parallelism
  -> parallelism is adjusted to #taskSlots * #nodes.

- user specifies parallelism but no #taskSlots or too few slots
  -> #taskSlots are set such that they meet the parallelism
  • Loading branch information
mxm committed Jul 1, 2016
1 parent 96590ff commit 44b3bc4
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ public void testValidConfig() {

@Test(expected = IllegalConfigurationException.class)
public void testInvalidConfigAndNoOption() throws Exception {
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getInvalidConfigDir());
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});

frontend.retrieveClient(options);
frontend.retrieveClient(options);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,6 @@ public void testManualOptionsOverridesYarn() throws Exception {

}


///////////
// Utils //
///////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,34 @@

package org.apache.flink.yarn;

import akka.actor.ActorSystem;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;

import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.test.util.TestBaseUtils;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -73,4 +86,86 @@ public void testDynamicProperties() throws IOException {
Assert.assertEquals(1, dynProperties.size());
Assert.assertEquals("5 min", dynProperties.get("akka.ask.timeout"));
}

@Test
public void testNotEnoughTaskSlots() throws Exception {

File confFile = tmp.newFile("flink-conf.yaml");
File jarFile = tmp.newFile("test.jar");
new CliFrontend(tmp.getRoot().getAbsolutePath());

String[] params =
new String[] {"-yn", "2", "-ys", "3", "-p", "7", jarFile.getAbsolutePath()};

RunOptions runOptions = CliFrontendParser.parseRunCommand(params);

FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");

AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());

// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
Assert.assertEquals(4, descriptor.getTaskManagerSlots());
Assert.assertEquals(2, descriptor.getTaskManagerCount());
}

@Test
public void testCorrectSettingOfMaxSlots() throws Exception {

File confFile = tmp.newFile("flink-conf.yaml");
File jarFile = tmp.newFile("test.jar");
new CliFrontend(tmp.getRoot().getAbsolutePath());

String[] params =
new String[] {"-yn", "2", "-ys", "3", jarFile.getAbsolutePath()};

RunOptions runOptions = CliFrontendParser.parseRunCommand(params);

FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");

AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor("", runOptions.getCommandLine());

// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
Assert.assertEquals(3, descriptor.getTaskManagerSlots());
Assert.assertEquals(2, descriptor.getTaskManagerCount());

Configuration config = new Configuration();
CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("test", 9000));
ClusterClient client = new TestingYarnClusterClient(descriptor, config);
Assert.assertEquals(6, client.getMaxSlots());
}

private static class TestCLI extends FlinkYarnSessionCli {

public TestCLI(String shortPrefix, String longPrefix) {
super(shortPrefix, longPrefix);
}

private static class JarAgnosticClusterDescriptor extends YarnClusterDescriptor {
@Override
public void setLocalJarPath(Path localJarPath) {
// setLocalJarPath("/tmp");
}
}

@Override
protected AbstractYarnClusterDescriptor getClusterDescriptor() {
return new JarAgnosticClusterDescriptor();
}
}

private static class TestingYarnClusterClient extends YarnClusterClient {

public TestingYarnClusterClient(AbstractYarnClusterDescriptor descriptor, Configuration config) throws IOException, YarnException {
super(descriptor,
Mockito.mock(YarnClient.class),
Mockito.mock(ApplicationReport.class),
config,
new Path("/tmp"), true);
}

@Override
protected ActorSystem createActorSystem() throws IOException {
return Mockito.mock(ActorSystem.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.YarnClusterClient;
Expand Down Expand Up @@ -332,12 +333,13 @@ public boolean accept(File dir, String name) {
int yarnTmSlots = yarnClusterDescriptor.getTaskManagerSlots();
if (yarnTmSlots == -1) {
yarnTmSlots = 1;
yarnClusterDescriptor.setTaskManagerSlots(yarnTmSlots);
}

int maxSlots = yarnTmSlots * yarnClusterDescriptor.getTaskManagerCount();
int userParallelism = Integer.valueOf(cmd.getOptionValue(CliFrontendParser.PARALLELISM_OPTION.getOpt(), "-1"));
if (userParallelism != -1) {
int slotsPerTM = userParallelism / yarnClusterDescriptor.getTaskManagerCount();
int slotsPerTM = (int) Math.ceil((double) userParallelism / yarnClusterDescriptor.getTaskManagerCount());
String message = "The YARN cluster has " + maxSlots + " slots available, " +
"but the user requested a parallelism of " + userParallelism + " on YARN. " +
"Each of the " + yarnClusterDescriptor.getTaskManagerCount() + " TaskManagers " +
Expand Down

0 comments on commit 44b3bc4

Please sign in to comment.