Skip to content

Commit

Permalink
[LIVY-494] Add thriftserver to Livy server
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

The PR adds a configuration parameter that makes the Livy server also start the thriftserver at startup.

Apart from this trivial change, three other main things were needed and are included in this PR:

 - Add the thriftserver JARs to the assembly and the livy-server script;
 - A small refactor in order to enforce impersonation in the `*Session` classes, instead of in the `*Servlet` ones, so that it is picked up by the thriftserver module as well (this change is not strictly needed, but I consider it a better option than duplicating this logic in the thriftserver module);
 - Creating a UGI from the configured keytab. This is needed because the thriftserver requires the UGI to be created from a keytab in order to work properly and previously Livy was using a UGI generated from the cached TGT (created by the `kinit` command).

## How was this patch tested?

Manual test: starting the server and having it up for more than 9 days.

Author: Marco Gaido <[email protected]>

Closes apache#107 from mgaido91/LIVY-494.
  • Loading branch information
mgaido91 authored and Marcelo Vanzin committed Sep 24, 2018
1 parent d39ab35 commit 134713d
Show file tree
Hide file tree
Showing 19 changed files with 236 additions and 73 deletions.
7 changes: 7 additions & 0 deletions assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,12 @@
<include>*</include>
</includes>
</fileSet>
<fileSet>
<directory>${project.parent.basedir}/thriftserver/server/target/jars</directory>
<outputDirectory>${assembly.name}/jars</outputDirectory>
<includes>
<include>*</include>
</includes>
</fileSet>
</fileSets>
</assembly>
11 changes: 11 additions & 0 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@
<assembly.format>tar.gz</assembly.format>
</properties>
</profile>

<profile>
<id>thriftserver</id>
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>livy-thriftserver</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
</profiles>

</project>
5 changes: 5 additions & 0 deletions bin/livy-server
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,15 @@ start_livy_server() {
LIBDIR="$LIVY_HOME/jars"
if [ ! -d "$LIBDIR" ]; then
LIBDIR="$LIVY_HOME/server/target/jars"
THRIFT_LIBDIR="$LIVY_HOME/thriftserver/server/target/jars"
fi
if [ ! -d "$LIBDIR" ]; then
echo "Could not find Livy jars directory." 1>&2
exit 1
else
if [ -d "$THRIFT_LIBDIR" ]; then
LIBDIR="$THRIFT_LIBDIR/*:$LIBDIR"
fi
fi

LIVY_CLASSPATH="$LIBDIR/*:$LIVY_CONF_DIR"
Expand Down
6 changes: 6 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<scope>${hadoop.scope}</scope>
<exclusions>
<exclusion>
<groupId>jline</groupId>
<artifactId>jline</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
Expand Down
1 change: 1 addition & 0 deletions server/src/main/scala/org/apache/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ object LivyConf {
val LAUNCH_KERBEROS_REFRESH_INTERVAL = Entry("livy.server.launch.kerberos.refresh-interval", "1h")
val KINIT_FAIL_THRESHOLD = Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5)

val THRIFT_SERVER_ENABLED = Entry("livy.server.thrift.enabled", false)
val THRIFT_INCR_COLLECT_ENABLED = Entry("livy.server.thrift.incrementalCollect", false)
val THRIFT_SESSION_CREATION_TIMEOUT = Entry("livy.server.thrift.session.creationTimeout", "10m")
val THRIFT_SERVER_JAR_LOCATION = Entry("livy.server.thrift.jarLocation", null)
Expand Down
46 changes: 46 additions & 0 deletions server/src/main/scala/org/apache/livy/server/AccessManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.livy.server

import java.security.AccessControlException

import org.apache.livy.{LivyConf, Logging}

private[livy] class AccessManager(conf: LivyConf) extends Logging {
Expand Down Expand Up @@ -94,4 +96,48 @@ private[livy] class AccessManager(conf: LivyConf) extends Logging {
* Check whether access control is enabled or not.
*/
def isAccessControlOn: Boolean = aclsOn

/**
* Checks that the requesting user can impersonate the target user.
* If the user does not have permission to impersonate, then throws an `AccessControlException`.
*
* @return The user that should be impersonated. That can be the target user if defined, the
* request's user - which may not be defined - otherwise, or `None` if impersonation is
* disabled.
*/
def checkImpersonation(
    target: Option[String],
    requestUser: String,
    livyConf: LivyConf): Option[String] = {
  if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
    // Only an explicitly requested target needs a permission check; when no target is given
    // there is nothing to verify and the requesting user is used as-is below.
    target.foreach { user =>
      if (!hasSuperAccess(user, requestUser)) {
        // Interpolate the unwrapped user name so the message reads 'bob', not 'Some(bob)'.
        throw new AccessControlException(
          s"User '$requestUser' not allowed to impersonate '$user'.")
      }
    }
    // Option(...) turns a null requestUser into None, so the result may still be undefined.
    target.orElse(Option(requestUser))
  } else {
    // Impersonation is disabled: never impersonate anyone.
    None
  }
}

/**
* Check that the requesting user has admin access to resources owned by the given target user.
*/
def hasSuperAccess(target: String, requestUser: String): Boolean = {
  // The owner of the resource is always accepted; any other user is delegated to
  // checkSuperUser, which is only consulted when the ownership test fails.
  val isOwner = requestUser == target
  if (isOwner) true else checkSuperUser(requestUser)
}

/**
* Check that the request's user has modify access to resources owned by the given target user.
*/
def hasModifyAccess(target: String, requestUser: String): Boolean = {
  // The owner of the resource is always accepted; any other user is delegated to
  // checkModifyPermissions, which is only consulted when the ownership test fails.
  val isOwner = requestUser == target
  if (isOwner) true else checkModifyPermissions(requestUser)
}

/**
* Check that the request's user has view access to resources owned by the given target user.
*/
def hasViewAccess(target: String, requestUser: String): Boolean = {
  // The owner of the resource is always accepted; any other user is delegated to
  // checkViewPermissions, which is only consulted when the ownership test fails.
  val isOwner = requestUser == target
  if (isOwner) true else checkViewPermissions(requestUser)
}
}
23 changes: 23 additions & 0 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class LivyServer extends Logging {
private var executor: ScheduledExecutorService = _
private var accessManager: AccessManager = _

private var ugi: UserGroupInformation = _

def start(): Unit = {
livyConf = new LivyConf().loadFromFile("livy.conf")
accessManager = new AccessManager(livyConf)
Expand Down Expand Up @@ -115,6 +117,16 @@ class LivyServer extends Logging {
error("Failed to run kinit, stopping the server.")
sys.exit(1)
}
// This is and should be the only place where a login() on the UGI is performed.
// If another login is ever strictly needed elsewhere in the codebase, a needLogin check should
// be added so that two logins are never performed.
// This is needed because the thriftserver requires the UGI to be created from a keytab in
// order to work properly and previously Livy was using a UGI generated from the cached TGT
// (created by the kinit command).
if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) {
UserGroupInformation.loginUserFromKeytab(launch_principal, launch_keytab)
}
ugi = UserGroupInformation.getCurrentUser
startKinitThread(launch_keytab, launch_principal)
}

Expand Down Expand Up @@ -266,6 +278,11 @@ class LivyServer extends Logging {
}
})

if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) {
ThriftServerFactory.getInstance.start(
livyConf, interactiveSessionManager, sessionStore, accessManager)
}

_serverUrl = Some(s"${server.protocol}://${server.host}:${server.port}")
sys.props("livy.server.server-url") = _serverUrl.get
}
Expand All @@ -292,6 +309,12 @@ class LivyServer extends Logging {
new Runnable() {
override def run(): Unit = {
if (runKinit(keytab, principal)) {
// The current UGI should never change. If it does, that is an error condition:
// re-logging in the original UGI would not update the current UGI, so the server would
// eventually fail for lack of valid credentials. The assert here detects this condition
// early and fails immediately with a meaningful error.
assert(ugi.equals(UserGroupInformation.getCurrentUser), "Current UGI has changed.")
ugi.reloginFromTicketCache()
// schedule another kinit run with a fixed delay.
executor.schedule(this, refreshInterval, TimeUnit.MILLISECONDS)
} else {
Expand Down
56 changes: 6 additions & 50 deletions server/src/main/scala/org/apache/livy/server/SessionServlet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.livy.server

import java.security.AccessControlException
import javax.servlet.http.HttpServletRequest

import org.scalatra._
Expand Down Expand Up @@ -149,59 +150,14 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](

error {
case e: IllegalArgumentException => BadRequest(e.getMessage)
case e: AccessControlException => Forbidden(e.getMessage)
}

/**
* Returns the remote user for the given request. Separate method so that tests can override it.
*/
protected def remoteUser(req: HttpServletRequest): String = req.getRemoteUser()

/**
* Checks that the request's user can impersonate the target user.
*
* If the user does not have permission to impersonate, then halt execution.
*
* @return The user that should be impersonated. That can be the target user if defined, the
* request's user - which may not be defined - otherwise, or `None` if impersonation is
* disabled.
*/
protected def checkImpersonation(
target: Option[String],
req: HttpServletRequest): Option[String] = {
if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
if (!target.map(hasSuperAccess(_, req)).getOrElse(true)) {
halt(Forbidden(s"User '${remoteUser(req)}' not allowed to impersonate '$target'."))
}
target.orElse(Option(remoteUser(req)))
} else {
None
}
}

/**
* Check that the request's user has view access to resources owned by the given target user.
*/
protected def hasViewAccess(target: String, req: HttpServletRequest): Boolean = {
val user = remoteUser(req)
user == target || accessManager.checkViewPermissions(user)
}

/**
* Check that the request's user has modify access to resources owned by the given target user.
*/
protected def hasModifyAccess(target: String, req: HttpServletRequest): Boolean = {
val user = remoteUser(req)
user == target || accessManager.checkModifyPermissions(user)
}

/**
* Check that the request's user has admin access to resources owned by the given target user.
*/
protected def hasSuperAccess(target: String, req: HttpServletRequest): Boolean = {
val user = remoteUser(req)
user == target || accessManager.checkSuperUser(user)
}

/**
* Performs an operation on the session, without checking for ownership. Operations executed
* via this method must not modify the session in any way, or return potentially sensitive
Expand All @@ -214,22 +170,22 @@ abstract class SessionServlet[S <: Session, R <: RecoveryMetadata](
* session.
*/
protected def withViewAccessSession(fn: (S => Any)): Any =
doWithSession(fn, false, Some(hasViewAccess))
doWithSession(fn, false, Some(accessManager.hasViewAccess))

/**
* Performs an operation on the session, verifying whether the caller has modify access of the
* session.
*/
protected def withModifyAccessSession(fn: (S => Any)): Any =
doWithSession(fn, false, Some(hasModifyAccess))
doWithSession(fn, false, Some(accessManager.hasModifyAccess))

private def doWithSession(fn: (S => Any),
allowAll: Boolean,
checkFn: Option[(String, HttpServletRequest) => Boolean]): Any = {
checkFn: Option[(String, String) => Boolean]): Any = {
val sessionId = params("id").toInt
sessionManager.get(sessionId) match {
case Some(session) =>
if (allowAll || checkFn.map(_(session.owner, request)).getOrElse(false)) {
if (allowAll || checkFn.map(_(session.owner, remoteUser(request))).getOrElse(false)) {
fn(session)
} else {
Forbidden()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.server

import org.apache.livy.LivyConf
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.InteractiveSessionManager

/**
* Its implementation starts Livy ThriftServer
*/
trait ThriftServerFactory {
/**
 * Starts the Livy thriftserver.
 *
 * The parameters are handed over to the implementation as-is; how each one is used is up to
 * the implementing class (not visible from this trait).
 *
 * @param livyConf the Livy configuration
 * @param livySessionManager the manager of interactive sessions
 * @param sessionStore the session store
 * @param accessManager the access manager
 */
def start(
livyConf: LivyConf,
livySessionManager: InteractiveSessionManager,
sessionStore: SessionStore,
accessManager: AccessManager): Unit
}

object ThriftServerFactory {
  /**
   * Reflectively creates the thriftserver factory implementation.
   *
   * The implementation class is looked up by its fully qualified name at runtime, so it must be
   * on the classpath and expose a no-argument constructor.
   *
   * @return a new instance of `org.apache.livy.thriftserver.ThriftServerFactoryImpl`
   * @throws ClassNotFoundException if the implementation class is not on the classpath
   */
  def getInstance: ThriftServerFactory = {
    // Class.newInstance() is deprecated (it rethrows constructor checked exceptions unchecked);
    // go through the no-arg constructor explicitly instead.
    Class.forName("org.apache.livy.thriftserver.ThriftServerFactoryImpl")
      .getDeclaredConstructor()
      .newInstance()
      .asInstanceOf[ThriftServerFactory]
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import scala.util.Random
import com.fasterxml.jackson.annotation.JsonIgnoreProperties

import org.apache.livy.{LivyConf, Logging, Utils}
import org.apache.livy.server.AccessManager
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.server.SessionServlet
import org.apache.livy.sessions.{Session, SessionState}
import org.apache.livy.sessions.Session._
import org.apache.livy.utils.{AppInfo, SparkApp, SparkAppListener, SparkProcessBuilder}
Expand Down Expand Up @@ -55,11 +55,12 @@ object BatchSession extends Logging {
id: Int,
request: CreateBatchRequest,
livyConf: LivyConf,
accessManager: AccessManager,
owner: String,
proxyUser: Option[String],
sessionStore: SessionStore,
mockApp: Option[SparkApp] = None): BatchSession = {
val appTag = s"livy-batch-$id-${Random.alphanumeric.take(8).mkString}"
val impersonatedUser = accessManager.checkImpersonation(request.proxyUser, owner, livyConf)

def createSparkApp(s: BatchSession): SparkApp = {
val conf = SparkApp.prepareSparkConf(
Expand All @@ -72,7 +73,7 @@ object BatchSession extends Logging {
val builder = new SparkProcessBuilder(livyConf)
builder.conf(conf)

proxyUser.foreach(builder.proxyUser)
impersonatedUser.foreach(builder.proxyUser)
request.className.foreach(builder.className)
request.driverMemory.foreach(builder.driverMemory)
request.driverCores.foreach(builder.driverCores)
Expand Down Expand Up @@ -116,7 +117,7 @@ object BatchSession extends Logging {
SessionState.Starting,
livyConf,
owner,
proxyUser,
impersonatedUser,
sessionStore,
mockApp.map { m => (_: BatchSession) => m }.getOrElse(createSparkApp))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,20 @@ class BatchSessionServlet(

override protected def createSession(req: HttpServletRequest): BatchSession = {
val createRequest = bodyAs[CreateBatchRequest](req)
val proxyUser = checkImpersonation(createRequest.proxyUser, req)
BatchSession.create(
sessionManager.nextId(), createRequest, livyConf, remoteUser(req), proxyUser, sessionStore)
sessionManager.nextId(),
createRequest,
livyConf,
accessManager,
remoteUser(req),
sessionStore)
}

override protected[batch] def clientSessionView(
session: BatchSession,
req: HttpServletRequest): Any = {
val logs =
if (hasViewAccess(session.owner, req)) {
if (accessManager.hasViewAccess(session.owner, remoteUser(req))) {
val lines = session.logLines()

val size = 10
Expand Down
Loading

0 comments on commit 134713d

Please sign in to comment.