Skip to content

Commit

Permalink
Merge pull request byzer-org#1552 from allwefantasy/TRY
Browse files Browse the repository at this point in the history
ISSUE#1551 Plugin dynamic load fail in JDK14
  • Loading branch information
chncaesar authored Sep 16, 2021
2 parents 7f850c3 + 53ea3f0 commit e493203
Showing 1 changed file with 28 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import org.apache.spark.SparkCoreVersion
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.mlsql.session.MLSQLException
import streaming.core.datasource.MLSQLRegistry
import streaming.core.strategy.platform.{PlatformManager, SparkRuntime}
import streaming.log.WowLog
import tech.mlsql.common.utils.classloader.ClassLoaderTool
import tech.mlsql.common.utils.log.Logging
Expand Down Expand Up @@ -96,13 +95,13 @@ object PluginUtils extends Logging with WowLog {
val dataLake = new DataLake(spark)

val hdfsPath = PathFun(dataLake.identifyToPath(TABLE_FILES)).add("store").add("plugins")
saveStream(pluginName, fileLen, hdfsPath.toPath, fieldValue, inputStream,spark.sparkContext.hadoopConfiguration)
saveStream(pluginName, fileLen, hdfsPath.toPath, fieldValue, inputStream, spark.sparkContext.hadoopConfiguration)
HDFSOperatorV2.deleteDir("." + hdfsPath.toPath + ".crc")
(fieldValue, PathFun(hdfsPath.toPath).add(fieldValue).toPath)

}

def saveStream(pluginName: String, fileLen: Long, path: String, fileName: String, inputStream: InputStream,hadoopConf:Configuration) = {
def saveStream(pluginName: String, fileLen: Long, path: String, fileName: String, inputStream: InputStream, hadoopConf: Configuration) = {

def formatNumber(wow: Double): String = {
if (wow == -1) return "UNKNOW"
Expand Down Expand Up @@ -205,10 +204,32 @@ object PluginUtils extends Logging with WowLog {

def loadJarInDriver(path: String) = {
//.getSystemClassLoader()
val systemClassLoader = ClassLoaderTool.getContextOrDefaultLoader.asInstanceOf[URLClassLoader]
val method = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
method.setAccessible(true)
method.invoke(systemClassLoader, new File(path).toURI.toURL)
val systemClassLoader = ClassLoaderTool.getContextOrDefaultLoader
if (systemClassLoader.isInstanceOf[URLClassLoader]) {
val method = classOf[URLClassLoader].getDeclaredMethod("addURL", classOf[URL])
method.setAccessible(true)
method.invoke(systemClassLoader, new File(path).toURI.toURL)
} else {

try {
val method = systemClassLoader.getClass()
.getDeclaredMethod("appendToClassPathForInstrumentation", classOf[String])
method.setAccessible(true)
method.invoke(systemClassLoader, path)
} catch {
case e: Exception =>
throw new RuntimeException(
s"""
|In order to dynamically install plugin in JDK[${System.getProperty("java.version")}] > 8
|you may need to add following to VM options:
|
|--add-opens java.base/jdk.internal.loader=ALL-UNNAMED --add-opens jdk.zipfs/jdk.nio.zipfs=ALL-UNNAMED
|
|""".stripMargin)
}

}

}

def checkVersionCompatibility(pluginName: String, className: String) = {
Expand Down

0 comments on commit e493203

Please sign in to comment.