Skip to content

Commit

Permalink
[DataSource][add] add default behavior for stream datasource.
Browse files Browse the repository at this point in the history
people can easy add a new stream datasource by implement `org.apache.spark.sql.execution.streaming.Source`,
 and than add option `stramn.source` in mlsql script.

here is an example:
```sql
set streamName = "test";
load fchen.`` options stream.source = "true"
and provider = "xxx.xxx.SourceProvider"
as table1;
```
  • Loading branch information
cfmcgrady committed Jan 11, 2019
1 parent 03e6e15 commit 856126a
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class LoadAdaptor(scriptSQLExecListener: ScriptSQLExecListener) extends DslAdapt
}


if (format.startsWith("kafka") || format.startsWith("mockStream")) {
if (format.startsWith("kafka") || format.startsWith("mockStream") || option.contains("stream.source")) {
scriptSQLExecListener.addEnv("stream", "true")
new StreamLoadAdaptor(scriptSQLExecListener, option, path, tableName, format).parse
} else {
Expand Down Expand Up @@ -185,6 +185,8 @@ class StreamLoadAdaptor(scriptSQLExecListener: ScriptSQLExecListener,
val format = "org.apache.spark.sql.execution.streaming.mock.MockStreamSourceProvider"
table = reader.format(format).options(option + ("path" -> cleanStr(path))).load()
case _ =>
val provider = option.getOrElse("provider", format)
table = reader.format(provider).options(option).load()
}
table = withWaterMark(table, option)

Expand Down

0 comments on commit 856126a

Please sign in to comment.