Skip to content

Commit

Permalink
Merge pull request byzer-org#191 from allwefantasy/mlsql
Browse files Browse the repository at this point in the history
insert/create 语法支持变量替换
  • Loading branch information
allwefantasy authored Apr 28, 2018
2 parents 0c56536 + 3e5946f commit 9ee05fd
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package streaming.dsl
import org.antlr.v4.runtime.misc.Interval
import streaming.dsl.parser.DSLSQLLexer
import streaming.dsl.parser.DSLSQLParser.SqlContext
import streaming.dsl.template.TemplateMerge

/**
* Created by allwefantasy on 27/8/2017.
Expand All @@ -14,6 +15,7 @@ class CreateAdaptor(scriptSQLExecListener: ScriptSQLExecListener) extends DslAda
val stop = ctx.stop.getStopIndex()
val interval = new Interval(start, stop)
val originalText = input.getText(interval)
scriptSQLExecListener.sparkSession.sql(originalText).count()
val sql = TemplateMerge.merge(originalText, scriptSQLExecListener.env().toMap)
scriptSQLExecListener.sparkSession.sql(sql).count()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package streaming.dsl
import org.antlr.v4.runtime.misc.Interval
import streaming.dsl.parser.DSLSQLLexer
import streaming.dsl.parser.DSLSQLParser.SqlContext
import streaming.dsl.template.TemplateMerge

/**
* Created by allwefantasy on 27/8/2017.
Expand All @@ -14,6 +15,7 @@ class InsertAdaptor(scriptSQLExecListener: ScriptSQLExecListener) extends DslAda
val stop = ctx.stop.getStopIndex()
val interval = new Interval(start, stop)
val originalText = input.getText(interval)
scriptSQLExecListener.sparkSession.sql(originalText).count()
val sql = TemplateMerge.merge(originalText, scriptSQLExecListener.env().toMap)
scriptSQLExecListener.sparkSession.sql(sql).count()
}
}
19 changes: 19 additions & 0 deletions streamingpro-spark-2.0/src/test/java/streaming/core/DslSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,23 @@ class DslSpec extends BasicSparkOperation {
}
}

// "insert with variable" should "work fine" in {
//
// withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { runtime: SparkRuntime =>
// //执行sql
// implicit val spark = runtime.sparkSession
//
// var sq = createSSEL
// sq = createSSEL
// ScriptSQLExec.parse("select \"a\" as a,\"b\" as b\n,\"c\" as c\nas tod_boss_dashboard_sheet_1;", sq)
//
// sq = createSSEL
// ScriptSQLExec.parse("set hive.exec.dynamic.partition.mode=nonstric options type = \"conf\" ;" +
// "set HADOOP_DATE_YESTERDAY=`2017-01-02` ;" +
// "INSERT OVERWRITE TABLE default.abc partition (hp_stat_date = '${HADOOP_DATE_YESTERDAY}') " +
// "select * from tod_boss_dashboard_sheet_1;", sq)
//
// }
// }

}

0 comments on commit 9ee05fd

Please sign in to comment.