Saving to/Loading from dataframes (sryza#187)
* Added locAtOrBeforeDateTime and locAtOrAfterDateTime with associated unit tests

* Checking in compilation error fix

* Fixed bugs in hybrid locAtOrAfterDateTime and added a unit test for hybrid locAtOrBeforeDateTime/locAtOrAfterDateTime

* Added TimeSeriesRDD.saveAsParquetDataFrame and timeSeriesRDDFromParquet

* Code style fixes and added a comment to the saveAsParquetDataFrame function
souellette-faimdata authored and sryza committed Jan 10, 2017
1 parent 6bd8a3f commit cc2b18d
Showing 1 changed file with 59 additions and 1 deletion.
src/main/scala/com/cloudera/sparkts/TimeSeriesRDD.scala (59 additions, 1 deletion)
@@ -31,7 +31,7 @@ import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix,
import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql._
import org.apache.spark.util.StatCounter

import scala.collection.mutable.ArrayBuffer
@@ -475,6 +475,48 @@ class TimeSeriesRDD[K](val index: DateTimeIndex, parent: RDD[(K, Vector)])
ps.close()
}

/**
* Writes out the contents of this TimeSeriesRDD to a Parquet file at the provided path,
* with an accompanying file in the same directory containing the time index.
*
* Because TimeSeriesRDDs are structured such that each element of the RDD is a (key, column)
* pair, the Parquet file uses the same column-based representation.
*
* To instantiate a TimeSeriesRDD from a file written with this method, call
* TimeSeriesRDD.timeSeriesRDDFromParquet().
*
* @param path full HDFS path of the file to save to. The datetime index file is saved
* to the same path, but with a ".idx" extension appended.
* @param spark the current SparkSession instance.
*/
def saveAsParquetDataFrame(path: String, spark: SparkSession): Unit = {
// Write out contents
import spark.implicits._
spark.sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

// NOTE: toDF() doesn't work with generic key types, so we force the key to String
// (which should cover most other types, even complex ones, if toString() is
// implemented properly)
val df = parent.map { pair =>
  if (pair == null || pair._1 == null || pair._2 == null) {
    ("", Vectors.dense(Array.empty[Double]))
  } else {
    (pair._1.toString, Vectors.dense(pair._2.toArray))
  }
}.toDF()

df.write.mode(SaveMode.Overwrite).parquet(path)

// Write out time index
spark.sparkContext.parallelize(Array(index.toString())).saveAsTextFile(path + ".idx")
}
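
// Illustrative usage (not part of this commit): a minimal sketch assuming an
// active SparkSession named `spark`, an existing TimeSeriesRDD `tsRdd`, and a
// hypothetical output path.
//
//   tsRdd.saveAsParquetDataFrame("hdfs:///data/prices.parquet", spark)
//   // writes the series to prices.parquet and the serialized index to prices.parquet.idx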

/**
* Returns a TimeSeriesRDD rebased on top of a new index. Any timestamps that exist in the new
* index but not in the existing index will be filled in with NaNs. [[resample]] offers similar
@@ -688,6 +730,22 @@ object TimeSeriesRDD {
new TimeSeriesRDD[String](dtIndex, rdd)
}

/**
* Loads a TimeSeriesRDD from a Parquet file written by saveAsParquetDataFrame, along with
* its accompanying ".idx" date-time index file.
*/
def timeSeriesRDDFromParquet(path: String, spark: SparkSession): TimeSeriesRDD[String] = {
val df = spark.read.parquet(path)

import spark.implicits._
val parent = df.map { row =>
  (row(0).toString, Vectors.dense(row(1).asInstanceOf[Vector].toArray))
}

// Read back the time index
val textDateTime: String = spark.sparkContext.textFile(path + ".idx").collect().head
val index = DateTimeIndex.fromString(textDateTime)

new TimeSeriesRDD[String](index, parent.rdd)
}
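
// Illustrative usage (not part of this commit): reloading data saved with
// saveAsParquetDataFrame; the path and `spark` session are the same hypothetical
// values as in the save example above.
//
//   val tsRdd = TimeSeriesRDD.timeSeriesRDDFromParquet("hdfs:///data/prices.parquet", spark)
//   // keys are restored as Strings and the index is parsed via DateTimeIndex.fromString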

/**
* Creates a TimeSeriesRDD from rows in a binary format that Python can write to.
* Not a public API. For use only by the Python API.
