Commit 14fcef7
Renamed ClusterScheduler to TaskSchedulerImpl for yarn and new-yarn
liguoqiang committed Dec 26, 2013
1 parent 56094bc · commit 14fcef7
Showing 8 changed files with 22 additions and 15 deletions.

new-yarn/.../YarnAllocationHandler.scala
@@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.Logging
import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils

import org.apache.hadoop.conf.Configuration
@@ -233,9 +234,9 @@ private[yarn] class YarnAllocationHandler(
// Note that the list we create below tries to ensure that not all containers end up within
// a host if there is a sufficiently large number of hosts/containers.
val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
-allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(dataLocalContainers)
-allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(rackLocalContainers)
-allocatedContainersToProcess ++= ClusterScheduler.prioritizeContainers(offRackContainers)
+allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)

// Run each of the allocated containers.
for (container <- allocatedContainersToProcess) {
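The comment above describes the intent: interleave containers across hosts so that no single host dominates the head of the combined list. A minimal sketch of that round-robin, with a generic signature inferred from these call sites (HashMap of host to ArrayBuffer[Container] in, ordered collection out); this is an illustration under those assumptions, not a verbatim copy of the Spark method:

import scala.collection.mutable.{ArrayBuffer, HashMap}

object PrioritizeSketch {
  def prioritizeContainers[K, T](map: HashMap[K, ArrayBuffer[T]]): List[T] = {
    // Visit hosts with the most containers first, so their surplus is
    // spread across later rounds rather than clumped at the front.
    val keys = map.keys.toList.sortBy(k => -map(k).size)
    val result = new ArrayBuffer[T]
    var index = 0
    var found = true
    while (found) {
      found = false
      for (key <- keys) {
        val list = map(key)
        if (index < list.size) {
          result += list(index) // take each host's index-th container this round
          found = true
        }
      }
      index += 1
    }
    result.toList
  }
}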

new-yarn/.../YarnClientClusterScheduler.scala
@@ -21,12 +21,13 @@ import org.apache.spark._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.yarn.YarnAllocationHandler
import org.apache.spark.util.Utils
+import org.apache.spark.scheduler.TaskSchedulerImpl

/**
 * This scheduler launches workers through YARN by calling into Client to launch WorkerLauncher as the AM.
 */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {

def this(sc: SparkContext) = this(sc, new Configuration())

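For orientation: SparkContext in this era located the YARN schedulers by reflection on the class name, which is why only the superclass changes here while the class name stays put. A hedged sketch of that lookup (the exact reflection site in SparkContext is paraphrased and should be treated as an assumption):

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl

object SchedulerLookupSketch {
  // Resolve the yarn-client scheduler by name, so core needs no
  // compile-time dependency on the yarn module.
  def yarnClientScheduler(sc: SparkContext): TaskSchedulerImpl = {
    val clazz = Class.forName("org.apache.spark.scheduler.cluster.YarnClientClusterScheduler")
    val cons = clazz.getConstructor(classOf[SparkContext])
    cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
  }
}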

new-yarn/.../YarnClientSchedulerBackend.scala
@@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.scheduler.TaskSchedulerImpl

private[spark] class YarnClientSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
  with Logging {
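With the backend's constructor now typed against TaskSchedulerImpl, the scheduler/backend pairing reads as follows. This is a hedged sketch of the wiring SparkContext performs internally in yarn-client mode; using initialize as the attach point is an assumption about the TaskSchedulerImpl API of this era:

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.{YarnClientClusterScheduler, YarnClientSchedulerBackend}

object WiringSketch {
  // Sketch only: SparkContext performs this wiring itself in yarn-client mode.
  def wire(sc: SparkContext): TaskSchedulerImpl = {
    val scheduler = new YarnClientClusterScheduler(sc)
    val backend = new YarnClientSchedulerBackend(scheduler, sc)
    scheduler.initialize(backend) // assumed API: attach the backend before start()
    scheduler
  }
}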

new-yarn/.../YarnClusterScheduler.scala
@@ -21,12 +21,13 @@ import org.apache.spark._
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
import org.apache.spark.util.Utils
import org.apache.hadoop.conf.Configuration
+import org.apache.spark.scheduler.TaskSchedulerImpl

/**
 * This is a simple extension to ClusterScheduler that ensures appropriate initialization of the ApplicationMaster, etc. is done.
 */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {

logInfo("Created YarnClusterScheduler")

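The "appropriate initialization of ApplicationMaster" the scaladoc mentions happens in a startup hook. A hedged sketch of that shape, under the assumption that TaskSchedulerImpl exposes postStartHook() to subclasses and that ApplicationMaster.sparkContextInitialized(sc) is the registration helper (both drawn from the imports above, not a verbatim copy of the class body):

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.deploy.yarn.ApplicationMaster
import org.apache.spark.scheduler.TaskSchedulerImpl

private[spark] class YarnClusterSchedulerSketch(sc: SparkContext, conf: Configuration)
  extends TaskSchedulerImpl(sc) {

  logInfo("Created YarnClusterSchedulerSketch")

  // Assumed hook: runs once the scheduler has started, letting the
  // ApplicationMaster know the SparkContext is ready before tasks run.
  override def postStartHook() {
    val initialized = ApplicationMaster.sparkContextInitialized(sc) // assumed helper
    if (initialized) {
      logInfo("ApplicationMaster registered the SparkContext")
    }
    super.postStartHook()
  }
}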

yarn/.../YarnAllocationHandler.scala
@@ -28,7 +28,8 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.spark.Logging
import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils

import org.apache.hadoop.conf.Configuration
@@ -214,9 +215,9 @@ private[yarn] class YarnAllocationHandler(
// host if there is a sufficiently large number of hosts/containers.

val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
-allocatedContainers ++= ClusterScheduler.prioritizeContainers(dataLocalContainers)
-allocatedContainers ++= ClusterScheduler.prioritizeContainers(rackLocalContainers)
-allocatedContainers ++= ClusterScheduler.prioritizeContainers(offRackContainers)
+allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)

// Run each of the allocated containers
for (container <- allocatedContainers) {
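The older yarn module gets the same call-site change. To make the interleaving concrete, a small usage example against the PrioritizeSketch shown earlier (host names are hypothetical):

import scala.collection.mutable.{ArrayBuffer, HashMap}

object PrioritizeDemo extends App {
  // Hypothetical hosts: host-a holds three containers, host-b holds one.
  val byHost = HashMap(
    "host-a" -> ArrayBuffer("a1", "a2", "a3"),
    "host-b" -> ArrayBuffer("b1")
  )
  // One container per host per round, busiest host first.
  println(PrioritizeSketch.prioritizeContainers(byHost))
  // => List(a1, b1, a2, a3): host-a no longer monopolizes the front.
}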

yarn/.../YarnClientClusterScheduler.scala
@@ -20,13 +20,14 @@ package org.apache.spark.scheduler.cluster
import org.apache.spark._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils

/**
 * This scheduler launches workers through YARN by calling into Client to launch WorkerLauncher as the AM.
 */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends ClusterScheduler(sc) {
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {

def this(sc: SparkContext) = this(sc, new Configuration())


yarn/.../YarnClientSchedulerBackend.scala
@@ -20,9 +20,10 @@ package org.apache.spark.scheduler.cluster
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.scheduler.TaskSchedulerImpl

private[spark] class YarnClientSchedulerBackend(
-    scheduler: ClusterScheduler,
+    scheduler: TaskSchedulerImpl,
    sc: SparkContext)
  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
  with Logging {

yarn/.../YarnClusterScheduler.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark._
import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
-import org.apache.spark.scheduler.ClusterScheduler
+import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils

/**
@@ -30,7 +30,7 @@ import org.apache.spark.util.Utils
* ApplicationMaster, etc. is done
*/
private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
-  extends ClusterScheduler(sc) {
+  extends TaskSchedulerImpl(sc) {

logInfo("Created YarnClusterScheduler")

