Skip to content

Commit

Permalink
[LIVY-412][SERVER] Reject create session request if too more sessions…
Browse files Browse the repository at this point in the history
… are creating.

## What changes were proposed in this pull request?

In our cluster, livy server run with spark yarn cluster mode, when createSession request is too frequently, livyServer will start too more spark-submit child process, it will cause the machine oom.

Reject the create session request when there are too many spark-submit child process.

## How was this patch tested?

Add two testcase, test create interactive and batch session when reach the max creating sessions, the request should failed.

Please review https://livy.incubator.apache.org/community/ before opening a pull request.

Author: 沈洪 <[email protected]>

Closes apache#58 from shenh062326/livy-412.
  • Loading branch information
沈洪 authored and jerryshao committed Nov 7, 2017
1 parent 5e0201f commit ef5dccb
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 12 deletions.
3 changes: 3 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/ContextLauncher.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ public ChildProcess(RSCConf conf, Promise<?> promise, final Process childProc, F
@Override
public void run() {
try {
RSCClientFactory.childProcesses().incrementAndGet();
int exitCode = child.waitFor();
if (exitCode != 0) {
LOG.warn("Child process exited with code {}.", exitCode);
Expand All @@ -385,6 +386,8 @@ public void run() {
child.destroy();
} catch (Exception e) {
LOG.warn("Exception while waiting for child process.", e);
} finally {
RSCClientFactory.childProcesses().decrementAndGet();
}
}
};
Expand Down
6 changes: 6 additions & 0 deletions rsc/src/main/java/org/apache/livy/rsc/RSCClientFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ public final class RSCClientFactory implements LivyClientFactory {

private final AtomicInteger refCount = new AtomicInteger();
private RpcServer server = null;
// interactive session child processes number
private static AtomicInteger iscpn = new AtomicInteger();

public static AtomicInteger childProcesses() {
return iscpn;
}

/**
* Creates a local Livy client if the URI has the "rsc" scheme.
Expand Down
2 changes: 2 additions & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ object LivyConf {
val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h")
// How long a finished session state will be kept in memory
val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s")
// Max creating session in livyServer
val SESSION_MAX_CREATION = Entry("livy.server.session.max-creation", 100)

val SPARK_MASTER = "spark.master"
val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
Expand Down
28 changes: 20 additions & 8 deletions server/src/main/scala/org/apache/livy/server/SessionServlet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import scala.concurrent._
import scala.concurrent.duration._

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.rsc.RSCClientFactory
import org.apache.livy.server.batch.BatchSession
import org.apache.livy.sessions.{Session, SessionManager}
import org.apache.livy.sessions.Session.RecoveryMetadata

Expand All @@ -38,7 +40,7 @@ object SessionServlet extends Logging
*/
abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
private[livy] val sessionManager: SessionManager[S, R],
livyConf: LivyConf,
val livyConf: LivyConf,
accessManager: AccessManager)
extends JsonServlet
with ApiVersioningSupport
Expand Down Expand Up @@ -117,14 +119,24 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
}
}

def tooManySessions(): Boolean = {
val totalChildProceses = RSCClientFactory.childProcesses().get() +
BatchSession.childProcesses.get()
totalChildProceses >= livyConf.getInt(LivyConf.SESSION_MAX_CREATION)
}

post("/") {
val session = sessionManager.register(createSession(request))
// Because it may take some time to establish the session, update the last activity
// time before returning the session info to the client.
session.recordActivity()
Created(clientSessionView(session, request),
headers = Map("Location" ->
(getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString))))
if (tooManySessions) {
BadRequest("Rejected, too many sessions are being created!")
} else {
val session = sessionManager.register(createSession(request))
// Because it may take some time to establish the session, update the last activity
// time before returning the session info to the client.
session.recordActivity()
Created(clientSessionView(session, request),
headers = Map("Location" ->
(getRequestPathInfo(request) + url(getSession, "id" -> session.id.toString))))
}
}

private def getRequestPathInfo(request: HttpServletRequest): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
package org.apache.livy.server.batch

import java.lang.ProcessBuilder.Redirect
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.Random

import com.fasterxml.jackson.annotation.JsonIgnoreProperties

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.server.SessionServlet
import org.apache.livy.sessions.{Session, SessionState}
import org.apache.livy.sessions.Session._
import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}
Expand All @@ -42,6 +44,12 @@ case class BatchRecoveryMetadata(

object BatchSession extends Logging {
val RECOVERY_SESSION_TYPE = "batch"
// batch session child processes number
private val bscpn = new AtomicInteger

def childProcesses(): AtomicInteger = {
bscpn
}

def create(
id: Int,
Expand Down Expand Up @@ -85,6 +93,18 @@ object BatchSession extends Logging {
val file = resolveURIs(Seq(request.file), livyConf)(0)
val sparkSubmit = builder.start(Some(file), request.args)

Utils.startDaemonThread(s"batch-session-process-$id") {
childProcesses.incrementAndGet()
try {
sparkSubmit.waitFor() match {
case 0 =>
case exitCode =>
warn(s"spark-submit exited with code $exitCode")
}
} finally {
childProcesses.decrementAndGet()
}
}
SparkApp.create(appTag, None, Option(sparkSubmit), livyConf, Option(s))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import scala.concurrent.duration.Duration
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar.mock

import org.apache.livy.Utils
import org.apache.livy.{LivyConf, Utils}
import org.apache.livy.server.{AccessManager, BaseSessionServletSpec}
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.{BatchSessionManager, SessionState}
Expand Down Expand Up @@ -146,6 +146,29 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover
view.appInfo shouldEqual appInfo
view.log shouldEqual log
}
}

it("should fail session creation when max session creation is hit") {
val createRequest = new CreateBatchRequest()
createRequest.file = script.toString
createRequest.conf = Map("spark.driver.extraClassPath" -> sys.props("java.class.path"))

jpost[Map[String, Any]]("/", createRequest) { data =>
header("Location") should equal("/2")
data("id") should equal (2)

val batch = servlet.sessionManager.get(2)
batch should be (defined)
}

servlet.livyConf.set(LivyConf.SESSION_MAX_CREATION, 1)
jpost[Map[String, Any]]("/", createRequest, SC_BAD_REQUEST) { data => None }

jdelete[Map[String, Any]]("/2") { data =>
data should equal (Map("msg" -> "deleted"))

val batch = servlet.sessionManager.get(2)
batch should not be defined
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,16 @@ import java.util.concurrent.atomic.AtomicInteger
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}

import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.concurrent.Future

import org.json4s.jackson.Json4sScalaModule
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.Mockito.when
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.Entry
import org.scalatest.concurrent.Eventually._
import org.scalatest.mock.MockitoSugar.mock

import org.apache.livy.{ExecuteRequest, LivyConf}
Expand All @@ -52,6 +54,8 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
private var statements = IndexedSeq[Statement]()

override protected def createSession(req: HttpServletRequest): InteractiveSession = {
super.createSession(req)

val statementCounter = new AtomicInteger()

val session = mock[InteractiveSession]
Expand Down Expand Up @@ -183,4 +187,25 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
view.log shouldEqual log.asJava
}

private def waitSession(): Unit = {
eventually(timeout(1 minute), interval(100 millis)) {
servlet.tooManySessions should be(true)
}
}

it("should failed create session when too many creating session ") {
var id = 1
jpost[SessionInfo]("/", createRequest(inProcess = false)) { data =>
id = data.id
}

servlet.livyConf.set(LivyConf.SESSION_MAX_CREATION, 1)

waitSession
jpost[Map[String, Any]]("/", createRequest(), HttpServletResponse.SC_BAD_REQUEST) { data =>
None
}

jdelete[Map[String, Any]](s"/${id}") { _ => }
}
}

0 comments on commit ef5dccb

Please sign in to comment.