Merge pull request #362 from InterestingLab/rickyhuo.fixbug.streaming
Fix a bug in the streaming pipeline when registering a table view
garyelephant authored Aug 12, 2019
2 parents 167d639 + 3f79832 commit 433560a
Showing 4 changed files with 65 additions and 6 deletions.
1 change: 1 addition & 0 deletions config/structuredstreaming.conf.template
@@ -14,6 +14,7 @@ input {
   kafkaStream {
     consumer.bootstrap.servers = "127.0.0.1:9092"
     topics = "waterdrop_input"
+    consumer.group.id = "waterdrop_input_group_id"
     result_table_name = "waterdrop_message"
   }
 }
@@ -157,7 +157,7 @@ object Waterdrop extends Logging {
     for (f <- filters) {
       if (ds.take(1).length > 0) {
         ds = filterProcess(sparkSession, f, ds)
-        registerFilterTempView(f, ds)
+        registerStreamingFilterTempView(f, ds)
       }
     }

@@ -293,7 +293,7 @@
     }
   }

-  private[waterdrop] def registerInputTempView(input: Plugin, ds: Dataset[Row]): Unit = {
+  private[waterdrop] def registerInputTempView(input: BaseStaticInput, ds: Dataset[Row]): Unit = {
     val config = input.getConfig()
     config.hasPath("table_name") || config.hasPath("result_table_name") match {
       case true => {
@@ -316,6 +316,29 @@
       }
     }

+  private[waterdrop] def registerInputTempView(input: BaseStreamingInput[Any], ds: Dataset[Row]): Unit = {
+    val config = input.getConfig()
+    config.hasPath("table_name") || config.hasPath("result_table_name") match {
+      case true => {
+        val tableName = config.hasPath("table_name") match {
+          case true => {
+            @deprecated
+            val oldTableName = config.getString("table_name")
+            oldTableName
+          }
+          case false => config.getString("result_table_name")
+        }
+        registerStreamingTempView(tableName, ds)
+      }
+
+      case false => {
+        throw new ConfigRuntimeException(
+          "Plugin[" + input.name + "] must be registered as dataset/table, please set \"result_table_name\" config")
+
+      }
+    }
+  }
+
   private[waterdrop] def registerFilterTempView(plugin: Plugin, ds: Dataset[Row]): Unit = {
     val config = plugin.getConfig()
     if (config.hasPath("result_table_name")) {
@@ -324,6 +347,14 @@
     }
   }

+  private[waterdrop] def registerStreamingFilterTempView(plugin: BaseFilter, ds: Dataset[Row]): Unit = {
+    val config = plugin.getConfig()
+    if (config.hasPath("result_table_name")) {
+      val tableName = config.getString("result_table_name")
+      registerStreamingTempView(tableName, ds)
+    }
+  }
+
   private[waterdrop] def registerTempView(tableName: String, ds: Dataset[Row]): Unit = {
     viewTableMap.contains(tableName) match {
       case true =>
@@ -337,6 +368,10 @@
     }
   }

+  private[waterdrop] def registerStreamingTempView(tableName: String, ds: Dataset[Row]): Unit = {
+    ds.createOrReplaceTempView(tableName)
+  }
+
   private[waterdrop] def deployModeCheck(): Unit = {
     Common.getDeployMode match {
       case Some(m) => {
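
The behavioural core of the fix in this file is the split between two registration paths: the pre-existing registerTempView (its body is collapsed above) guards against duplicate table names via viewTableMap, while the new registerStreamingTempView registers the view unconditionally. A minimal sketch of the contrast follows; the collapsed batch-side details (the type of viewTableMap and the error raised on a duplicate name) are filled in as assumptions rather than taken from the repository.

import scala.collection.mutable

import org.apache.spark.sql.{Dataset, Row}

object TempViewRegistrationSketch {

  // Assumption: the batch pipeline tracks names it has already registered;
  // a mutable Set stands in for the real viewTableMap, whose definition is
  // collapsed in this diff.
  private val viewTableMap: mutable.Set[String] = mutable.Set.empty[String]

  // Batch path: refuse to register the same table name twice.
  def registerTempView(tableName: String, ds: Dataset[Row]): Unit = {
    if (viewTableMap.contains(tableName)) {
      // assumed error type and message; the diff only shows the duplicate check
      throw new RuntimeException("table name [" + tableName + "] is already registered")
    } else {
      ds.createOrReplaceTempView(tableName)
      viewTableMap += tableName
    }
  }

  // Streaming path (added by this commit): register the view unconditionally.
  def registerStreamingTempView(tableName: String, ds: Dataset[Row]): Unit = {
    ds.createOrReplaceTempView(tableName)
  }
}

Both new streaming helpers above (registerStreamingFilterTempView and the BaseStreamingInput overload of registerInputTempView) funnel into this unconditional path.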
@@ -95,7 +95,7 @@ object WaterdropStructuredStreaming extends Logging {

     val datasetList = structuredStreamingInputs.map(p => {
       val ds = p.getDataset(sparkSession)
-      Waterdrop.registerInputTempView(p, ds)
+      registerInputTempView(p, ds)
       ds
     })

@@ -107,7 +107,7 @@
     var ds: Dataset[Row] = datasetList.get(0)
     for (f <- filters) {
       ds = Waterdrop.filterProcess(sparkSession, f, ds)
-      Waterdrop.registerFilterTempView(f, ds)
+      Waterdrop.registerStreamingFilterTempView(f, ds)
     }

     var streamingQueryList = List[StreamingQuery]()
@@ -122,6 +122,29 @@
     }
   }

+  private def registerInputTempView(input: BaseStructuredStreamingInput, ds: Dataset[Row]): Unit = {
+    val config = input.getConfig()
+    config.hasPath("table_name") || config.hasPath("result_table_name") match {
+      case true => {
+        val tableName = config.hasPath("table_name") match {
+          case true => {
+            @deprecated
+            val oldTableName = config.getString("table_name")
+            oldTableName
+          }
+          case false => config.getString("result_table_name")
+        }
+        Waterdrop.registerStreamingTempView(tableName, ds)
+      }
+
+      case false => {
+        throw new ConfigRuntimeException(
+          "Plugin[" + input.name + "] must be registered as dataset/table, please set \"result_table_name\" config")
+
+      }
+    }
+  }
+
   private def structuredStreamingOutputProcess(
       sparkSession: SparkSession,
       output: BaseStructuredStreamingOutputIntra,
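
The private registerInputTempView added here mirrors the table-name resolution added to Waterdrop.scala above: prefer the deprecated table_name when present, fall back to result_table_name, and fail if neither is set. Purely as an illustration (not part of the commit), that shared resolution could be written once; the Config type is assumed to be Typesafe Config, which the hasPath/getString calls in the diff suggest, and the exception is a stand-in for waterdrop's ConfigRuntimeException.

import com.typesafe.config.Config

object TableNameResolutionSketch {

  // Resolution order used by both overloads in the diff: deprecated
  // "table_name" first, then "result_table_name", otherwise fail.
  def resolveTableName(config: Config, pluginName: String): String = {
    if (config.hasPath("table_name")) {
      // "table_name" is deprecated in favour of "result_table_name"
      config.getString("table_name")
    } else if (config.hasPath("result_table_name")) {
      config.getString("result_table_name")
    } else {
      // stand-in for waterdrop's ConfigRuntimeException so the sketch is self-contained
      throw new IllegalArgumentException(
        "Plugin[" + pluginName + "] must be registered as dataset/table, please set \"result_table_name\" config")
    }
  }
}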
@@ -25,9 +25,9 @@ class Sql extends BaseFilter {
   }

   override def checkConfig(): (Boolean, String) = {
-    conf.hasPath("sql") match {
+    conf.hasPath("table_name") match {
       case true => {
-        if (conf.hasPath("sql")) {
+        if (conf.hasPath("table_name")) {
           logWarning("parameter [table_name] is deprecated since 1.4")
         }
         (true, "")
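
The checkConfig change only swaps which key gates the deprecation warning; the (Boolean, String) contract is unchanged, and the collapsed case false branch is not shown here. For context, a hedged sketch of how such a result is typically consumed when plugins are validated at startup; the trait and validation loop are illustrative assumptions, not code from this repository.

object PluginConfigCheckSketch {

  // Minimal stand-in for the plugin interface: just the two members this
  // sketch needs.
  trait CheckablePlugin {
    def name: String
    def checkConfig(): (Boolean, String)
  }

  // Fail fast with the plugin's own message when a config check does not pass.
  def validate(plugins: Seq[CheckablePlugin]): Unit = {
    plugins.foreach { p =>
      val (ok, msg) = p.checkConfig()
      require(ok, "Plugin[" + p.name + "] config check failed: " + msg)
    }
  }
}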