Skip to content

Commit

Permalink
Merge pull request byzer-org#520 from allwefantasy/mlsql
Browse files Browse the repository at this point in the history
dsl grammar test
  • Loading branch information
allwefantasy authored Sep 11, 2018
2 parents 43e689d + 0867a2d commit a11e646
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 1 deletion.
95 changes: 95 additions & 0 deletions streamingpro-mlsql/src/main/java/streaming/dsl/ScriptSQLExec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,101 @@ object ScriptSQLExec extends Logging with WowLog {
}
}

/**
 * A [[DSLSQLListener]] used purely for grammar (syntax) validation of an MLSQL script.
 *
 * Every callback is deliberately a no-op: when this listener is handed to
 * `ScriptSQLExec.parse`, syntactically invalid input is rejected by the ANTLR
 * parser itself before any listener logic would run, so no per-rule work is
 * needed here. The wrapped `scriptSQLExecListener` is currently unused but kept
 * to preserve the constructor signature for existing callers.
 */
class GrammarProcessListener(scriptSQLExecListener: ScriptSQLExecListener) extends DSLSQLListener {
  // -- top-level statement / sql rules ---------------------------------------
  override def enterStatement(ctx: StatementContext): Unit = {}
  override def exitStatement(ctx: StatementContext): Unit = {}
  override def enterSql(ctx: SqlContext): Unit = {}
  override def exitSql(ctx: SqlContext): Unit = {}
  override def enterEnder(ctx: EnderContext): Unit = {}
  override def exitEnder(ctx: EnderContext): Unit = {}

  // -- load/save clause components -------------------------------------------
  override def enterFormat(ctx: FormatContext): Unit = {}
  override def exitFormat(ctx: FormatContext): Unit = {}
  override def enterPath(ctx: PathContext): Unit = {}
  override def exitPath(ctx: PathContext): Unit = {}
  override def enterTableName(ctx: TableNameContext): Unit = {}
  override def exitTableName(ctx: TableNameContext): Unit = {}
  override def enterCol(ctx: ColContext): Unit = {}
  override def exitCol(ctx: ColContext): Unit = {}
  override def enterDb(ctx: DbContext): Unit = {}
  override def exitDb(ctx: DbContext): Unit = {}

  // -- identifiers and names --------------------------------------------------
  override def enterQualifiedName(ctx: QualifiedNameContext): Unit = {}
  override def exitQualifiedName(ctx: QualifiedNameContext): Unit = {}
  override def enterIdentifier(ctx: IdentifierContext): Unit = {}
  override def exitIdentifier(ctx: IdentifierContext): Unit = {}
  override def enterStrictIdentifier(ctx: StrictIdentifierContext): Unit = {}
  override def exitStrictIdentifier(ctx: StrictIdentifierContext): Unit = {}
  override def enterQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = {}
  override def exitQuotedIdentifier(ctx: QuotedIdentifierContext): Unit = {}
  override def enterFunctionName(ctx: FunctionNameContext): Unit = {}
  override def exitFunctionName(ctx: FunctionNameContext): Unit = {}

  // -- expressions ------------------------------------------------------------
  override def enterExpression(ctx: ExpressionContext): Unit = {}
  override def exitExpression(ctx: ExpressionContext): Unit = {}
  override def enterBooleanExpression(ctx: BooleanExpressionContext): Unit = {}
  override def exitBooleanExpression(ctx: BooleanExpressionContext): Unit = {}

  // -- save modes -------------------------------------------------------------
  override def enterOverwrite(ctx: OverwriteContext): Unit = {}
  override def exitOverwrite(ctx: OverwriteContext): Unit = {}
  override def enterAppend(ctx: AppendContext): Unit = {}
  override def exitAppend(ctx: AppendContext): Unit = {}
  override def enterErrorIfExists(ctx: ErrorIfExistsContext): Unit = {}
  override def exitErrorIfExists(ctx: ErrorIfExistsContext): Unit = {}
  override def enterIgnore(ctx: IgnoreContext): Unit = {}
  override def exitIgnore(ctx: IgnoreContext): Unit = {}

  // -- set statement key/value ------------------------------------------------
  override def enterSetKey(ctx: SetKeyContext): Unit = {}
  override def exitSetKey(ctx: SetKeyContext): Unit = {}
  override def enterSetValue(ctx: SetValueContext): Unit = {}
  override def exitSetValue(ctx: SetValueContext): Unit = {}

  // -- generic ANTLR walker hooks ---------------------------------------------
  override def enterEveryRule(ctx: ParserRuleContext): Unit = {}
  override def exitEveryRule(ctx: ParserRuleContext): Unit = {}
  override def visitTerminal(node: TerminalNode): Unit = {}
  override def visitErrorNode(node: ErrorNode): Unit = {}
}

class PreProcessListener(scriptSQLExecListener: ScriptSQLExecListener) extends DSLSQLListener {
private val _includes = new scala.collection.mutable.HashMap[String, String]

Expand Down
20 changes: 19 additions & 1 deletion streamingpro-mlsql/src/test/scala/streaming/core/DslSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import net.sf.json.JSONObject
import org.apache.commons.io.FileUtils
import org.apache.spark.streaming.BasicSparkOperation
import streaming.core.strategy.platform.SparkRuntime
import streaming.dsl.ScriptSQLExec
import streaming.dsl.{GrammarProcessListener, MLSQLExecuteContext, ScriptSQLExec, ScriptSQLExecListener}
import streaming.dsl.template.TemplateMerge

/**
Expand Down Expand Up @@ -812,6 +812,24 @@ class DslSpec extends BasicSparkOperation with SpecFunctions with BasicMLSQLConf
}
}

"save-partitionby" should "work fine" in {

withBatchContext(setupBatchContext(batchParams, "classpath:///test/empty.json")) { runtime: SparkRuntime =>
//执行sql
implicit val spark = runtime.sparkSession

val ssel = createSSEL
val sq = new GrammarProcessListener(ssel)
withClue("MLSQL Parser error : mismatched input 'save1' expecting {<EOF>, 'load', 'LOAD', 'save', 'SAVE', 'select', 'SELECT', 'insert', 'INSERT', 'create', 'CREATE', 'drop', 'DROP', 'refresh', 'REFRESH', 'set', 'SET', 'connect', 'CONNECT', 'train', 'TRAIN', 'run', 'RUN', 'register', 'REGISTER', 'unRegister', 'UNREGISTER', 'include', 'INCLUDE', SIMPLE_COMMENT}") {
assertThrows[RuntimeException] {
ScriptSQLExec.parse("save1 append skone_task_log\nas parquet.`${data_monitor_skone_task_log_2_parquet_data_path}`\noptions mode = \"Append\"\nand duration = \"10\"\nand checkpointLocation = \"${data_monitor_skone_task_log_2_parquet_checkpoint_path}\"\npartitionBy hp_stat_date;", sq)
}
}
ScriptSQLExec.parse("save append skone_task_log\nas parquet.`${data_monitor_skone_task_log_2_parquet_data_path}`\noptions mode = \"Append\"\nand duration = \"10\"\nand checkpointLocation = \"${data_monitor_skone_task_log_2_parquet_checkpoint_path}\"\npartitionBy hp_stat_date;", sq)

}
}

}


Expand Down

0 comments on commit a11e646

Please sign in to comment.