Skip to content

Commit

Permalink
SAMZA-767 - yarn.queue option is not used anywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
Aleksandar Pejakovic authored and navina committed Nov 19, 2015
1 parent 092e381 commit e8a2ef5
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public class YarnConfig extends MapConfig {
public static final String CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb";
private static final int DEFAULT_CONTAINER_MEM = 1024;

/**
* Name of YARN queue to run jobs on
*/
public static final String QUEUE_NAME = "yarn.queue";

/**
* Number of CPU cores to request from YARN per container
*/
Expand Down Expand Up @@ -144,6 +149,9 @@ public String getAmOpts() {
return get(AM_JVM_OPTIONS, "");
}

public String getQueueName() {
return get(QUEUE_NAME, null);
}

public String getAMJavaHome() {
return get(AM_JAVA_HOME, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ClientHelper(conf: Configuration) extends Logging {
/**
* Generate an application and submit it to the resource manager to start an application master
*/
def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String], queueName: Option[String]): Option[ApplicationId] = {
val app = yarnClient.createApplication
val newAppResponse = app.getNewApplicationResponse
var mem = memoryMb
Expand Down Expand Up @@ -106,6 +106,14 @@ class ClientHelper(conf: Configuration) extends Logging {
case None => None
}

queueName match {
case Some(queueName) => {
appCtx.setQueue(queueName)
info("set yarn queue name to %s" format queueName)
}
case None => None
}

// set the local package so that the containers and app master are provisioned with it
val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
}
envMapWithJavaHome
}),
Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1)))
)
Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))),
Option(yarnConfig.getQueueName))
this
}

Expand Down

0 comments on commit e8a2ef5

Please sign in to comment.