Skip to content

Commit

Permalink
executor: optimize delete statement. (pingcap#2421)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jan 10, 2017
1 parent 468af82 commit 7c66138
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 37 deletions.
82 changes: 52 additions & 30 deletions executor/executor_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,53 +166,58 @@ func (e *DeleteExec) Next() (*Row, error) {
defer func() {
e.finished = true
}()
if e.IsMultiTable && len(e.Tables) == 0 {
return &Row{}, nil

if e.IsMultiTable {
return nil, e.deleteMultiTables()
}
return nil, e.deleteSingleTable()
}

func (e *DeleteExec) deleteMultiTables() error {
if len(e.Tables) == 0 {
return nil
}

// Table ID may not be unique for deleting multiple tables, for statements like
// `delete from t as t1, t as t2`, the same table has two alias, we have to identify a table
// by its alias instead of ID, so the table map value is an array which contains table aliases.
tblMap := make(map[int64][]string, len(e.Tables))
// Get table alias map.
if e.IsMultiTable {
// Delete from multiple tables should consider table ident list.
for _, t := range e.Tables {
tblMap[t.TableInfo.ID] = append(tblMap[t.TableInfo.ID], t.Name.L)
}
for _, t := range e.Tables {
tblMap[t.TableInfo.ID] = append(tblMap[t.TableInfo.ID], t.Name.L)
}

// Map for unique (Table, handle) pair.
rowKeyMap := make(map[table.Table]map[int64]struct{})
// Map for unique (Table, Row) pair.
tblRowMap := make(map[table.Table]map[int64][]types.Datum)
for {
row, err := e.SelectExec.Next()
joinedRow, err := e.SelectExec.Next()
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
if row == nil {
if joinedRow == nil {
break
}

for _, entry := range row.RowKeys {
if e.IsMultiTable && !isMatchTableName(entry, tblMap) {
for _, entry := range joinedRow.RowKeys {
if !isMatchTableName(entry, tblMap) {
continue
}
if rowKeyMap[entry.Tbl] == nil {
rowKeyMap[entry.Tbl] = make(map[int64]struct{})
if tblRowMap[entry.Tbl] == nil {
tblRowMap[entry.Tbl] = make(map[int64][]types.Datum)
}
rowKeyMap[entry.Tbl][entry.Handle] = struct{}{}
offset := getTableOffset(e.SelectExec.Schema(), entry)
data := joinedRow.Data[offset : offset+len(entry.Tbl.WritableCols())]
tblRowMap[entry.Tbl][entry.Handle] = data
}
}
for t, handleMap := range rowKeyMap {
for handle := range handleMap {
data, err := t.Row(e.ctx, handle)
for t, rowMap := range tblRowMap {
for handle, data := range rowMap {
err := e.removeRow(e.ctx, t, handle, data)
if err != nil {
return nil, errors.Trace(err)
}
err = e.removeRow(e.ctx, t, handle, data)
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
}
}
return nil, nil
return nil
}

func isMatchTableName(entry *RowKeyEntry, tblMap map[int64][]string) bool {
Expand All @@ -237,6 +242,24 @@ func isMatchTableName(entry *RowKeyEntry, tblMap map[int64][]string) bool {
return false
}

func (e *DeleteExec) deleteSingleTable() error {
for {
row, err := e.SelectExec.Next()
if err != nil {
return errors.Trace(err)
}
if row == nil {
break
}
rowKey := row.RowKeys[0]
err = e.removeRow(e.ctx, rowKey.Tbl, rowKey.Handle, row.Data)
if err != nil {
return errors.Trace(err)
}
}
return nil
}

func (e *DeleteExec) removeRow(ctx context.Context, t table.Table, h int64, data []types.Datum) error {
err := t.RemoveRecord(ctx, h, data)
if err != nil {
Expand Down Expand Up @@ -1104,7 +1127,7 @@ func (e *UpdateExec) Next() (*Row, error) {
if e.updatedRowKeys[tbl] == nil {
e.updatedRowKeys[tbl] = make(map[int64]struct{})
}
offset := e.getTableOffset(*entry)
offset := getTableOffset(e.SelectExec.Schema(), entry)
handle := entry.Handle
oldData := row.Data[offset : offset+len(tbl.WritableCols())]
newTableData := newData[offset : offset+len(tbl.WritableCols())]
Expand Down Expand Up @@ -1167,15 +1190,14 @@ func (e *UpdateExec) fetchRows() error {
}
}

func (e *UpdateExec) getTableOffset(entry RowKeyEntry) int {
func getTableOffset(schema expression.Schema, entry *RowKeyEntry) int {
t := entry.Tbl
var tblName string
if entry.TableAsName == nil || len(entry.TableAsName.L) == 0 {
tblName = t.Meta().Name.L
} else {
tblName = entry.TableAsName.L
}
schema := e.SelectExec.Schema()
for i := 0; i < schema.Len(); i++ {
s := schema.Columns[i]
if s.TblName.L == tblName {
Expand Down
2 changes: 1 addition & 1 deletion executor/explain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (s *testSuite) TestExplain(c *C) {
"ranges": "[[1,1]]",
"desc": false,
"out of order": true,
"double read": false,
"double read": true,
"push down info": {
"limit": 0,
"access conditions": [
Expand Down
5 changes: 5 additions & 0 deletions plan/column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,8 @@ func (p *Apply) PruneColumns(parentUsedCols []*expression.Column) {
func (p *Update) PruneColumns(parentUsedCols []*expression.Column) {
p.baseLogicalPlan.PruneColumns(p.GetChildByIndex(0).GetSchema().Columns)
}

// PruneColumns implements LogicalPlan interface.
func (p *Delete) PruneColumns(parentUsedCols []*expression.Column) {
p.baseLogicalPlan.PruneColumns(p.GetChildByIndex(0).GetSchema().Columns)
}
2 changes: 1 addition & 1 deletion plan/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ func (nr *nameResolver) handleTableName(tn *ast.TableName) {
}, len(tn.TableInfo.Columns))
status := nr.Ctx.GetSessionVars().StmtCtx
for i, v := range tn.TableInfo.Columns {
if status.InUpdateStmt {
if status.InUpdateOrDeleteStmt {
switch v.State {
case model.StatePublic, model.StateWriteOnly, model.StateWriteReorganization:
default:
Expand Down
6 changes: 3 additions & 3 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ func (s *SessionVars) GetTiDBSystemVar(name string) (string, error) {
// It should be reset before executing a statement.
type StatementContext struct {
/* Variables that are set before execution */
InUpdateStmt bool
IgnoreTruncate bool
TruncateAsWarning bool
InUpdateOrDeleteStmt bool
IgnoreTruncate bool
TruncateAsWarning bool

/* Variables that changes during execution. */
mu struct {
Expand Down
4 changes: 2 additions & 2 deletions tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ func resetStmtCtx(ctx context.Context, s ast.StmtNode) {
case *ast.UpdateStmt, *ast.InsertStmt, *ast.DeleteStmt:
sc.IgnoreTruncate = false
sc.TruncateAsWarning = !sessVars.StrictSQLMode
if _, ok := s.(*ast.UpdateStmt); ok {
sc.InUpdateStmt = true
if _, ok := s.(*ast.InsertStmt); !ok {
sc.InUpdateOrDeleteStmt = true
}
default:
sc.IgnoreTruncate = true
Expand Down

0 comments on commit 7c66138

Please sign in to comment.