Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
allwefantasy committed Oct 6, 2018
1 parent a362157 commit 0b554a9
Show file tree
Hide file tree
Showing 7 changed files with 678 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,57 @@ object APIDeployPythonRunnerEnv {
pythonWorkers.size
}

def createPythonWorker(pythonExec: String, envVars: Map[String, String], logCallback: (String) => Unit): java.net.Socket = {
def generate_key(daemonCommand: Option[Seq[String]],
workerCommand: Option[Seq[String]]) = {
daemonCommand.get.mkString(" ") + workerCommand.get.mkString(" ")
}

def createPythonWorker(daemonCommand: Option[Seq[String]],
workerCommand: Option[Seq[String]],
envVars: Map[String, String],
logCallback: (String) => Unit,
idleWorkerTimeoutMS: Long,
noCache: Boolean = true
): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
pythonWorkers.getOrElseUpdate(key, new WowPythonWorkerFactory(pythonExec, envVars, logCallback)).create()
val key = (generate_key(daemonCommand, workerCommand), envVars)
if (noCache) {
pythonWorkers.getOrElseUpdate(key, new WowPythonWorkerFactory(
daemonCommand,
workerCommand,
envVars,
logCallback,
idleWorkerTimeoutMS)).create()
} else {
new WowPythonWorkerFactory(
daemonCommand,
workerCommand,
envVars,
logCallback,
idleWorkerTimeoutMS).create()
}


}
}


def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
def destroyPythonWorker(daemonCommand: Option[Seq[String]],
workerCommand: Option[Seq[String]],
envVars: Map[String, String],
worker: Socket) {
synchronized {
val key = (pythonExec, envVars)
val key = (generate_key(daemonCommand, workerCommand), envVars)
pythonWorkers.get(key).foreach(_.stopWorker(worker))
}
}


def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
def releasePythonWorker(daemonCommand: Option[Seq[String]],
workerCommand: Option[Seq[String]],
envVars: Map[String, String], worker: Socket) {
synchronized {
val key = (pythonExec, envVars)
val key = (generate_key(daemonCommand, workerCommand), envVars)
pythonWorkers.get(key).foreach(_.releaseWorker(worker))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,29 @@ import java.net.Socket
*/
class MLSQLPythonEnv(env: SparkEnv, deployAPI: Boolean) {

def createPythonWorker(pythonExec: String, envVars: Map[String, String], logCallback: (String) => Unit): java.net.Socket = {
APIDeployPythonRunnerEnv.createPythonWorker(pythonExec, envVars, logCallback)
def sparkEnv = env

def createPythonWorker(daemonCommand: Option[Seq[String]],
workerCommand: Option[Seq[String]],
envVars: Map[String, String],
logCallback: (String) => Unit,
idleWorkerTimeoutMS: Long,
noCache: Boolean = true
): java.net.Socket = {
APIDeployPythonRunnerEnv.createPythonWorker(daemonCommand, workerCommand, envVars, logCallback, idleWorkerTimeoutMS, noCache)
}


def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
APIDeployPythonRunnerEnv.destroyPythonWorker(pythonExec, envVars, worker)
def destroyPythonWorker(daemonCommand: Option[Seq[String]],
workerCommand: Option[Seq[String]],
envVars: Map[String, String], worker: Socket) {
APIDeployPythonRunnerEnv.destroyPythonWorker(daemonCommand, workerCommand, envVars, worker)
}

def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
APIDeployPythonRunnerEnv.releasePythonWorker(pythonExec, envVars, worker)

def releasePythonWorker(daemonCommand: Option[Seq[String]],
workerCommand: Option[Seq[String]],
envVars: Map[String, String], worker: Socket) {
APIDeployPythonRunnerEnv.releasePythonWorker(daemonCommand, workerCommand, envVars, worker)
}
}
Loading

0 comments on commit 0b554a9

Please sign in to comment.