Skip to content

Commit

Permalink
support UPDATE and DELETE in new plan (pingcap#1582)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored Aug 19, 2016
1 parent 6b77129 commit 6e87814
Show file tree
Hide file tree
Showing 18 changed files with 508 additions and 34 deletions.
4 changes: 4 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildDeallocate(v)
case *plan.Delete:
return b.buildDelete(v)
case *plan.NewDelete:
return b.buildNewDelete(v)
case *plan.Distinct:
return b.buildDistinct(v)
case *plan.Execute:
Expand Down Expand Up @@ -112,6 +114,8 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildNewUnion(v)
case *plan.Update:
return b.buildUpdate(v)
case *plan.NewUpdate:
return b.buildNewUpdate(v)
case *plan.PhysicalHashJoin:
return b.buildJoin(v)
case *plan.PhysicalHashSemiJoin:
Expand Down
10 changes: 5 additions & 5 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ type Row struct {
RowKeys []*RowKeyEntry
}

// RowKeyEntry is designed for Delete statement in multi-table mode,
// RowKeyEntry is designed for Update/Delete statement in multi-table mode,
// we should know which table this row comes from.
type RowKeyEntry struct {
// The table which this row come from.
Expand Down Expand Up @@ -817,7 +817,7 @@ type FilterExec struct {

// Schema implements Executor Schema interface.
func (e *FilterExec) Schema() expression.Schema {
return nil
return e.Src.Schema()
}

// Fields implements Executor Fields interface.
Expand Down Expand Up @@ -965,7 +965,7 @@ type SortExec struct {

// Schema implements Executor Schema interface.
func (e *SortExec) Schema() expression.Schema {
return nil
return e.Src.Schema()
}

// Fields implements Executor Fields interface.
Expand Down Expand Up @@ -1095,7 +1095,7 @@ type AggregateExec struct {

// Schema implements Executor Schema interface.
func (e *AggregateExec) Schema() expression.Schema {
return nil
return e.Src.Schema()
}

// Fields implements Executor Fields interface.
Expand Down Expand Up @@ -1327,7 +1327,7 @@ type ReverseExec struct {

// Schema implements Executor Schema interface.
func (e *ReverseExec) Schema() expression.Schema {
return nil
return e.Src.Schema()
}

// Fields implements Executor Fields interface.
Expand Down
70 changes: 47 additions & 23 deletions executor/executor_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (e *UpdateExec) Next() (*Row, error) {
e.fetched = true
}

columns, err := getUpdateColumns(e.OrderedList)
assignFlag, err := getUpdateColumns(e.OrderedList)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -92,7 +92,7 @@ func (e *UpdateExec) Next() (*Row, error) {
continue
}
// Update row
err1 := updateRecord(e.ctx, handle, oldData, newTableData, columns, tbl, offset, false)
err1 := updateRecord(e.ctx, handle, oldData, newTableData, assignFlag, tbl, offset, false)
if err1 != nil {
return nil, errors.Trace(err1)
}
Expand All @@ -102,12 +102,16 @@ func (e *UpdateExec) Next() (*Row, error) {
return &Row{}, nil
}

func getUpdateColumns(assignList []*ast.Assignment) (map[int]*ast.Assignment, error) {
m := make(map[int]*ast.Assignment, len(assignList))
func getUpdateColumns(assignList []*ast.Assignment) ([]bool, error) {
assignFlag := make([]bool, len(assignList))
for i, v := range assignList {
m[i] = v
if v != nil {
assignFlag[i] = true
} else {
assignFlag[i] = false
}
}
return m, nil
return assignFlag, nil
}

func (e *UpdateExec) fetchRows() error {
Expand Down Expand Up @@ -156,14 +160,13 @@ func (e *UpdateExec) getTableOffset(t table.Table) int {
return 0
}

func updateRecord(ctx context.Context, h int64, oldData, newData []types.Datum, updateColumns map[int]*ast.Assignment, t table.Table, offset int, onDuplicateUpdate bool) error {
func updateRecord(ctx context.Context, h int64, oldData, newData []types.Datum, assignFlag []bool, t table.Table, offset int, onDuplicateUpdate bool) error {
cols := t.Cols()
touched := make(map[int]bool, len(cols))

assignExists := false
var newHandle types.Datum
for i, asgn := range updateColumns {
if asgn == nil {
for i, hasSetExpr := range assignFlag {
if !hasSetExpr {
continue
}
if i < offset || i >= offset+len(cols) {
Expand Down Expand Up @@ -302,21 +305,34 @@ func (e *DeleteExec) Next() (*Row, error) {
tblNames := make(map[string]string)
if e.IsMultiTable {
// Delete from multiple tables should consider table ident list.
fs := e.SelectExec.Fields()
for _, f := range fs {
if len(f.TableAsName.L) > 0 {
tblNames[f.TableAsName.L] = f.TableName.Name.L
} else {
tblNames[f.TableName.Name.L] = f.TableName.Name.L
if !plan.UseNewPlanner {
fs := e.SelectExec.Fields()
for _, f := range fs {
if len(f.TableAsName.L) > 0 {
tblNames[f.TableAsName.L] = f.TableName.Name.L
} else {
tblNames[f.TableName.Name.L] = f.TableName.Name.L
}
}
} else {
schema := e.SelectExec.Schema()
for _, s := range schema {
tblNames[s.TblName.L] = s.TblName.L
}
}
for _, t := range e.Tables {
// Consider DBName.
_, ok := tblNames[t.Name.L]
if !ok {
return nil, errors.Errorf("Unknown table '%s' in MULTI DELETE", t.Name.O)
if len(tblNames) != 0 {
for _, t := range e.Tables {
// Consider DBName.
_, ok := tblNames[t.Name.L]
if !ok {
return nil, errors.Errorf("Unknown table '%s' in MULTI DELETE", t.Name.O)
}
tblMap[t.TableInfo.ID] = append(tblMap[t.TableInfo.ID], t.Name.L)
}
} else { // all columns have been pruned
for _, t := range e.Tables {
tblMap[t.TableInfo.ID] = append(tblMap[t.TableInfo.ID], t.Name.L)
}
tblMap[t.TableInfo.ID] = append(tblMap[t.TableInfo.ID], t.Name.L)
}
}

Expand Down Expand Up @@ -783,7 +799,15 @@ func (e *InsertExec) onDuplicateUpdate(row []types.Datum, h int64, cols map[int]
}
newData[i] = val
}
if err = updateRecord(e.ctx, h, data, newData, cols, e.Table, 0, true); err != nil {
assignFlag := make([]bool, len(e.Table.Cols()))
for i, asgn := range cols {
if asgn != nil {
assignFlag[i] = true
} else {
assignFlag[i] = false
}
}
if err = updateRecord(e.ctx, h, data, newData, assignFlag, e.Table, 0, true); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
21 changes: 20 additions & 1 deletion executor/new_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func (b *executorBuilder) toPBExpr(conditions []expression.Expression, tbl *mode

func (b *executorBuilder) buildSelection(v *plan.Selection) Executor {
child := v.GetChildByIndex(0)
oldConditions := v.Conditions
var src Executor
switch x := child.(type) {
case *plan.PhysicalTableScan:
Expand All @@ -220,15 +221,18 @@ func (b *executorBuilder) buildSelection(v *plan.Selection) Executor {
}

if len(v.Conditions) == 0 {
v.Conditions = oldConditions
return src
}

return &SelectionExec{
exec := &SelectionExec{
Src: src,
Condition: expression.ComposeCNFCondition(v.Conditions),
schema: v.GetSchema(),
ctx: b.ctx,
}
copy(v.Conditions, oldConditions)
return exec
}

func (b *executorBuilder) buildProjection(v *plan.Projection) Executor {
Expand Down Expand Up @@ -411,8 +415,23 @@ func (b *executorBuilder) buildNewUnion(v *plan.NewUnion) Executor {
return e
}

func (b *executorBuilder) buildNewUpdate(v *plan.NewUpdate) Executor {
selExec := b.build(v.SelectPlan)
return &NewUpdateExec{ctx: b.ctx, SelectExec: selExec, OrderedList: v.OrderedList}
}

func (b *executorBuilder) buildDummyScan(v *plan.PhysicalDummyScan) Executor {
return &DummyScanExec{
schema: v.GetSchema(),
}
}

func (b *executorBuilder) buildNewDelete(v *plan.NewDelete) Executor {
selExec := b.build(v.SelectPlan)
return &DeleteExec{
ctx: b.ctx,
SelectExec: selExec,
Tables: v.Tables,
IsMultiTable: v.IsMultiTable,
}
}
3 changes: 1 addition & 2 deletions executor/new_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,7 @@ func (e *HashJoinExec) constructMatchedRows(bigRow *Row) (matchedRows []*Row, er

func (e *HashJoinExec) fillNullRow(bigRow *Row) (returnRow *Row) {
smallRow := &Row{
RowKeys: make([]*RowKeyEntry, len(e.smallExec.Schema())),
Data: make([]types.Datum, len(e.smallExec.Schema())),
Data: make([]types.Datum, len(e.smallExec.Schema())),
}

for _, data := range smallRow.Data {
Expand Down
Loading

0 comments on commit 6e87814

Please sign in to comment.