Use reflection to call BlockManager.putBytes in Spark 2.x to handle different method signatures (intel#907)

* Use reflection to call BlockManager.putBytes in Spark 2.x to handle different method signatures

* Remove the Spark 1.6, Spark 2.1.x, and Cloudera profiles that shouldn't be necessary for compatibility with these Spark versions

* Rename the profile to spark_2.x per dding3 and make further related changes to reflect the new profile name

* Per yiheng, precompute the entire putBytes function instead of checking arg length at runtime (see the condensed sketch below)
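
The point of that last change, sketched hypothetically (none of the code below is part of the commit): a per-call variant would re-resolve BlockManager.putBytes and branch on its arity on every invocation, roughly as follows, whereas the committed BlockManagerWrapper (in the Scala diff further down) resolves the overload once, at object initialization, and binds it into a (BlockId, ChunkedByteBuffer, StorageLevel) => Unit function value.

```scala
// Hypothetical per-call variant, for contrast only -- NOT what this commit does.
// Like the real wrapper, it would have to live in package org.apache.spark.storage,
// because BlockManager and ChunkedByteBuffer are private[spark].
package org.apache.spark.storage

import java.lang.{Boolean => JBoolean}

import org.apache.spark.SparkEnv
import org.apache.spark.util.io.ChunkedByteBuffer

object PutBytesPerCallSketch {
  def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel): Unit = {
    // Reflection lookup and arity check repeated on every call.
    val method = classOf[BlockManager].getMethods.find(_.getName == "putBytes").get
    if (method.getParameterTypes.length == 5) {
      // Spark 2.0.0 - 2.1.0 and 2.2.0+ signature: (blockId, bytes, level, tellMaster, ClassTag)
      method.invoke(SparkEnv.get.blockManager, blockId, bytes, level, JBoolean.TRUE, null)
    } else {
      // Spark 2.1.1 and some distro builds: extra encrypt flag before the ClassTag
      method.invoke(SparkEnv.get.blockManager, blockId, bytes, level, JBoolean.TRUE, JBoolean.FALSE, null)
    }
  }
}
```

With the precomputed putBytesFn, the getMethod lookup cost is paid a single time when the wrapper class loads, and each call is just a plain Method.invoke.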
srowen authored and yiheng-wang-intel committed May 25, 2017
1 parent 497d006 commit 787cfc1
Showing 7 changed files with 56 additions and 62 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
@@ -9,10 +9,9 @@ matrix:
include:
# Covers Java 7, Spark 1.x
- jdk: openjdk7
env: SPARK=-Pspark_1.6
# Covers Java 8, Spark 2.x
- jdk: oraclejdk8
env: SPARK=-Pspark_2.1
env: SPARK=-Pspark_2.x
cache:
directories:
- $HOME/.m2
48 changes: 3 additions & 45 deletions pom.xml
@@ -525,18 +525,12 @@
</reporting>

<profiles>
<!-- no-op profile for backwards-compatibility with build scripts, for now -->
<profile>
<id>spark_1.6</id>
<properties>
<spark-version.project>1.5-plus</spark-version.project>
<spark.version>1.6.0</spark.version>
<scala.major.version>2.10</scala.major.version>
<scala.version>2.10.5</scala.version>
<scala.macros.version>2.0.1</scala.macros.version>
</properties>
</profile>
<profile>
<id>spark_2.0</id>
<id>spark_2.x</id>
<properties>
<spark-version.project>2.0</spark-version.project>
<spark.version>2.0.0</spark.version>
@@ -545,26 +545,6 @@
<scala.macros.version>2.1.0</scala.macros.version>
</properties>
</profile>
<profile>
<id>spark_2.1</id>
<properties>
<spark-version.project>2.0</spark-version.project>
<spark.version>2.1.0</spark.version>
<scala.major.version>2.11</scala.major.version>
<scala.version>2.11.8</scala.version>
<scala.macros.version>2.1.0</scala.macros.version>
</properties>
</profile>
<profile>
<id>spark_2.1.1</id>
<properties>
<spark-version.project>2.0</spark-version.project>
<spark.version>2.1.1</spark.version>
<scala.major.version>2.11</scala.major.version>
<scala.version>2.11.8</scala.version>
<scala.macros.version>2.1.0</scala.macros.version>
</properties>
</profile>
<profile>
<id>scala_2.11</id>
<properties>
@@ -573,7 +547,7 @@
<scala.macros.version>2.1.0</scala.macros.version>
</properties>
</profile>
<!-- put this profile after spark_2.0 profile -->
<!-- put this profile after spark_2.x profile -->
<profile>
<id>scala_2.10</id>
<properties>
@@ -582,22 +556,6 @@
<scala.macros.version>2.0.1</scala.macros.version>
</properties>
</profile>
<profile>
<id>cloudera</id>
<properties>
<spark-version.project>2.0</spark-version.project>
<spark.version>2.0.0.cloudera2</spark.version>
<scala.major.version>2.11</scala.major.version>
<scala.version>2.11.8</scala.version>
<scala.macros.version>2.1.0</scala.macros.version>
</properties>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
</profile>
<profile>
<id>all-in-one</id>
<properties>
4 changes: 2 additions & 2 deletions pyspark/dl/README.md
@@ -11,8 +11,8 @@ This Python binding has been tested with Python 2.7 and Spark 1.6.0 / Spark 2.0.
## Installing on Ubuntu
1. Build BigDL
[Build Page](https://github.com/intel-analytics/BigDL/wiki/Build-Page)
* With Spark1.6: ``` $BIGDL_HOME/make-dist.sh ```
* With Spark2.0: ``` $BIGDL_HOME/make-dist.sh -P spark_2.0 ```
* With Spark 1.6: ``` $BIGDL_HOME/make-dist.sh ```
* With Spark 2.0 or later: ``` $BIGDL_HOME/make-dist.sh -P spark_2.x ```

2. Install Python dependencies (if you're running cluster mode, you need to install them on the client and each worker node):
* Installing Numpy:
2 changes: 1 addition & 1 deletion scripts/run.example.sh
@@ -47,7 +47,7 @@ eval set -- "$options"
while true; do
case $1 in
-p|--spark)
if [ "$2" == "spark_2.0" ]; then
if [ "$2" == "spark_2.x" ]; then
SPARK_DIR=$SPARK2_DIR
SPARK_LINK=$SPARK2_LINK
BIGDL_JAR=$BIGDL2_JAR
8 changes: 1 addition & 7 deletions spark/dl/pom.xml
@@ -248,13 +248,7 @@

<profiles>
<profile>
<id>spark_2.0</id>
<properties>
<filesToExclude>""</filesToExclude> <!-- we do not want to exclude anything here -->
</properties>
</profile>
<profile>
<id>spark_2.1</id>
<id>spark_2.x</id>
<properties>
<filesToExclude>""</filesToExclude> <!-- we do not want to exclude anything here -->
</properties>
8 changes: 4 additions & 4 deletions spark/dl/src/test/integration-test.robot
@@ -41,13 +41,13 @@ Build SparkJar
Log To Console build jar finished

Spark2.0 Test Suite
Build SparkJar spark_2.0
Build SparkJar spark_2.x
Set Environment Variable SPARK_HOME /opt/work/spark-2.0.0-bin-hadoop2.7
${submit}= Catenate SEPARATOR=/ /opt/work/spark-2.0.0-bin-hadoop2.7/bin spark-submit
Run Shell ${submit} --master ${spark_200_3_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 150g --executor-cores 28 --total-executor-cores 84 --class com.intel.analytics.bigdl.models.lenet.Train ${jar_path} -f ${mnist_data_source} -b 336 -e 3

Spark2.1 Test Suite
Build SparkJar spark_2.1
Build SparkJar spark_2.x
Set Environment Variable SPARK_HOME /opt/work/spark-2.1.0-bin-hadoop2.7
${submit}= Catenate SEPARATOR=/ /opt/work/spark-2.1.0-bin-hadoop2.7/bin spark-submit
Run Shell ${submit} --master ${spark_210_3_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 150g --executor-cores 28 --total-executor-cores 84 --class com.intel.analytics.bigdl.models.lenet.Train ${jar_path} -f ${mnist_data_source} -b 336 -e 3
@@ -56,7 +56,7 @@ Hdfs Test Suite
Run Shell mvn clean test -Dsuites=com.intel.analytics.bigdl.integration.HdfsSpec -DhdfsMaster=${hdfs_264_3_master} -Dmnist=${mnist_data_source} -P integration-test -DforkMode=never

Yarn Test Suite
Build SparkJar spark_2.0
Build SparkJar spark_2.x
Set Environment Variable SPARK_HOME /opt/work/spark-2.0.0-bin-hadoop2.7
Set Environment Variable http_proxy http://child-prc.intel.com:913
Set Environment Variable https_proxy https://child-prc.intel.com:913
@@ -69,7 +69,7 @@


TensorFlow Spark2.1 Test Suite
Build SparkJar spark_2.1
Build SparkJar spark_2.x
Set Environment Variable SPARK_HOME /opt/work/spark-2.1.0-bin-hadoop2.7
${submit}= Catenate SEPARATOR=/ /opt/work/spark-2.1.0-bin-hadoop2.7/bin spark-submit
Run Shell ${submit} --master ${spark_tf_210_3_master} --conf "spark.serializer=org.apache.spark.serializer.JavaSerializer" --driver-memory 150g --executor-cores 28 --total-executor-cores 56 --py-files ${curdir}/dist/lib/bigdl-0.2.0-SNAPSHOT-python-api.zip --jars ${jar_path} --properties-file ${curdir}/dist/conf/spark-bigdl.conf ${curdir}/pyspark/dl/models/lenet/lenet5.py -b 224
@@ -16,11 +16,14 @@

package org.apache.spark.storage

import java.lang.{Boolean => JBoolean}
import java.nio.ByteBuffer

import org.apache.spark.SparkEnv
import org.apache.spark.util.io.ChunkedByteBuffer

import scala.reflect.ClassTag

object BlockManagerWrapper {

def putBytes( blockId: BlockId,
@@ -29,7 +32,7 @@ object BlockManagerWrapper {
require(bytes != null, "Bytes is null")
val blockManager = SparkEnv.get.blockManager
blockManager.removeBlock(blockId)
blockManager.putBytes(blockId, new ChunkedByteBuffer(bytes), level)
putBytesFn(blockId, new ChunkedByteBuffer(bytes), level)
}

def getLocal(blockId: BlockId): Option[BlockResult] = {
@@ -57,4 +60,44 @@ object BlockManagerWrapper {
blockInfoManager.unlock(blockId)
}
}

private val putBytesFn: (BlockId, ChunkedByteBuffer, StorageLevel) => Unit = {
val bmClass = classOf[BlockManager]
// Spark 2.0.0 - 2.1.0, and 2.2.0+ (as of this writing), declare the method:
// def putBytes[T: ClassTag](
// blockId: BlockId,
// bytes: ChunkedByteBuffer,
// level: StorageLevel,
// tellMaster: Boolean = true): Boolean
val putBytesMethod =
try {
bmClass.getMethod("putBytes",
classOf[BlockId], classOf[ChunkedByteBuffer], classOf[StorageLevel],
classOf[Boolean], classOf[ClassTag[_]])
} catch {
case _: NoSuchMethodException =>
// But Spark 2.1.1 and distros like Cloudera 2.0.0 / 2.1.0 had an extra boolean
// param:
// def putBytes[T: ClassTag](
// blockId: BlockId,
// bytes: ChunkedByteBuffer,
// level: StorageLevel,
// tellMaster: Boolean = true,
// encrypt: Boolean = false): Boolean
bmClass.getMethod("putBytes",
classOf[BlockId], classOf[ChunkedByteBuffer], classOf[StorageLevel],
classOf[Boolean], classOf[Boolean], classOf[ClassTag[_]])
}
putBytesMethod.getParameterTypes.length match {
case 5 =>
(blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel) =>
putBytesMethod.invoke(SparkEnv.get.blockManager,
blockId, bytes, level, JBoolean.TRUE, null)
case 6 =>
(blockId: BlockId, bytes: ChunkedByteBuffer, level: StorageLevel) =>
putBytesMethod.invoke(SparkEnv.get.blockManager,
blockId, bytes, level, JBoolean.TRUE, JBoolean.FALSE, null)
}
}

}
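
A minimal usage sketch, hypothetical and not part of the commit: it assumes an initialized SparkEnv (obtained here from a local-mode SparkContext) and shows that callers stay the same whichever Spark 2.x putBytes signature is on the classpath.

```scala
import java.nio.ByteBuffer

import org.apache.spark.SparkContext
import org.apache.spark.storage.{BlockManagerWrapper, StorageLevel, TestBlockId}

// Hypothetical caller; names like "putBytes-sketch" are illustrative only.
object PutBytesUsageSketch {
  def main(args: Array[String]): Unit = {
    // A local SparkContext just to get a live SparkEnv / BlockManager.
    val sc = new SparkContext("local[1]", "putBytes-sketch")
    try {
      val blockId = TestBlockId("bigdl-putbytes-demo")
      val payload = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))

      // Same call on every Spark 2.x build; the wrapper already bound the
      // matching BlockManager.putBytes overload when the object initialized.
      BlockManagerWrapper.putBytes(blockId, payload, StorageLevel.MEMORY_ONLY)
    } finally {
      sc.stop()
    }
  }
}
```

The wrapper's getLocal, shown above, reads a block back the same way, so caching code does not need to know which Spark 2.x minor version it runs on.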
