[SPARK-33102][SQL] Use stringToSeq on SQL list typed parameters
### What changes were proposed in this pull request?
While implementing the JDBC provider disable functionality, it came up [here](apache#29964 (comment)) that `Utils.stringToSeq` must be used when handling String-list-typed SQL parameters. In this PR I've fixed the problematic parameters.

### Why are the changes needed?
`Utils.stringToSeq` must be used when handling String-list-typed SQL parameters: unlike a plain `split(",")`, it trims the whitespace around each entry and drops empty entries, so comparisons against canonical class names behave as expected.
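
For illustration, a minimal runnable sketch of the difference. The helper `stringToSeqLike` approximates what `org.apache.spark.util.Utils.stringToSeq` does (split on commas, trim, drop empty entries), and the provider class names are made-up placeholders:

```scala
object StringToSeqSketch {
  // Approximation of Utils.stringToSeq: split, trim, drop empty entries.
  def stringToSeqLike(str: String): Seq[String] =
    str.split(",").map(_.trim).filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    // Hypothetical conf value with padding and an empty entry.
    val raw = " org.example.FooProvider ,,org.example.BarProvider"

    // A plain split(",") keeps padding and empty entries, so a later
    // disabledSources.contains(cls.getCanonicalName) check can silently miss.
    println(raw.split(",").toList.map(s => s"'$s'"))
    // List(' org.example.FooProvider ', '', 'org.example.BarProvider')

    // The trimmed, filtered variant matches canonical class names reliably.
    println(stringToSeqLike(raw).toList.map(s => s"'$s'"))
    // List('org.example.FooProvider', 'org.example.BarProvider')
  }
}
```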

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing unit tests.

Closes apache#29989 from gaborgsomogyi/SPARK-33102.

Authored-by: Gabor Somogyi <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
gaborgsomogyi authored and HyukjinKwon committed Oct 10, 2020
1 parent 018811f commit 1e63dcc
Showing 2 changed files with 5 additions and 3 deletions.
MicroBatchExecution.scala:

```diff
@@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relat
 import org.apache.spark.sql.execution.streaming.sources.WriteToMicroBatchDataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.{OutputMode, Trigger}
-import org.apache.spark.util.Clock
+import org.apache.spark.util.{Clock, Utils}
 
 class MicroBatchExecution(
     sparkSession: SparkSession,
@@ -76,7 +76,7 @@ class MicroBatchExecution(
     // transformation is responsible for replacing attributes with their final values.
 
     val disabledSources =
-      sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
+      Utils.stringToSeq(sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders)
 
     import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
     val _logicalPlan = analyzedPlan.transform {
```
DataStreamWriter.scala:

```diff
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -366,7 +367,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       startQuery(sink, extraOptions)
     } else {
       val cls = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
-      val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
+      val disabledSources =
+        Utils.stringToSeq(df.sparkSession.sqlContext.conf.disabledV2StreamingWriters)
       val useV1Source = disabledSources.contains(cls.getCanonicalName) ||
         // file source v2 does not support streaming yet.
         classOf[FileDataSourceV2].isAssignableFrom(cls)
```
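
For context, a hedged sketch of how the affected confs are typically set. It assumes the usual SQLConf keys backing `disabledV2StreamingWriters` and `disabledV2StreamingMicroBatchReaders`, a `spark` SparkSession in scope (e.g. in spark-shell), and a placeholder provider class:

```scala
// Assumed conf keys (from SQLConf); the provider class names are placeholders.
spark.conf.set(
  "spark.sql.streaming.disabledV2Writers",
  "org.example.MyProvider , org.example.OtherProvider")
spark.conf.set(
  "spark.sql.streaming.disabledV2MicroBatchReaders",
  " org.example.MyProvider ")
// With Utils.stringToSeq, the padded entries above are trimmed before the
// contains(cls.getCanonicalName) check, so the disable now takes effect even
// when the list contains stray whitespace.
```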
