Skip to content

Commit

Permalink
LIVY-67. Support impersonation in client sessions.
Browse files Browse the repository at this point in the history
There are a few different things going on here to reach that goal:

- client-local now supports spark-submit's --proxy-user command-line
  argument, and adds a new configuration entry to define the user to be proxied.

- the Livy server now has a list of "super users" who can do anything;
  relevant to this change, super users can impersonate anyone.

- SessionServlet now has a helper method to figure out who should be
  impersonated and check for appropriate permissions.

- The three session servlets now use that method; the semantics are:

  * if there is no user information (i.e. auth is off), users are
    allowed to impersonate anyone
  * when users are authenticated, normal users can only impersonate
    themselves; even if they don't specify a proxy user, the server
    will impersonate the calling user.
  * super users can impersonate anyone; if a specific proxy user is not
    requested, the session will run as the super user.

The rest of the changes are mostly test code, plus a small tweak to
JsonServlet so that it can serialize a raw String into a JSON object
until LIVY-54 is fixed.

Closes apache#74
  • Loading branch information
Marcelo Vanzin committed Feb 24, 2016
1 parent 0081921 commit 8287f4c
Show file tree
Hide file tree
Showing 21 changed files with 292 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,19 @@ private CreateClientRequest() {
public static class SessionInfo implements ClientMessage {

public final int id;
public final String owner;
public final String proxyUser;
public final String state;

public SessionInfo(int id, String state) {
public SessionInfo(int id, String owner, String proxyUser, String state) {
this.id = id;
this.owner = owner;
this.proxyUser = proxyUser;
this.state = state;
}

private SessionInfo() {
this(-1, null);
this(-1, null, null, null);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
val id = sessionManager.nextId()
when(session.id).thenReturn(id)
when(session.state).thenReturn(SessionState.Idle())
when(session.proxyUser).thenReturn(None)
when(session.stop()).thenReturn(Future { })
require(HttpClientSpec.session == null, "Session already created?")
HttpClientSpec.session = session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ public void run() {
argv.add("--class");
argv.add(RemoteDriver.class.getName());

if (conf.get(PROXY_USER) != null) {
argv.add("--proxy-user");
argv.add(conf.get(PROXY_USER));
}

String jar = "spark-internal";
String livyJars = conf.get(LIVY_JARS);
if (livyJars == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public static enum Entry implements ConfEntry {

LIVY_JARS("jars", null),

PROXY_USER("proxy_user", null),

RPC_SERVER_ADDRESS("rpc.server.address", null),
RPC_CLIENT_HANDSHAKE_TIMEOUT("server.connect.timeout", "90000ms"),
RPC_CLIENT_CONNECT_TIMEOUT("client.connect.timeout", "10000ms"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.common.base.Objects;
import com.google.common.base.Strings;
import com.google.common.io.ByteStreams;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
Expand Down Expand Up @@ -247,7 +248,7 @@ void call(LivyClient client) throws Exception {
}

@Test
public void testStreamingContext() throws Exception{
public void testStreamingContext() throws Exception {
runTest(true, new TestFunction() {
@Override
void call(LivyClient client) throws Exception {
Expand All @@ -258,6 +259,25 @@ void call(LivyClient client) throws Exception {
});
}

/**
 * Verifies that a job runs as the user configured through
 * {@code LocalConf.Entry.PROXY_USER}: the user name reported by
 * {@code GetCurrentUserJob} must equal the configured proxy user.
 */
@Test
public void testImpersonation() throws Exception {
  final String expectedUser = "__proxy__";

  TestFunction impersonationCheck = new TestFunction() {
    @Override
    void config(Properties conf) {
      // Ask the client to launch the driver on behalf of the proxy user.
      conf.put(LocalConf.Entry.PROXY_USER.key(), expectedUser);
    }

    @Override
    void call(LivyClient client) throws Exception {
      String actualUser = client.submit(new GetCurrentUserJob()).get(TIMEOUT, TimeUnit.SECONDS);
      assertEquals(expectedUser, actualUser);
    }
  };

  runTest(false, impersonationCheck);
}

@Test
public void testBypass() throws Exception {
runBypassTest(false);
Expand Down Expand Up @@ -518,6 +538,15 @@ public String call(JobContext jc) {

}

/** Job that returns the name of the user reported by {@code UserGroupInformation}. */
private static class GetCurrentUserJob implements Job<String> {

  @Override
  public String call(JobContext jc) throws Exception {
    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
    return currentUser.getUserName();
  }

}

private abstract static class TestFunction {
abstract void call(LivyClient client) throws Exception;
void config(Properties conf) { }
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/com/cloudera/livy/LivyConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ object LivyConf {
val IMPERSONATION_ENABLED = Entry("livy.impersonation.enabled", false)
val LIVY_HOME = Entry("livy.home", null)
val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024)
val SUPERUSERS = Entry("livy.superusers", null)

lazy val TEST_LIVY_HOME = Files.createTempDirectory("livyTemp").toUri.toString
}
Expand All @@ -55,6 +56,8 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {

import LivyConf._

// Superuser names parsed from the comma- and/or space-separated `livy.superusers` value.
// Empty tokens are dropped: String.split yields a leading "" when the value starts with a
// separator (and a lone "" for an empty value), which must not count as a user name.
private lazy val _superusers: Seq[String] =
  Option(get(SUPERUSERS)).map(_.split("[, ]+").filter(_.nonEmpty).toSeq).getOrElse(Nil)

/**
* Create a LivyConf that loads defaults from the system properties and the classpath.
* @return
Expand Down Expand Up @@ -92,6 +95,9 @@ class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {
.getOrElse("spark-submit")
}

/**
 * Return the list of superusers, as parsed from the `livy.superusers` configuration entry.
 * The list is empty when that entry is not set.
 */
def superusers(): Seq[String] = _superusers

private def loadFromMap(map: Iterable[(String, String)]): Unit = {
map.foreach { case (k, v) =>
if (k.startsWith("livy.")) {
Expand Down
12 changes: 7 additions & 5 deletions server/src/main/scala/com/cloudera/livy/server/JsonServlet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,20 @@ abstract class JsonServlet extends ScalatraServlet with ApiFormats with FutureSu
}
}

override protected def renderResponse(actionResult: Any): Unit = {
override protected def renderResponseBody(actionResult: Any): Unit = {
val result = actionResult match {
case async: AsyncResult =>
async
case ActionResult(status, body, headers) if format == "json" =>
ActionResult(status, toJson(body), headers)
case str: String if format == "json" =>
// This should be changed when we implement LIVY-54. For now, just create a dummy
// JSON object when a raw string is being returned.
toJson(Map("msg" -> str))
case other if format == "json" =>
Ok(toJson(other))
toJson(other)
case other =>
other
}
super.renderResponse(result)
super.renderResponseBody(result)
}

protected def bodyAs[T: ClassTag](req: HttpServletRequest)
Expand Down
4 changes: 4 additions & 0 deletions server/src/main/scala/com/cloudera/livy/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ object Main extends Logging {
livyConf.get(KERBEROS_NAME_RULES))
server.context.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
info(s"SPNEGO auth enabled (principal = $principal)")
if (!livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
info(s"Enabling impersonation since auth type is $authType.")
livyConf.set(LivyConf.IMPERSONATION_ENABLED, true)
}

case null =>
// Nothing to do.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ object SessionServlet extends Logging
*
* Type parameters:
* S: the session type
* R: the type representing the session create parameters.
*/
abstract class SessionServlet[S <: Session](livyConf: LivyConf)
extends JsonServlet
Expand Down Expand Up @@ -159,6 +158,36 @@ abstract class SessionServlet[S <: Session](livyConf: LivyConf)
*/
protected def remoteUser(req: HttpServletRequest): String = req.getRemoteUser()

/**
 * Checks that the request's user can impersonate the target user.
 *
 * If the user does not have permission to impersonate, then halt execution with a
 * `Forbidden` response.
 *
 * @param target The user the caller asked to impersonate, if any.
 * @param req The HTTP request, used to find out who the calling user is.
 * @return The user that should be impersonated. That can be the target user if defined, the
 *         request's user - which may not be defined - otherwise, or `None` if impersonation is
 *         disabled.
 */
protected def checkImpersonation(
    target: Option[String],
    req: HttpServletRequest): Option[String] = {
  if (livyConf.getBoolean(LivyConf.IMPERSONATION_ENABLED)) {
    // Only an explicitly requested target needs a permission check. Unwrapping the Option
    // here also keeps the error message readable ('alice' rather than 'Some(alice)').
    target.foreach { requested =>
      if (!hasAccess(requested, req)) {
        halt(Forbidden(s"User '${remoteUser(req)}' not allowed to impersonate '$requested'."))
      }
    }
    // No explicit target: fall back to the calling user, which may be undefined.
    target.orElse(Option(remoteUser(req)))
  } else {
    None
  }
}

/**
 * Tells whether the user making the request may access resources owned by `target`.
 *
 * Access is granted when the request carries no user, when the request's user is the
 * target itself, or when the request's user is listed among the configured superusers.
 */
protected def hasAccess(target: String, req: HttpServletRequest): Boolean = {
  // A null remote user becomes None, for which `forall` is vacuously true.
  Option(remoteUser(req)).forall { caller =>
    caller == target || livyConf.superusers().contains(caller)
  }
}

/**
* Performs an operation on the session, without checking for ownership. Operations executed
* via this method must not modify the session in any way, or return potentially sensitive
Expand All @@ -176,7 +205,7 @@ abstract class SessionServlet[S <: Session](livyConf: LivyConf)
val sessionId = params("id").toInt
sessionManager.get(sessionId) match {
case Some(session) =>
if (allowAll || isOwner(session, request)) {
if (allowAll || hasAccess(session.owner, request)) {
fn(session)
} else {
Forbidden()
Expand All @@ -186,13 +215,6 @@ abstract class SessionServlet[S <: Session](livyConf: LivyConf)
}
}

/**
 * Tells whether the user making the given request is the one recorded as the session's owner.
 */
protected def isOwner(session: Session, req: HttpServletRequest): Boolean = {
  val sessionOwner = session.owner
  sessionOwner == remoteUser(req)
}

private def serializeLogs(session: S, fromOpt: Option[Int], sizeOpt: Option[Int]) = {
val lines = session.logLines()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,20 @@ import com.cloudera.livy.LivyConf
import com.cloudera.livy.sessions.{Session, SessionState}
import com.cloudera.livy.utils.SparkProcessBuilder

class BatchSession(id: Int, owner: String, livyConf: LivyConf, request: CreateBatchRequest)
class BatchSession(
id: Int,
owner: String,
proxyUser: Option[String],
livyConf: LivyConf,
request: CreateBatchRequest)
extends Session(id, owner) {

private val process = {
require(request.file != null, "File is required.")

val builder = new SparkProcessBuilder(livyConf)
builder.conf(request.conf)
request.proxyUser.foreach(builder.proxyUser)
proxyUser.foreach(builder.proxyUser)
request.className.foreach(builder.className)
request.jars.foreach(builder.jar)
request.pyFiles.foreach(builder.pyFile)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ class BatchSessionServlet(livyConf: LivyConf)

override protected def createSession(req: HttpServletRequest): BatchSession = {
val createRequest = bodyAs[CreateBatchRequest](req)
new BatchSession(sessionManager.nextId(), remoteUser(req), livyConf, createRequest)
val proxyUser = checkImpersonation(createRequest.proxyUser, req)
new BatchSession(sessionManager.nextId(), remoteUser(req), proxyUser, livyConf, createRequest)
}

override protected def clientSessionView(session: BatchSession, req: HttpServletRequest): Any = {
val logs =
if (isOwner(session, req)) {
if (hasAccess(session.owner, req)) {
val lines = session.logLines()

val size = 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ import org.apache.hadoop.fs.{FileSystem, Path}

import com.cloudera.livy.{LivyClientBuilder, Logging}
import com.cloudera.livy.client.common.HttpMessages._
import com.cloudera.livy.client.local.LocalClient
import com.cloudera.livy.client.local.{LocalClient, LocalConf}
import com.cloudera.livy.sessions.{Session, SessionState}

class ClientSession(id: Int, owner: String, createRequest: CreateClientRequest, livyHome: String)
class ClientSession(
id: Int,
owner: String,
val proxyUser: Option[String],
createRequest: CreateClientRequest,
livyHome: String)
extends Session(id, owner) with Logging {
implicit val executionContext = ExecutionContext.global

Expand All @@ -47,13 +52,15 @@ class ClientSession(id: Int, owner: String, createRequest: CreateClientRequest,

private val client = {
info(s"Creating LivyClient for sessionId: $id")
new LivyClientBuilder()
val builder = new LivyClientBuilder()
.setConf("spark.app.name", s"livy-session-$id")
.setConf("spark.master", "yarn-cluster")
.setAll(Option(createRequest.conf).getOrElse(new JHashMap()))
.setURI(new URI("local:spark"))
.setConf("livy.client.sessionId", id.toString)
.build()

proxyUser.foreach(builder.setConf(LocalConf.Entry.PROXY_USER.key(), _))
builder.build()
}.asInstanceOf[LocalClient]

private val fs = FileSystem.get(new Configuration())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.scalatra.servlet.{FileUploadSupport, MultipartConfig}

import com.cloudera.livy.{JobHandle, LivyConf}
import com.cloudera.livy.client.common.HttpMessages._
import com.cloudera.livy.client.local.LocalConf
import com.cloudera.livy.server.SessionServlet
import com.cloudera.livy.sessions.SessionManager

Expand All @@ -39,7 +40,15 @@ class ClientSessionServlet(livyConf: LivyConf)
override protected def createSession(req: HttpServletRequest): ClientSession = {
val id = sessionManager.nextId()
val createRequest = bodyAs[CreateClientRequest](req)
new ClientSession(id, remoteUser(req), createRequest, livyConf.livyHome)
val user = remoteUser(req)
val requestedProxy =
if (createRequest.conf != null) {
Option(createRequest.conf.get(LocalConf.Entry.PROXY_USER.key()))
} else {
None
}
val proxyUser = checkImpersonation(requestedProxy, req)
new ClientSession(id, user, proxyUser, createRequest, livyConf.livyHome)
}

jpost[SerializedJob]("/:id/submit-job") { req =>
Expand Down Expand Up @@ -123,7 +132,8 @@ class ClientSessionServlet(livyConf: LivyConf)
}

override protected def clientSessionView(session: ClientSession, req: HttpServletRequest): Any = {
new SessionInfo(session.id, session.state.toString)
new SessionInfo(session.id, session.owner, session.proxyUser.getOrElse(null),
session.state.toString)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ object InteractiveSession {
class InteractiveSession(
id: Int,
owner: String,
_proxyUser: Option[String],
livyConf: LivyConf,
request: CreateInteractiveRequest)
extends Session(id, owner) {
Expand Down Expand Up @@ -83,7 +84,7 @@ class InteractiveSession(
val jars = request.jars ++ livyJars(livyConf)
jars.foreach(builder.jar)

request.proxyUser.foreach(builder.proxyUser)
_proxyUser.foreach(builder.proxyUser)
request.queue.foreach(builder.queue)
request.name.foreach(builder.name)

Expand Down Expand Up @@ -220,7 +221,7 @@ class InteractiveSession(

def kind: Kind = request.kind

def proxyUser: Option[String] = request.proxyUser
def proxyUser: Option[String] = _proxyUser

def url: Option[URL] = _url

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ class InteractiveSessionServlet(livyConf: LivyConf)

override protected def createSession(req: HttpServletRequest): InteractiveSession = {
val createRequest = bodyAs[CreateInteractiveRequest](req)
new InteractiveSession(sessionManager.nextId(), remoteUser(req), livyConf, createRequest)
val proxyUser = checkImpersonation(createRequest.proxyUser, req)
new InteractiveSession(sessionManager.nextId(), remoteUser(req), proxyUser, livyConf,
createRequest)
}

override protected def clientSessionView(
session: InteractiveSession,
req: HttpServletRequest): Any = {
val logs =
if (isOwner(session, req)) {
if (hasAccess(session.owner, req)) {
val lines = session.logLines()

val size = 10
Expand Down
Loading

0 comments on commit 8287f4c

Please sign in to comment.