[SPARK-23096][SS] Migrate rate source to V2
## What changes were proposed in this pull request?

This PR migrates the micro-batch rate source to the V2 API and rewrites the UTs to suit the V2 tests.
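
For readers unfamiliar with this source, a minimal usage sketch (illustrative, not part of this diff; `spark` is an existing `SparkSession`):

```scala
// After this change, the short name "rate" resolves to RateStreamProvider.
// The source emits (timestamp TIMESTAMP, value LONG) rows at a fixed rate.
val rateDF = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .option("numPartitions", "2")
  .load()
```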

## How was this patch tested?

UTs.

Author: jerryshao <[email protected]>

Closes apache#20688 from jerryshao/SPARK-23096.
jerryshao authored and tdas committed Mar 27, 2018
1 parent 35997b5 commit c68ec4e
Showing 10 changed files with 715 additions and 844 deletions.
@@ -5,6 +5,5 @@ org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
-org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
-org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2
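
For context (not part of the diff): this file is a `java.util.ServiceLoader` registration list for `DataSourceRegister`. A hedged sketch of the lookup idea, using only the interface's `shortName()` method:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

// Illustrative only: each registered provider advertises a short name
// (e.g. "rate"); lookup scans the ServiceLoader entries for a match.
def findByShortName(name: String): Option[DataSourceRegister] =
  ServiceLoader.load(classOf[DataSourceRegister]).asScala
    .find(_.shortName().equalsIgnoreCase(name))
```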
@@ -41,7 +41,7 @@ import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
+import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
@@ -566,6 +566,7 @@ object DataSource extends Logging {
     val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
+    val rate = classOf[RateStreamProvider].getCanonicalName
 
     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -587,7 +588,8 @@
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket
+      "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
+      "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
   }
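
The hunk above extends the backward-compatibility map, which rewrites old fully qualified provider names to the new implementations before class loading, so existing jobs that use `format("org.apache.spark.sql.execution.streaming.RateSourceProvider")` keep working. A simplified sketch of the pattern (stand-in map, illustration only):

```scala
// Stand-in for the real map; unknown names fall through unchanged.
val compatMap: Map[String, String] = Map(
  "org.apache.spark.sql.execution.streaming.RateSourceProvider" ->
    "org.apache.spark.sql.execution.streaming.sources.RateStreamProvider")

def resolveProvider(userSpecified: String): String =
  compatMap.getOrElse(userSpecified, userSpecified)
```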

This file was deleted.

@@ -24,8 +24,8 @@ import org.json4s.jackson.Serialization
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
-import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
+import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
+import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 import org.apache.spark.sql.sources.v2.DataSourceOptions
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
@@ -40,8 +40,8 @@ class RateStreamContinuousReader(options: DataSourceOptions)
 
   val creationTime = System.currentTimeMillis()
 
-  val numPartitions = options.get(RateStreamSourceV2.NUM_PARTITIONS).orElse("5").toInt
-  val rowsPerSecond = options.get(RateStreamSourceV2.ROWS_PER_SECOND).orElse("6").toLong
+  val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt
+  val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong
   val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
 
   override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
@@ -57,12 +57,12 @@ class RateStreamContinuousReader(options: DataSourceOptions)
     RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
   }
 
-  override def readSchema(): StructType = RateSourceProvider.SCHEMA
+  override def readSchema(): StructType = RateStreamProvider.SCHEMA
 
   private var offset: Offset = _
 
   override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
-    this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
+    this.offset = offset.orElse(createInitialOffset(numPartitions, creationTime))
   }
 
   override def getStartOffset(): Offset = offset
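
As a side note, `deserializeOffset` above round-trips the per-partition offset map through json4s. A self-contained sketch with a stand-in case class (the real `ValueRunTimeMsPair` lives in `org.apache.spark.sql.execution.streaming`; this assumes json4s's default formats handle `Int` map keys, as the production code relies on):

```scala
import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

// Stand-in for the real case class, for illustration only.
case class ValueRunTimeMsPair(value: Long, runTimeMs: Long)

implicit val formats: DefaultFormats.type = DefaultFormats

val offsets = Map(0 -> ValueRunTimeMsPair(-5L, 1234L))
val json = Serialization.write(offsets)
// Reads back the same partition -> (value, runTimeMs) map.
assert(Serialization.read[Map[Int, ValueRunTimeMsPair]](json) == offsets)
```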
@@ -98,6 +98,19 @@ class RateStreamContinuousReader(options: DataSourceOptions)
   override def commit(end: Offset): Unit = {}
   override def stop(): Unit = {}
 
+  private def createInitialOffset(numPartitions: Int, creationTimeMs: Long) = {
+    RateStreamOffset(
+      Range(0, numPartitions).map { i =>
+        // Note that the starting offset is exclusive, so we have to decrement the starting value
+        // by the increment that will later be applied. The first row output in each
+        // partition will have a value equal to the partition index.
+        (i,
+          ValueRunTimeMsPair(
+            (i - numPartitions).toLong,
+            creationTimeMs))
+      }.toMap)
+  }
+
 }
 
 case class RateStreamContinuousDataReaderFactory(
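
The arithmetic in the added `createInitialOffset` can be checked by hand: each partition advances its value by `numPartitions` per row, and the start offset is exclusive, so seeding partition `i` with `i - numPartitions` makes its first emitted value exactly `i`. A small illustration (assumption: one increment of `numPartitions` per emitted row, as the code comment describes):

```scala
// With 5 partitions the seeds are -5, -4, -3, -2, -1; one increment of
// numPartitions each yields first values 0, 1, 2, 3, 4.
val numPartitions = 5
val seeds = (0 until numPartitions).map(i => (i - numPartitions).toLong)
val firstValues = seeds.map(_ + numPartitions)
assert(firstValues == Seq(0L, 1L, 2L, 3L, 4L))
```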
