From 787cfc120f391f9509ab4154b7a36d4e03b7256b Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Thu, 25 May 2017 03:53:04 +0100
Subject: [PATCH] Use reflection to call BlockManager.putBytes in Spark 2.x to handle different method signatures (#907)

* Use reflection to call BlockManager.putBytes in Spark 2.x to handle different method signatures

* Remove Spark 1.6, Spark 2.1.x, Cloudera profiles that shouldn't be necessary for compatibility with Spark versions

* Rename profile to spark_2.x per dding3 and make further related changes to reflect new profile name

* Per yiheng, precompute entire putBytes function instead of checking arg length at runtime
---
 .travis.yml                                 |  3 +-
 pom.xml                                     | 48 ++-----
 pyspark/dl/README.md                        |  4 +-
 scripts/run.example.sh                      |  2 +-
 spark/dl/pom.xml                            |  8 +---
 spark/dl/src/test/integration-test.robot    |  8 ++--
 .../spark/storage/BlockManagerWrapper.scala | 45 ++++++++++++++++-
 7 files changed, 56 insertions(+), 62 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 487260f427c..8f567676649 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -9,10 +9,9 @@ matrix:
   include:
     # Covers Java 7, Spark 1.x
     - jdk: openjdk7
-      env: SPARK=-Pspark_1.6
     # Covers Java 8, Spark 2.x
    - jdk: oraclejdk8
-      env: SPARK=-Pspark_2.1
+      env: SPARK=-Pspark_2.x
 cache:
   directories:
     - $HOME/.m2
diff --git a/pom.xml b/pom.xml
index 0055946c1fb..b2b9d9209cf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -525,18 +525,12 @@
+      spark_1.6
-
-        1.5-plus
-        1.6.0
-        2.10
-        2.10.5
-        2.0.1
-
-      spark_2.0
+      spark_2.x
         2.0
         2.0.0
@@ -545,26 +539,6 @@
         2.1.0
-
-      spark_2.1
-
-        2.0
-        2.1.0
-        2.11
-        2.11.8
-        2.1.0
-
-
-
-      spark_2.1.1
-
-        2.0
-        2.1.1
-        2.11
-        2.11.8
-        2.1.0
-
-
       scala_2.11
         2.11
         2.11.8
         2.1.0
-
+
       scala_2.10
         2.10
         2.10.5
         2.0.1
-
-      cloudera
-
-        2.0
-        2.0.0.cloudera2
-        2.11
-        2.11.8
-        2.1.0
-
-
-      cloudera
-      https://repository.cloudera.com/artifactory/cloudera-repos/
-
-
-
       all-in-one
diff --git a/pyspark/dl/README.md b/pyspark/dl/README.md
index 981c7700dc1..b2a51b07695 100644
--- a/pyspark/dl/README.md
+++ b/pyspark/dl/README.md
@@ -11,8 +11,8 @@ This Python binding has been tested with Python 2.7 and Spark 1.6.0 / Spark 2.0.
 
 ## Installing on Ubuntu
 1. Build BigDL
 [Build Page](https://github.com/intel-analytics/BigDL/wiki/Build-Page)
-    * With Spark1.6: ``` $BIGDL_HOME/make-dist.sh ```
-    * With Spark2.0: ``` $BIGDL_HOME/make-dist.sh -P spark_2.0 ```
+    * With Spark 1.6: ``` $BIGDL_HOME/make-dist.sh ```
+    * With Spark 2.0 or later: ``` $BIGDL_HOME/make-dist.sh -P spark_2.x ```
 2. Install python dependensies(if you're running cluster mode, you need to install them on client and each worker node):
  * Installing Numpy:
diff --git a/scripts/run.example.sh b/scripts/run.example.sh
index 310ac339995..5592f33c074 100755
--- a/scripts/run.example.sh
+++ b/scripts/run.example.sh
@@ -47,7 +47,7 @@ eval set -- "$options"
 while true; do
     case $1 in
         -p|--spark)
-            if [ "$2" == "spark_2.0" ]; then
+            if [ "$2" == "spark_2.x" ]; then
                SPARK_DIR=$SPARK2_DIR
                SPARK_LINK=$SPARK2_LINK
                BIGDL_JAR=$BIGDL2_JAR
diff --git a/spark/dl/pom.xml b/spark/dl/pom.xml
index 9ddee7fb863..99997186dae 100644
--- a/spark/dl/pom.xml
+++ b/spark/dl/pom.xml
@@ -248,13 +248,7 @@
-      spark_2.0
-
-        ""
-
-
-
-      spark_2.1
+      spark_2.x
         ""
diff --git a/spark/dl/src/test/integration-test.robot b/spark/dl/src/test/integration-test.robot
index 7b68552a234..eda14f674b0 100644
--- a/spark/dl/src/test/integration-test.robot
+++ b/spark/dl/src/test/integration-test.robot
@@ -41,13 +41,13 @@ Build SparkJar
     Log To Console    build jar finished
 
 Spark2.0 Test Suite
-    Build SparkJar                 spark_2.0
+    Build SparkJar                 spark_2.x
     Set Environment Variable       SPARK_HOME    /opt/work/spark-2.0.0-bin-hadoop2.7
     ${submit}=                     Catenate      SEPARATOR=/    /opt/work/spark-2.0.0-bin-hadoop2.7/bin    spark-submit
     Run Shell    ${submit} --master ${spark_200_3_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 150g --executor-cores 28 --total-executor-cores 84 --class com.intel.analytics.bigdl.models.lenet.Train ${jar_path} -f ${mnist_data_source} -b 336 -e 3
 
 Spark2.1 Test Suite
-    Build SparkJar                 spark_2.1
+    Build SparkJar                 spark_2.x
     Set Environment Variable       SPARK_HOME    /opt/work/spark-2.1.0-bin-hadoop2.7
     ${submit}=                     Catenate      SEPARATOR=/    /opt/work/spark-2.1.0-bin-hadoop2.7/bin    spark-submit
     Run Shell    ${submit} --master ${spark_210_3_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 150g --executor-cores 28 --total-executor-cores 84 --class com.intel.analytics.bigdl.models.lenet.Train ${jar_path} -f ${mnist_data_source} -b 336 -e 3
@@ -56,7 +56,7 @@ Hdfs Test Suite
     Run Shell    mvn clean test -Dsuites=com.intel.analytics.bigdl.integration.HdfsSpec -DhdfsMaster=${hdfs_264_3_master} -Dmnist=${mnist_data_source} -P integration-test -DforkMode=never
 
 Yarn Test Suite
-    Build SparkJar                 spark_2.0
+    Build SparkJar                 spark_2.x
     Set Environment Variable       SPARK_HOME     /opt/work/spark-2.0.0-bin-hadoop2.7
     Set Environment Variable       http_proxy     http://child-prc.intel.com:913
     Set Environment Variable       https_proxy    https://child-prc.intel.com:913
@@ -69,7 +69,7 @@
 
 TensorFlow Spark2.1 Test Suite
-    Build SparkJar                 spark_2.1
+    Build SparkJar                 spark_2.x
     Set Environment Variable       SPARK_HOME    /opt/work/spark-2.1.0-bin-hadoop2.7
     ${submit}=                     Catenate      SEPARATOR=/    /opt/work/spark-2.1.0-bin-hadoop2.7/bin    spark-submit
     Run Shell    ${submit} --master ${spark_tf_210_3_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 150g --executor-cores 28 --total-executor-cores 56 --py-files ${curdir}/dist/lib/bigdl-0.2.0-SNAPSHOT-python-api.zip --jars ${jar_path} --properties-file ${curdir}/dist/conf/spark-bigdl.conf ${curdir}/pyspark/dl/models/lenet/lenet5.py -b 224
diff --git a/spark/spark-version/2.0/src/main/scala/org/apache/spark/storage/BlockManagerWrapper.scala b/spark/spark-version/2.0/src/main/scala/org/apache/spark/storage/BlockManagerWrapper.scala
index 4a3b1ce7482..b43969f0d4c 100644
--- a/spark/spark-version/2.0/src/main/scala/org/apache/spark/storage/BlockManagerWrapper.scala
+++ b/spark/spark-version/2.0/src/main/scala/org/apache/spark/storage/BlockManagerWrapper.scala
@@ -16,11 +16,14 @@
 package org.apache.spark.storage
 
+import java.lang.{Boolean => JBoolean}
 import java.nio.ByteBuffer
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.util.io.ChunkedByteBuffer
 
+import scala.reflect.ClassTag
+
 object BlockManagerWrapper {
 
   def putBytes(
       blockId: BlockId,
@@ -29,7 +32,7 @@ object BlockManagerWrapper {
     require(bytes != null, "Bytes is null")
     val blockManager = SparkEnv.get.blockManager
     blockManager.removeBlock(blockId)
-    blockManager.putBytes(blockId, new ChunkedByteBuffer(bytes), level)
+    putBytesFn(blockId, new ChunkedByteBuffer(bytes), level)
   }
 
   def getLocal(blockId: BlockId): Option[BlockResult] = {
@@ -57,4 +60,44 @@
       blockInfoManager.unlock(blockId)
     }
   }
+
+  private val putBytesFn: (BlockId, ChunkedByteBuffer, StorageLevel) => Unit = {
+    val bmClass = classOf[BlockManager]
+    // Spark 2.0.0 - 2.1.0, and 2.2.0+ (as of this writing), declare the method:
+    // def putBytes[T: ClassTag](
+    //   blockId: BlockId,
+    //   bytes: ChunkedByteBuffer,
+    //   level: StorageLevel,
+    //   tellMaster: Boolean = true): Boolean
+    val putBytesMethod =
+      try {
+        bmClass.getMethod("putBytes",
+          classOf[BlockId], classOf[ChunkedByteBuffer], classOf[StorageLevel],
+          classOf[Boolean], classOf[ClassTag[_]])
+      } catch {
+        case _: NoSuchMethodException =>
+          // But Spark 2.1.1 and distros like Cloudera 2.0.0 / 2.1.0 had an extra boolean
+          // param:
+          // def putBytes[T: ClassTag](
+          //   blockId: BlockId,
+          //   bytes: ChunkedByteBuffer,
+          //   level: StorageLevel,
+          //   tellMaster: Boolean = true,
+          //   encrypt: Boolean = false): Boolean
+          bmClass.getMethod("putBytes",
+            classOf[BlockId], classOf[ChunkedByteBuffer], classOf[StorageLevel],
+            classOf[Boolean], classOf[Boolean], classOf[ClassTag[_]])
+      }
+    putBytesMethod.getParameterTypes.length match {
+      case 5 =>
+        (blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel) =>
+          putBytesMethod.invoke(SparkEnv.get.blockManager,
+            blockId, bytes, level, JBoolean.TRUE, null)
+      case 6 =>
+        (blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel) =>
+          putBytesMethod.invoke(SparkEnv.get.blockManager,
+            blockId, bytes, level, JBoolean.TRUE, JBoolean.FALSE, null)
+    }
+  }
+
 }
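For reference, the change above follows a "resolve the overload once, then reuse it" pattern: the available putBytes signature is looked up a single time when BlockManagerWrapper is initialized, and a closure of the matching arity is precomputed so callers never branch on the argument count. The sketch below illustrates that pattern outside of Spark; the Store class, its putBytes variants, and the key/bytes parameters are hypothetical stand-ins invented for illustration, not BigDL or Spark APIs.

import java.lang.{Boolean => JBoolean}
import java.lang.reflect.Method

object ReflectionDispatchSketch {

  // Hypothetical stand-in for BlockManager: depending on the "version" on the
  // classpath, only one putBytes shape would exist. Here the 3-argument form is defined.
  class Store {
    def putBytes(key: String, bytes: Array[Byte], tellMaster: Boolean): Boolean = {
      println(s"stored ${bytes.length} bytes under $key (tellMaster=$tellMaster)")
      true
    }
  }

  // Resolve the method once, at initialization: try the short signature first,
  // then fall back to the longer one if it does not exist.
  private val putBytesMethod: Method = {
    val cls = classOf[Store]
    try {
      cls.getMethod("putBytes", classOf[String], classOf[Array[Byte]], classOf[Boolean])
    } catch {
      case _: NoSuchMethodException =>
        cls.getMethod("putBytes", classOf[String], classOf[Array[Byte]],
          classOf[Boolean], classOf[Boolean])
    }
  }

  // Precompute a closure whose body matches whichever signature was found,
  // so every later call goes straight to Method.invoke with the right arguments.
  private val putBytesFn: (Store, String, Array[Byte]) => Unit =
    putBytesMethod.getParameterTypes.length match {
      case 3 =>
        (store, key, bytes) =>
          putBytesMethod.invoke(store, key, bytes, JBoolean.TRUE)
      case 4 =>
        (store, key, bytes) =>
          putBytesMethod.invoke(store, key, bytes, JBoolean.TRUE, JBoolean.FALSE)
    }

  def main(args: Array[String]): Unit = {
    putBytesFn(new Store, "block-1", Array[Byte](1, 2, 3))
  }
}

Because the lookup happens eagerly in a val, an incompatible build surfaces as a NoSuchMethodException when the object is first initialized rather than on each cache write, and no per-call argument-count check is needed, which is what the final commit message ("precompute entire putBytes function instead of checking arg length at runtime") refers to.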