[LIVY-732][SERVER] A common zookeeper wrapper utility
## What changes were proposed in this pull request?

Currently, the ZooKeeper utilities are mixed into ZooKeeperStateStore, so using any of them requires creating a ZooKeeperStateStore instance, which is awkward.

This PR aims to achieve the following target:

1.  Extract the ZooKeeper utilities from ZooKeeperStateStore into a common wrapper so they can also back features such as distributed locks and service discovery (a usage sketch of the wrapper follows below).
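
A minimal usage sketch of the extracted wrapper, following the `ZooKeeperManager` API added in this PR; the payload type and znode paths below are hypothetical examples:

```scala
// Illustrative only: exercising the new ZooKeeperManager API from this PR.
// SessionInfo is a hypothetical Jackson-serializable payload; the paths are examples.
case class SessionInfo(id: Int, appId: String)

val zkManager = new ZooKeeperManager(livyConf)  // requires livy.server.zookeeper.url to be set
zkManager.start()

zkManager.set("/livy/sessions/42", SessionInfo(42, "application_1"))  // create or update a node
val restored: Option[SessionInfo] = zkManager.get[SessionInfo]("/livy/sessions/42")
val children: Seq[String] = zkManager.getChildren("/livy/sessions")
zkManager.remove("/livy/sessions/42")

zkManager.stop()
```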

## How was this patch tested?

Existing unit tests and integration tests.

Author: runzhiwang <[email protected]>

Closes apache#267 from runzhiwang/zk-wrapper-utility.
runzhiwang authored and jerryshao committed Jan 7, 2020
1 parent 1ca3dfd commit 40ea8cc
Showing 9 changed files with 235 additions and 94 deletions.
20 changes: 20 additions & 0 deletions conf/livy.conf.template
@@ -107,6 +107,8 @@
# Must set livy.server.recovery.state-store and livy.server.recovery.state-store.url to
# configure the state store.
# livy.server.recovery.mode = off
# Zookeeper address used for HA and state store. e.g. host1:port1, host2:port2
# livy.server.zookeeper.url =

# Where Livy should store state to for recovery. Possible values:
# <empty>: Default. State store disabled.
@@ -117,8 +119,26 @@
# For filesystem state store, the path of the state store directory. Please don't use a filesystem
# that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
# For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
# If livy.server.recovery.state-store is zookeeper, this config is kept only for backward
# compatibility; if both this config and livy.server.zookeeper.url are set, Livy prefers
# livy.server.zookeeper.url.
# livy.server.recovery.state-store.url =

# The policy Curator uses when connecting to ZooKeeper, in the form "m,n": retry up to
# m times, waiting n milliseconds between retries.
# Please use the new config livy.server.zk.retry-policy instead; this entry is kept for
# backward compatibility. If both are set, Livy prefers livy.server.zk.retry-policy.
# livy.server.recovery.zk-state-store.retry-policy = 5,100

# The policy Curator uses when connecting to ZooKeeper, in the form "m,n": retry up to
# m times, waiting n milliseconds between retries.
# livy.server.zk.retry-policy =

# The ZooKeeper node (znode) prefix under which Livy stores session data.
# livy.server.recovery.zk-state-store.key-prefix = livy
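
# Example (hypothetical addresses): a ZooKeeper-backed state store might be configured as:
# livy.server.recovery.mode = recovery
# livy.server.recovery.state-store = zookeeper
# livy.server.zookeeper.url = zk1:2181,zk2:2181
# livy.server.zk.retry-policy = 5,100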

# If Livy can't find the yarn app within this time, consider it lost.
# livy.server.yarn.app-lookup-timeout = 120s
# When the cluster is busy, we may fail to launch yarn app in app-lookup-timeout, then it would
30 changes: 29 additions & 1 deletion server/src/main/scala/org/apache/livy/LivyConf.scala
@@ -185,6 +185,10 @@ object LivyConf {
* configure the state store.
*/
val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off")

// Zookeeper address used for HA and state store. e.g. host1:port1, host2:port2
val ZOOKEEPER_URL = Entry("livy.server.zookeeper.url", null)

/**
* Where Livy should store state to for recovery. Possible values:
* <empty>: Default. State store disabled.
@@ -196,8 +200,32 @@
* For filesystem state store, the path of the state store directory. Please don't use a
* filesystem that doesn't support atomic rename (e.g. S3). e.g. file:///tmp/livy or hdfs:///.
* For zookeeper, the address to the Zookeeper servers. e.g. host1:port1,host2:port2
* If livy.server.recovery.state-store is zookeeper, this config is kept only for backward
* compatibility; if both this config and livy.server.zookeeper.url are set, Livy prefers
* livy.server.zookeeper.url.
*/
val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", null)

/**
* The policy Curator uses when connecting to ZooKeeper, in the form "m,n": retry up to
* m times, waiting n milliseconds between retries.
* Please use the new config livy.server.zk.retry-policy instead; this entry is kept for
* backward compatibility. If both are set, Livy prefers livy.server.zk.retry-policy.
*/
val RECOVERY_ZK_STATE_STORE_RETRY_POLICY =
Entry("livy.server.recovery.zk-state-store.retry-policy", "5,100")

/**
* The policy Curator uses when connecting to ZooKeeper, in the form "m,n": retry up to
* m times, waiting n milliseconds between retries.
*/
val ZK_RETRY_POLICY = Entry("livy.server.zk.retry-policy", null)

// The ZooKeeper node (znode) prefix under which Livy stores session data.
val RECOVERY_ZK_STATE_STORE_KEY_PREFIX =
Entry("livy.server.recovery.zk-state-store.key-prefix", "livy")

// Livy will cache the max no of logs specified. 0 means don't cache the logs.
val SPARK_LOGS_SIZE = Entry("livy.cache-log.size", 200)
12 changes: 10 additions & 2 deletions server/src/main/scala/org/apache/livy/server/LivyServer.scala
@@ -39,7 +39,7 @@ import org.apache.livy._
import org.apache.livy.server.auth.LdapAuthenticationHandlerImpl
import org.apache.livy.server.batch.BatchSessionServlet
import org.apache.livy.server.interactive.InteractiveSessionServlet
import org.apache.livy.server.recovery.{SessionStore, StateStore, ZooKeeperManager}
import org.apache.livy.server.ui.UIServlet
import org.apache.livy.sessions.{BatchSessionManager, InteractiveSessionManager}
import org.apache.livy.sessions.SessionManager.SESSION_RECOVERY_MODE_OFF
Expand All @@ -60,6 +60,8 @@ class LivyServer extends Logging {
private var accessManager: AccessManager = _
private var _thriftServerFactory: Option[ThriftServerFactory] = None

private var zkManager: Option[ZooKeeperManager] = None

private var ugi: UserGroupInformation = _

def start(): Unit = {
@@ -146,7 +148,12 @@
Future { SparkYarnApp.yarnClient }
}

if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
zkManager = Some(new ZooKeeperManager(livyConf))
zkManager.foreach(_.start())
}

StateStore.init(livyConf, zkManager)
val sessionStore = new SessionStore(livyConf)
val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)
val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore)
@@ -323,6 +330,7 @@
Runtime.getRuntime().addShutdownHook(new Thread("Livy Server Shutdown") {
override def run(): Unit = {
info("Shutting down Livy server.")
zkManager.foreach(_.stop())
server.stop()
_thriftServerFactory.foreach(_.stop())
}
@@ -44,7 +44,8 @@ class FileSystemStateStore(

private val fsUri = {
val fsPath = livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL)
require(fsPath != null && !fsPath.isEmpty,
s"Please config ${LivyConf.RECOVERY_STATE_STORE_URL.key}.")
new URI(fsPath)
}

@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.livy.{LivyConf, Logging}
import org.apache.livy.server.recovery.ZooKeeperManager
import org.apache.livy.sessions.SessionKindModule
import org.apache.livy.sessions.SessionManager._

@@ -78,11 +79,17 @@ abstract class StateStore(livyConf: LivyConf) extends JsonMapper {
object StateStore extends Logging {
private[this] var stateStore: Option[StateStore] = None

def init(livyConf: LivyConf, zkManager: Option[ZooKeeperManager] = None): Unit = synchronized {
if (stateStore.isEmpty) {
val fileStateStoreClassTag = pickStateStore(livyConf)
if (fileStateStoreClassTag == classOf[ZooKeeperStateStore]) {
stateStore = Option(fileStateStoreClassTag.
getDeclaredConstructor(classOf[LivyConf], classOf[ZooKeeperManager])
.newInstance(livyConf, zkManager.get).asInstanceOf[StateStore])
} else {
stateStore = Option(fileStateStoreClassTag.getDeclaredConstructor(classOf[LivyConf])
.newInstance(livyConf).asInstanceOf[StateStore])
}
info(s"Using ${stateStore.get.getClass.getSimpleName} for recovery.")
}
}
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.livy.server.recovery

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

import org.apache.curator.framework.api.UnhandledErrorListener
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.RetryNTimes
import org.apache.zookeeper.KeeperException.NoNodeException

import org.apache.livy.LivyConf
import org.apache.livy.Logging
import org.apache.livy.utils.LivyUncaughtException

class ZooKeeperManager(
livyConf: LivyConf,
mockCuratorClient: Option[CuratorFramework] = None)
extends JsonMapper with Logging {

def this(livyConf: LivyConf) {
this(livyConf, None)
}

private val zkAddress = Option(livyConf.get(LivyConf.ZOOKEEPER_URL)).
orElse(Option(livyConf.get(LivyConf.RECOVERY_STATE_STORE_URL))).
map(_.trim).orNull

require(zkAddress != null && !zkAddress.isEmpty,
s"Please config ${LivyConf.ZOOKEEPER_URL.key}.")

private val retryValue = Option(livyConf.get(LivyConf.ZK_RETRY_POLICY)).
orElse(Option(livyConf.get(LivyConf.RECOVERY_ZK_STATE_STORE_RETRY_POLICY))).
map(_.trim).orNull

require(retryValue != null && !retryValue.isEmpty,
s"Please config ${LivyConf.ZK_RETRY_POLICY.key}.")

// a regex to match patterns like "m, n" where m and n both are integer values
private val retryPattern = """\s*(\d+)\s*,\s*(\d+)\s*""".r
private[recovery] val retryPolicy = retryValue match {
case retryPattern(n, sleepMs) => new RetryNTimes(n.toInt, sleepMs.toInt)
case _ => throw new IllegalArgumentException(
s"contains bad value: $retryValue. " +
"Correct format is <max retry count>,<sleep ms between retry>. e.g. 5,100")
}

private val curatorClient = mockCuratorClient.getOrElse {
CuratorFrameworkFactory.newClient(zkAddress, retryPolicy)
}

curatorClient.getUnhandledErrorListenable().addListener(new UnhandledErrorListener {
def unhandledError(message: String, e: Throwable): Unit = {
error(s"Fatal Zookeeper error: ${message}.", e)
throw new LivyUncaughtException(e.getMessage)
}
})

def start(): Unit = {
curatorClient.start()
}

def stop(): Unit = {
curatorClient.close()
}

// TODO Make sure ZK path has proper secure permissions so that other users cannot read its
// contents.
def set(key: String, value: Object): Unit = {
val data = serializeToBytes(value)
if (curatorClient.checkExists().forPath(key) == null) {
curatorClient.create().creatingParentsIfNeeded().forPath(key, data)
} else {
curatorClient.setData().forPath(key, data)
}
}

def get[T: ClassTag](key: String): Option[T] = {
if (curatorClient.checkExists().forPath(key) == null) {
None
} else {
Option(deserialize[T](curatorClient.getData().forPath(key)))
}
}

def getChildren(key: String): Seq[String] = {
if (curatorClient.checkExists().forPath(key) == null) {
Seq.empty[String]
} else {
curatorClient.getChildren.forPath(key).asScala
}
}

def remove(key: String): Unit = {
try {
curatorClient.delete().guaranteed().forPath(key)
} catch {
case _: NoNodeException => warn(s"Failed to remove non-existent ZooKeeper node: ${key}")
}
}
}
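
One of the stated motivations for the extraction is reusing the shared Curator client for features such as distributed locks and service discovery. The following is a hypothetical sketch, not part of this commit: it assumes the curator-recipes artifact is on the classpath and that ZooKeeperManager would expose its underlying CuratorFramework (for example through a hypothetical getCuratorClient() accessor).

// Hypothetical sketch (not part of this commit): a ZooKeeper-backed distributed lock built
// on the shared Curator client, assuming an accessor for it exists (see note above).
import java.util.concurrent.TimeUnit

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.locks.InterProcessMutex

object DistributedLockSketch {
  // Run `body` while holding the lock at `lockPath`; give up if it cannot be acquired in 10s.
  def withLock[T](client: CuratorFramework, lockPath: String)(body: => T): Option[T] = {
    val lock = new InterProcessMutex(client, lockPath)
    if (lock.acquire(10, TimeUnit.SECONDS)) {
      try Some(body) finally lock.release()
    } else {
      None
    }
  }
}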
@@ -16,103 +16,35 @@
*/
package org.apache.livy.server.recovery

import scala.reflect.ClassTag

import org.apache.livy.LivyConf

class ZooKeeperStateStore(
    livyConf: LivyConf,
    zkManager: ZooKeeperManager)
  extends StateStore(livyConf) {

  private val zkKeyPrefix = livyConf.get(LivyConf.RECOVERY_ZK_STATE_STORE_KEY_PREFIX)

  private def prefixKey(key: String) = s"/$zkKeyPrefix/$key"

  override def set(key: String, value: Object): Unit = {
    zkManager.set(prefixKey(key), value)
  }

  override def get[T: ClassTag](key: String): Option[T] = {
    zkManager.get(prefixKey(key))
  }

  override def getChildren(key: String): Seq[String] = {
    zkManager.getChildren(prefixKey(key))
  }

  override def remove(key: String): Unit = {
    zkManager.remove(prefixKey(key))
  }

  def getZooKeeperManager(): ZooKeeperManager = {
    zkManager
  }
}

(The Curator client construction, retry-policy parsing, unhandled-error listener, shutdown hook, reflection-friendly secondary constructor, and the direct ZooKeeper calls previously implemented in this class were removed; the equivalent logic now lives in ZooKeeperManager.)