Skip to content

Commit

Permalink
Merge branch 'ISSUE-790' of github.com:allwefantasy/streamingpro into…
Browse files Browse the repository at this point in the history
… ISSUE-790
  • Loading branch information
allwefantasy committed Dec 17, 2018
2 parents 1f1671d + 8d4d524 commit 220aa94
Showing 1 changed file with 9 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ package org.apache.spark.ps.cluster

import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.{MLSQLConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.cluster.PSDriverEndpoint
import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.{MLSQLConf, SparkContext}

/**
* Created by allwefantasy on 30/1/2018.
Expand All @@ -21,30 +21,23 @@ class PSDriverBackend(sc: SparkContext) extends Logging {
val isDriver = sc.env.executorId == SparkContext.DRIVER_IDENTIFIER
val bindAddress = sc.conf.get(DRIVER_BIND_ADDRESS)
val advertiseAddress = sc.conf.get(DRIVER_HOST_ADDRESS)
var port = sc.conf.getOption(MLSQLConf.MLSQL_CLUSTER_PS_DRIVER_PORT.key).getOrElse("7777").toInt
val port = sc.conf.getOption(MLSQLConf.MLSQL_CLUSTER_PS_DRIVER_PORT.key).getOrElse("7777").toInt
val ioEncryptionKey = if (sc.conf.get(IO_ENCRYPTION_ENABLED)) {
Some(CryptoStreamUtils.createKey(sc.conf))
} else {
None
}
logInfo(s"setup ps driver rpc env: ${bindAddress}:${port} clientMode=${!isDriver}")
var createSucess = false
var count = 0
val env = new AtomicReference[RpcEnv]()
while (!createSucess && count < 10) {
try {
env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf,
sc.env.securityManager, clientMode = !isDriver))
createSucess = true
} catch {
case e: Exception =>
logInfo("fail to create rpcenv", e)
count += 1
port += 1
}
try {
env.set(RpcEnv.create("PSDriverEndpoint", bindAddress, port, sc.conf,
sc.env.securityManager, clientMode = !isDriver))
} catch {
case e: Exception =>
logInfo("fail to create rpcenv", e)
}
if (env.get() == null) {
logError(s"fail to create rpcenv finally with attemp ${count} ")
logError(s"fail to create rpcenv finally ")
}
env.get()
}
Expand Down

0 comments on commit 220aa94

Please sign in to comment.