Skip to content

Commit

Permalink
Fix mo stream bugs (matrixorigin#13117)
Browse files Browse the repository at this point in the history
1. Fix the data conversion bugs
2. Fix drop source bug
3. Fix options bugs when creating dt with options

Approved by: @iamlinjunhong, @daviszhen, @ouyuanning, @nnsgmsone, @sukki37
  • Loading branch information
gavinyue authored Dec 4, 2023
1 parent fd9cf7a commit 87b1ce9
Show file tree
Hide file tree
Showing 13 changed files with 555 additions and 116 deletions.
10 changes: 9 additions & 1 deletion pkg/frontend/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,17 @@ func (mce *MysqlCmdExecutor) handleCreateDynamicTable(ctx context.Context, st *t
return moerr.NewNoSuchTable(ctx, dbName, tableName)
}
options := make(map[string]string)
for _, option := range st.DTOptions {
switch opt := option.(type) {
case *tree.CreateSourceWithOption:
key := string(opt.Key)
val := opt.Val.(*tree.NumVal).OrigString()
options[key] = val
}
}

ses := mce.GetSession()

//get query optimizer and execute Optimize
generatedPlan, err := buildPlan(ctx, ses, ses.GetTxnCompileCtx(), st.AsSource)
if err != nil {
return err
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/compile/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,7 +1427,10 @@ func (s *Scope) DropTable(c *Compile) error {
dbName := qry.GetDatabase()
tblName := qry.GetTable()
isView := qry.GetIsView()

var isSource = false
if qry.TableDef != nil {
isSource = qry.TableDef.TableType == catalog.SystemSourceRel
}
var dbSource engine.Database
var rel engine.Relation
var err error
Expand Down Expand Up @@ -1462,7 +1465,7 @@ func (s *Scope) DropTable(c *Compile) error {
isTemp = true
}

if !isTemp && !isView && c.proc.TxnOperator.Txn().IsPessimistic() {
if !isTemp && !isView && !isSource && c.proc.TxnOperator.Txn().IsPessimistic() {
var err error
if e := lockMoTable(c, dbName, tblName, lock.LockMode_Exclusive); e != nil {
if !moerr.IsMoErrCode(e, moerr.ErrTxnNeedRetry) &&
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/parsers/dialect/mysql/mysql_sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/sql/parsers/dialect/mysql/mysql_sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -6442,7 +6442,7 @@ create_table_stmt:
IfNotExists: $4,
Table: *$5,
AsSource: $7,
Options: $8,
DTOptions: $8,
}
}
load_param_opt_2:
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/parsers/dialect/mysql/mysql_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,9 @@ var (
}, {
input: "create dynamic table t as select a from t1",
output: "create dynamic table t as select a from t1",
}, {
input: "create dynamic table t as select a from t1 with (\"type\"='kafka')",
output: "create dynamic table t as select a from t1 with (type = kafka)",
}, {
input: "create external table t (a int) infile 'data.txt'",
output: "create external table t (a int) infile 'data.txt'",
Expand Down
13 changes: 12 additions & 1 deletion pkg/sql/parsers/tree/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ type CreateTable struct {
Param *ExternParam
AsSource *Select
IsDynamicTable bool
DTOptions []TableOption
}

func (node *CreateTable) Format(ctx *FmtCtx) {
Expand Down Expand Up @@ -185,6 +186,16 @@ func (node *CreateTable) Format(ctx *FmtCtx) {
if node.IsDynamicTable {
ctx.WriteString(" as ")
node.AsSource.Format(ctx)

if node.DTOptions != nil {
prefix := " with ("
for _, t := range node.DTOptions {
ctx.WriteString(prefix)
t.Format(ctx)
prefix = ", "
}
ctx.WriteByte(')')
}
} else {

ctx.WriteString(" (")
Expand All @@ -198,7 +209,7 @@ func (node *CreateTable) Format(ctx *FmtCtx) {
ctx.WriteByte(')')
}

if node.Options != nil {
if node.Options != nil && !node.IsDynamicTable {
prefix := " "
for _, t := range node.Options {
ctx.WriteString(prefix)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/plan/explain/explain_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (ndesc *NodeDescribeImpl) GetNodeBasicInfo(ctx context.Context, options *Ex
case plan.Node_EXTERNAL_SCAN:
pname = ExternalScan
case plan.Node_SOURCE_SCAN:
pname = "Stream Scan"
pname = "Source Scan"
case plan.Node_MATERIAL_SCAN:
pname = "Material Scan"
case plan.Node_PROJECT:
Expand Down
Loading

0 comments on commit 87b1ce9

Please sign in to comment.