Commit b75164a

fixed Spark 2.0 incompatibility issue. DataFrame no longer exists in… (sryza#162)

* fixed Spark 2.0 incompatibility issue. DataFrame no longer exists in the Java API; we now use Dataset<Row> (see the sketch below).

* updated Travis CI and Maven configuration for the transition from Spark 1.x to 2.x
wgorman authored and sryza committed Sep 8, 2016
1 parent 8cba383 commit b75164a
Showing 5 changed files with 24 additions and 34 deletions.
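In Spark 2.0 the Java API dropped org.apache.spark.sql.DataFrame: what Java callers previously typed as DataFrame is now Dataset<Row>. A minimal before/after sketch of the change from a caller's point of view (hypothetical code, not taken from this repository):

    // Spark 1.x Java API
    DataFrame df = sqlContext.read().json("observations.json");

    // Spark 2.x Java API: the DataFrame class is gone; Dataset<Row> takes its place
    Dataset<Row> df = sqlContext.read().json("observations.json");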
.travis.yml (15 changes: 5 additions & 10 deletions)

@@ -6,16 +6,11 @@ script: mvn ${SPARK} ${SCALA} -Dmaven.javadoc.skip=true -q -B -Dgpg.skip=true ve
 env:
   global:
     - MAVEN_OPTS=-Xmx2g
-matrix:
-  include:
-    # Covers Spark 1.3.x
-    - env: SPARK=-Dspark.version=1.3.1
-    # Covers Spark 1.4.x
-    - env: SPARK=-Dspark.version=1.4.1
-    # Covers Spark 1.5.x
-    - env: SPARK=-Dspark.version=1.5.2
-    # Covers Scala 2.11, Spark 1.6.x
-    - env: SCALA=-Pscala-2.11 SPARK=-Dspark.version=1.6.0
+# Use this to test future version of the 2.x Spark environment
+# matrix:
+#   include:
+#     # Covers Spark 2.0.x
+#     - env: SPARK=-Dspark.version=2.0.0
 cache:
   directories:
     - $HOME/.m2
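Each matrix entry's SPARK and SCALA variables are substituted into the mvn ${SPARK} ${SCALA} script line shown in the hunk header, so re-enabling the commented block restores a CI job that builds against Spark 2.0.x via -Dspark.version=2.0.0.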
pom.xml (19 changes: 6 additions & 13 deletions)

@@ -36,10 +36,10 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <java.version>1.8</java.version>
-    <scala.minor.version>2.10</scala.minor.version>
-    <scala.complete.version>${scala.minor.version}.4</scala.complete.version>
-    <spark.version>1.3.1</spark.version>
-    <breeze.version>0.10</breeze.version>
+    <scala.minor.version>2.11</scala.minor.version>
+    <scala.complete.version>${scala.minor.version}.8</scala.complete.version>
+    <spark.version>2.0.0</spark.version>
+    <breeze.version>0.12</breeze.version>
   </properties>

   <distributionManagement>
@@ -451,19 +451,12 @@

   <profiles>
     <profile>
-      <id>scala-2.11</id>
+      <id>scala-2.10</id>
       <properties>
-        <scala.minor.version>2.11</scala.minor.version>
+        <scala.minor.version>2.10</scala.minor.version>
         <scala.complete.version>${scala.minor.version}.6</scala.complete.version>
       </properties>
     </profile>
-    <profile>
-      <id>spark-1.6</id>
-      <properties>
-        <spark.version>1.6.1</spark.version>
-        <breeze.version>0.12</breeze.version>
-      </properties>
-    </profile>
   </profiles>

 </project>
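With the defaults moved to Scala 2.11 and Spark 2.0.0, the one remaining profile flips a build back to Scala 2.10, and either property can still be overridden from the command line. Hypothetical invocations (goal chosen arbitrarily; the flags mirror the Travis configuration):

    mvn -Pscala-2.10 verify
    mvn -Dspark.version=2.0.0 verify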
Third changed file: the Java factory (JavaTimeSeriesRDDFactory)

@@ -19,8 +19,9 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.DataFrame;
 import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import scala.Tuple3;
 import scala.reflect.ClassTag$;

@@ -88,10 +89,11 @@ public static <K> JavaTimeSeriesRDD<K> timeSeriesRDD(
  */
 public static JavaTimeSeriesRDD<String> timeSeriesRDDFromObservations(
     DateTimeIndex targetIndex,
-    DataFrame df,
+    Dataset<Row> df,
     String tsCol,
     String keyCol,
     String valueCol) {
+
   return JAVA_TIME_SERIES_RDD.javaTimeSeriesRDDFromObservations(
       targetIndex,
       df,
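A short, hypothetical caller of the updated signature (the observations dataset, its source path, sqlContext, and targetIndex are assumptions; the column names mirror those used in the tests below):

    // Hypothetical: load observations and build a JavaTimeSeriesRDD (Spark 2.0 Java API)
    Dataset<Row> observations = sqlContext.read().parquet("observations.parquet");
    JavaTimeSeriesRDD<String> tsRdd = JavaTimeSeriesRDDFactory.timeSeriesRDDFromObservations(
        targetIndex,   // a DateTimeIndex covering the observation timestamps
        observations,  // formerly DataFrame, now Dataset<Row>
        "timestamp", "key", "value");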
Fourth changed file: the Scala implementation (object JavaTimeSeriesRDD)

@@ -23,7 +23,7 @@ import org.apache.spark.{TaskContext, Partition}
 import org.apache.spark.api.java.{JavaSparkContext, JavaRDD, JavaPairRDD}
 import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.linalg.distributed.{RowMatrix, IndexedRowMatrix}
-import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, SQLContext}
 import org.apache.spark.util.StatCounter

 import scala.reflect.ClassTag
@@ -280,7 +280,7 @@ object JavaTimeSeriesRDD {
  */
 def javaTimeSeriesRDDFromObservations(
     targetIndex: DateTimeIndex,
-    df: DataFrame,
+    df: Dataset[Row],
     tsCol: String,
     keyCol: String,
     valueCol: String): JavaTimeSeriesRDD[String] =
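Note the asymmetry with the Java side: the Scala import can keep DataFrame alongside Dataset and Row because Spark 2.x defines DataFrame as a type alias, type DataFrame = Dataset[Row], in the org.apache.spark.sql package object. Java cannot see Scala type aliases, so only the Java-facing signature has to be rewritten as Dataset[Row].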
Fifth changed file: the Java test suite

@@ -27,7 +27,7 @@
 import org.apache.spark.mllib.linalg.distributed.IndexedRow;
 import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
 import org.apache.spark.mllib.linalg.distributed.RowMatrix;
-import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.Row$;
 import org.apache.spark.sql.SQLContext;
@@ -190,8 +190,8 @@ public void testToInstantsDataFrame() {
     JavaTimeSeriesRDD<String> tsRdd = JavaTimeSeriesRDDFactory.timeSeriesRDD(
         index, rdd);

-    DataFrame samplesDF = tsRdd.toInstantsDataFrame(sqlContext);
-    Row[] sampleRows = samplesDF.collect();
+    Dataset<Row> samplesDF = tsRdd.toInstantsDataFrame(sqlContext);
+    Row[] sampleRows = (Row[])samplesDF.collect();
     String[] columnNames = samplesDF.columns();
     String[] columnNamesTail = new String[columnNames.length - 1];
     System.arraycopy(columnNames, 1, columnNamesTail, 0, columnNamesTail.length);
@@ -205,7 +205,7 @@
         rowFrom(Timestamp.from(start.plusDays(1).toInstant()), untilBy(1, 20, 4)),
         rowFrom(Timestamp.from(start.plusDays(2).toInstant()), untilBy(2, 20, 4)),
         rowFrom(Timestamp.from(start.plusDays(3).toInstant()), untilBy(3, 20, 4))
-    }, sampleRows);
+    }, (Row[])sampleRows);

     sc.close();
   }
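The (Row[]) casts are needed because Dataset's collect() is declared in Scala as returning Array[T]; for a generic T that erases to Object in bytecode, so from Java collect() appears to return Object. A sketch of the alternative (assuming a java.util.List import): Spark 2.x's collectAsList() avoids the unchecked cast:

    // Java-friendly accessor on Dataset<Row>; no array cast required
    List<Row> sampleRowList = samplesDF.collectAsList();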
@@ -337,7 +337,7 @@ public void testTimeSeriesRDDFromObservationsDataFrame() {
     JavaTimeSeriesRDD<String> tsRdd = JavaTimeSeriesRDDFactory.timeSeriesRDD(
         index, rdd);

-    DataFrame obsDF = tsRdd.toObservationsDataFrame(sqlContext, "timestamp", "key", "value");
+    Dataset<Row> obsDF = tsRdd.toObservationsDataFrame(sqlContext, "timestamp", "key", "value");
     JavaTimeSeriesRDD<String> tsRddFromDF = JavaTimeSeriesRDDFactory
         .timeSeriesRDDFromObservations(
             index, obsDF, "timestamp", "key", "value");
@@ -348,8 +348,8 @@
         tsRddFromDF.sortByKey().collect().toArray()
     );

-    Row[] df1 = obsDF.collect();
-    Row[] df2 = tsRddFromDF.toObservationsDataFrame(
+    Row[] df1 = (Row[])obsDF.collect();
+    Row[] df2 = (Row[])tsRddFromDF.toObservationsDataFrame(
         sqlContext, "timestamp", "key", "value").collect();

     Comparator<Row> comparator = (r1, r2) -> {
