[SPARK-18278][SCHEDULER] Spark on Kubernetes - Basic Scheduler Backend
## What changes were proposed in this pull request?

This is a stripped down version of the `KubernetesClusterSchedulerBackend` for Spark with the following components:
- Static Allocation of Executors
- Executor Pod Factory
- Executor Recovery Semantics

It's step 1 from the step-wise plan documented [here](apache-spark-on-k8s#441 (comment)).
This addition is covered by the [SPIP vote](http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-td22147.html), which passed on Aug 31.

## How was this patch tested?

- The patch contains unit tests which are passing.
- Manual testing: `./build/mvn -Pkubernetes clean package` succeeded.
- It is a **subset** of the entire changelist hosted at http://github.com/apache-spark-on-k8s/spark, which is in active use in several organizations.
- Integration testing is enabled in the fork, currently [hosted by PepperData](spark-k8s-jenkins.pepperdata.org:8080) and being moved over to RiseLAB CI.
- Detailed documentation on trying out the patch in its entirety is in: https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html

cc rxin felixcheung mateiz (shepherd)
k8s-big-data SIG members & contributors: mccheah ash211 ssuchter varunkatta kimoonkim erikerlandson liyinan926 tnachen ifilonenko

Author: Yinan Li <[email protected]>
Author: foxish <[email protected]>
Author: mcheah <[email protected]>

Closes apache#19468 from foxish/spark-kubernetes-3.
liyinan926 authored and rxin committed Nov 29, 2017
1 parent 475a29f commit e9b2070
Showing 22 changed files with 1,832 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
@@ -43,7 +43,7 @@ notifications:
# 5. Run maven install before running lint-java.
install:
- export MAVEN_SKIP_RC=1
- build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
- build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

# 6. Run lint-java.
script:
6 changes: 6 additions & 0 deletions NOTICE
@@ -448,6 +448,12 @@ Copyright (C) 2011 Google Inc.
Apache Commons Pool
Copyright 1999-2009 The Apache Software Foundation

This product includes/uses Kubernetes & OpenShift 3 Java Client (https://github.com/fabric8io/kubernetes-client)
Copyright (C) 2015 Red Hat, Inc.

This product includes/uses OkHttp (https://github.com/square/okhttp)
Copyright (C) 2012 The Android Open Source Project

=========================================================================
== NOTICE file corresponding to section 4(d) of the Apache License, ==
== Version 2.0, in this case for the DataNucleus distribution. ==
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.util.Utils

private[spark] object SchedulerBackendUtils {
val DEFAULT_NUMBER_EXECUTORS = 2

/**
* Getting the initial target number of executors depends on whether dynamic allocation is
* enabled.
* If not using dynamic allocation it gets the number of executors requested by the user.
*/
def getInitialTargetExecutorNumber(
conf: SparkConf,
numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
if (Utils.isDynamicAllocationEnabled(conf)) {
val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
s"initial executor number $initialNumExecutors must between min executor number " +
s"$minNumExecutors and max executor number $maxNumExecutors")

initialNumExecutors
} else {
conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
}
}
}
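
As a usage illustration (not part of this diff), a scheduler backend could derive its initial executor target from the helper above. This is a hypothetical sketch placed in the same `org.apache.spark.scheduler.cluster` package, since the object is `private[spark]`; the configuration values are made up for the example:

```scala
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf

// Hypothetical sketch of how a cluster scheduler backend might use the helper above.
object InitialExecutorsSketch {
  def main(args: Array[String]): Unit = {
    // Static allocation: the explicit executor count is returned as-is.
    val staticConf = new SparkConf().set("spark.executor.instances", "8")
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(staticConf)) // 8

    // Dynamic allocation: the initial target comes from the min/initial/max settings
    // and must satisfy min <= initial <= max, or the require() above fails.
    val dynamicConf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "2")
      .set("spark.dynamicAllocation.initialExecutors", "4")
      .set("spark.dynamicAllocation.maxExecutors", "10")
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(dynamicConf)) // 4
  }
}
```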
8 changes: 8 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -532,6 +532,14 @@ def __hash__(self):
sbt_test_goals=["mesos/test"]
)

kubernetes = Module(
name="kubernetes",
dependencies=[],
source_file_regexes=["resource-managers/kubernetes/core"],
build_profile_flags=["-Pkubernetes"],
sbt_test_goals=["kubernetes/test"]
)

# The root module is a dummy module which is used to run all of the tests.
# No other modules should directly depend on this module.
root = Module(
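With the `kubernetes` module defined above, the new project's tests would presumably be exercised through the `-Pkubernetes` build profile and the `kubernetes/test` sbt goal listed there (roughly, something like `build/sbt -Pkubernetes kubernetes/test`).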
4 changes: 2 additions & 2 deletions docs/configuration.md
@@ -1438,10 +1438,10 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
<td>0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
<td>0.8 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
<td>
The minimum ratio of registered resources (registered resources / total expected resources)
(resources are executors in yarn mode, CPU cores in standalone mode and Mesos coarse-grained
(resources are executors in yarn mode and Kubernetes mode, CPU cores in standalone mode and Mesos coarse-grained
mode ['spark.cores.max' value is total expected resources for Mesos coarse-grained mode] )
to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
Regardless of whether the minimum ratio of resources has been reached,
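
For illustration only (not part of this diff), an application running against the new backend could pin this ratio explicitly rather than rely on the per-mode default; this REPL-style snippet uses the documented 0.8 default for Kubernetes and YARN as its example value:

```scala
import org.apache.spark.SparkConf

// Hypothetical snippet: wait until 80% of requested executors have registered before scheduling.
val conf = new SparkConf()
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.8")
```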
7 changes: 7 additions & 0 deletions pom.xml
@@ -2664,6 +2664,13 @@
</modules>
</profile>

<profile>
<id>kubernetes</id>
<modules>
<module>resource-managers/kubernetes/core</module>
</modules>
</profile>

<profile>
<id>hive-thriftserver</id>
<modules>
8 changes: 4 additions & 4 deletions project/SparkBuild.scala
@@ -53,11 +53,11 @@ object BuildCommons {
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

val optionallyEnabledProjects@Seq(mesos, yarn,
val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
streamingFlumeSink, streamingFlume,
streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
dockerIntegrationTests, hadoopCloud) =
Seq("mesos", "yarn",
Seq("kubernetes", "mesos", "yarn",
"streaming-flume-sink", "streaming-flume",
"streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
@@ -671,9 +671,9 @@ object Unidoc {
publish := {},

unidocProjectFilter in(ScalaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),

unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
100 changes: 100 additions & 0 deletions resource-managers/kubernetes/core/pom.xml
@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent_2.11</artifactId>
<version>2.3.0-SNAPSHOT</version>
<relativePath>../../../pom.xml</relativePath>
</parent>

<artifactId>spark-kubernetes_2.11</artifactId>
<packaging>jar</packaging>
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>3.0.0</kubernetes.client.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-client</artifactId>
<version>${kubernetes.client.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Required by kubernetes-client but we exclude it -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-yaml</artifactId>
<version>${fasterxml.jackson.version}</version>
</dependency>

<!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- End of shaded deps. -->

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.8.1</version>
</dependency>

</dependencies>


<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
</build>

</project>
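As an aside (an assumption, not something stated in this diff), the new module can likely be built on its own with standard Maven reactor flags, e.g. `./build/mvn -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests package`.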
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

private[spark] object Config extends Logging {

val KUBERNETES_NAMESPACE =
ConfigBuilder("spark.kubernetes.namespace")
.doc("The namespace that will be used for running the driver and executor pods. When using " +
"spark-submit in cluster mode, this can also be passed to spark-submit via the " +
"--kubernetes-namespace command line argument.")
.stringConf
.createWithDefault("default")

val EXECUTOR_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.executor.docker.image")
.doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
"format.")
.stringConf
.createOptional

val DOCKER_IMAGE_PULL_POLICY =
ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
.doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.")
.stringConf
.checkValues(Set("Always", "Never", "IfNotPresent"))
.createWithDefault("IfNotPresent")

val APISERVER_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
"spark.kubernetes.authenticate.driver.mounted"
val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

val KUBERNETES_SERVICE_ACCOUNT_NAME =
ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
.doc("Service account that is used when running the driver pod. The driver pod uses " +
"this service account when requesting executor pods from the API server. If specific " +
"credentials are given for the driver pod to use, the driver will favor " +
"using those credentials instead.")
.stringConf
.createOptional

// Note that while we set a default for this when we start up the
// scheduler, the specific default value is dynamically determined
// based on the executor memory.
val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
.doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This " +
"is memory that accounts for things like VM overheads, interned strings, other native " +
"overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
.bytesConf(ByteUnit.MiB)
.createOptional

val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

val KUBERNETES_DRIVER_POD_NAME =
ConfigBuilder("spark.kubernetes.driver.pod.name")
.doc("Name of the driver pod.")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
.doc("Prefix to use in front of the executor pod names.")
.internal()
.stringConf
.createWithDefault("spark")

val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
.doc("Number of pods to launch at once in each round of executor allocation.")
.intConf
.checkValue(value => value > 0, "Allocation batch size should be a positive integer")
.createWithDefault(5)

val KUBERNETES_ALLOCATION_BATCH_DELAY =
ConfigBuilder("spark.kubernetes.allocation.batch.delay")
.doc("Number of seconds to wait between each round of executor allocation.")
.longConf
.checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
.createWithDefault(1)

val KUBERNETES_EXECUTOR_LIMIT_CORES =
ConfigBuilder("spark.kubernetes.executor.limit.cores")
.doc("Specify the hard cpu limit for a single executor pod")
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
.doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
"before it is assumed that the executor failed.")
.intConf
.checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
"must be a positive integer")
.createWithDefault(10)

val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
}
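
To make the new keys concrete, here is a hedged sketch of how a driver-side `SparkConf` might set a few of them; the namespace, image name, and numeric values are placeholders, not defaults introduced by this patch:

```scala
import org.apache.spark.SparkConf

object K8sConfSketch {
  // Illustrative values only; the keys are the ones defined in Config above.
  val conf: SparkConf = new SparkConf()
    .set("spark.kubernetes.namespace", "spark-jobs")                    // KUBERNETES_NAMESPACE (default: "default")
    .set("spark.kubernetes.executor.docker.image", "example/spark-executor:latest") // EXECUTOR_DOCKER_IMAGE
    .set("spark.kubernetes.docker.image.pullPolicy", "IfNotPresent")    // must be Always, Never, or IfNotPresent
    .set("spark.kubernetes.allocation.batch.size", "10")                // pods launched per allocation round
    .set("spark.kubernetes.allocation.batch.delay", "2")                // seconds between allocation rounds
    .set("spark.executor.instances", "8")                               // static allocation target
}
```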