Skip to content

Commit

Permalink
*: add Table.Seek method and use handle for RowKeyEntry.
Browse files Browse the repository at this point in the history
Avoid calling `table/tables` methods in `executor`, makes `Table` interface independent.
  • Loading branch information
coocood committed Feb 19, 2016
1 parent c818e51 commit b47cf73
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 142 deletions.
2 changes: 1 addition & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha
return nil
}

value, _, err := tables.GetColDefaultValue(nil, columnInfo)
value, _, err := table.GetColDefaultValue(nil, columnInfo)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ func (a *recordsetAdapter) Next() (*oplan.Row, error) {
}
for _, v := range row.RowKeys {
oldRowKey := &oplan.RowKeyEntry{
Key: v.Key,
Tbl: v.Tbl,
Handle: v.Handle,
Tbl: v.Tbl,
}
oRow.RowKeys = append(oRow.RowKeys, oldRowKey)
}
Expand Down
50 changes: 10 additions & 40 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/pingcap/tidb/sessionctx/db"
"github.com/pingcap/tidb/sessionctx/forupdate"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/distinct"
Expand Down Expand Up @@ -82,7 +81,7 @@ type RowKeyEntry struct {
// The table which this row come from.
Tbl table.Table
// Row key.
Key string
Handle int64
}

// Executor executes a query.
Expand Down Expand Up @@ -223,12 +222,8 @@ func (e *TableScanExec) Next() (*Row, error) {
e.cursor++
continue
}
rowKey, err := e.seek()
if err != nil || rowKey == nil {
return nil, errors.Trace(err)
}
handle, err := tables.DecodeRecordKeyHandle(rowKey)
if err != nil {
handle, found, err := e.t.Seek(e.ctx, e.seekHandle)
if err != nil || !found {
return nil, errors.Trace(err)
}
if handle > ran.HighVal {
Expand All @@ -242,7 +237,7 @@ func (e *TableScanExec) Next() (*Row, error) {
continue
}
}
row, err := e.getRow(handle, rowKey)
row, err := e.getRow(handle)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -251,27 +246,6 @@ func (e *TableScanExec) Next() (*Row, error) {
}
}

func (e *TableScanExec) seek() (kv.Key, error) {
seekKey := tables.EncodeRecordKey(e.t.Meta().ID, e.seekHandle, 0)
txn, err := e.ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
if e.iter != nil {
e.iter.Close()
}
e.iter, err = txn.Seek(seekKey)
if err != nil {
return nil, errors.Trace(err)
}
if !e.iter.Valid() || !e.iter.Key().HasPrefix(e.t.RecordPrefix()) {
// No more records in the table, skip to the end.
e.cursor = len(e.ranges)
return nil, nil
}
return e.iter.Key(), nil
}

// seekRange increments the range cursor to the range
// with high value greater or equal to handle.
func (e *TableScanExec) seekRange(handle int64) (inRange bool) {
Expand All @@ -291,7 +265,7 @@ func (e *TableScanExec) seekRange(handle int64) (inRange bool) {
}
}

func (e *TableScanExec) getRow(handle int64, rowKey kv.Key) (*Row, error) {
func (e *TableScanExec) getRow(handle int64) (*Row, error) {
row := &Row{}
var err error
row.Data, err = e.t.Row(e.ctx, handle)
Expand All @@ -305,8 +279,8 @@ func (e *TableScanExec) getRow(handle int64, rowKey kv.Key) (*Row, error) {

// Put rowKey to the tail of record row
rke := &RowKeyEntry{
Tbl: e.t,
Key: string(rowKey),
Tbl: e.t,
Handle: handle,
}
row.RowKeys = append(row.RowKeys, rke)
return row, nil
Expand Down Expand Up @@ -462,8 +436,8 @@ func (e *IndexRangeExec) lookupRow(h int64) (*Row, error) {
return nil, errors.Trace(err)
}
rowKey := &RowKeyEntry{
Tbl: e.scan.tbl,
Key: string(e.scan.tbl.RecordKey(h, nil)),
Tbl: e.scan.tbl,
Handle: h,
}
row.RowKeys = append(row.RowKeys, rowKey)
return row, nil
Expand Down Expand Up @@ -811,12 +785,8 @@ func (e *SelectLockExec) Next() (*Row, error) {
}
if len(row.RowKeys) != 0 && e.Lock == ast.SelectLockForUpdate {
forupdate.SetForUpdate(e.ctx)
txn, err := e.ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
for _, k := range row.RowKeys {
err = txn.LockKeys([]byte(k.Key))
err = k.Tbl.LockRow(e.ctx, k.Handle, true)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
56 changes: 27 additions & 29 deletions executor/executor_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/types"
)
Expand All @@ -41,7 +40,7 @@ type UpdateExec struct {
SelectExec Executor
OrderedList []*ast.Assignment

updatedRowKeys map[string]bool
updatedRowKeys map[table.Table]map[int64]bool
ctx context.Context

rows []*Row // The rows fetched from TableExec.
Expand All @@ -68,34 +67,32 @@ func (e *UpdateExec) Next() (*Row, error) {
return nil, nil
}
if e.updatedRowKeys == nil {
e.updatedRowKeys = map[string]bool{}
e.updatedRowKeys = make(map[table.Table]map[int64]bool)
}
row := e.rows[e.cursor]
newData := e.newRowsData[e.cursor]
for _, entry := range row.RowKeys {
tbl := entry.Tbl
if e.updatedRowKeys[tbl] == nil {
e.updatedRowKeys[tbl] = make(map[int64]bool)
}
offset := e.getTableOffset(tbl)
k := entry.Key
handle := entry.Handle
oldData := row.Data[offset : offset+len(tbl.Cols())]
newTableData := newData[offset : offset+len(tbl.Cols())]

_, ok := e.updatedRowKeys[k]
_, ok := e.updatedRowKeys[tbl][handle]
if ok {
// Each matching row is updated once, even if it matches the conditions multiple times.
continue
}

// Update row
handle, err1 := tables.DecodeRecordKeyHandle(kv.Key(k))
if err1 != nil {
return nil, errors.Trace(err1)
}

err1 = updateRecord(e.ctx, handle, oldData, newTableData, columns, tbl, offset, false)
err1 := updateRecord(e.ctx, handle, oldData, newTableData, columns, tbl, offset, false)
if err1 != nil {
return nil, errors.Trace(err1)
}
e.updatedRowKeys[k] = true
e.updatedRowKeys[tbl][handle] = true
}
e.cursor++
return &Row{}, nil
Expand Down Expand Up @@ -151,7 +148,7 @@ func (e *UpdateExec) getTableOffset(t table.Table) int {
}

func updateRecord(ctx context.Context, h int64, oldData, newData []interface{}, updateColumns map[int]*ast.Assignment, t table.Table, offset int, onDuplicateUpdate bool) error {
if err := t.LockRow(ctx, h); err != nil {
if err := t.LockRow(ctx, h, false); err != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -279,7 +276,7 @@ func (e *DeleteExec) Next() (*Row, error) {
tblIDMap := make(map[int64]bool, len(e.Tables))
// Get table alias map.
tblNames := make(map[string]string)
rowKeyMap := make(map[string]table.Table)
rowKeyMap := make(map[table.Table]map[int64]bool)
if e.IsMultiTable {
// Delete from multiple tables should consider table ident list.
fs := e.SelectExec.Fields()
Expand Down Expand Up @@ -315,21 +312,22 @@ func (e *DeleteExec) Next() (*Row, error) {
continue
}
}
rowKeyMap[entry.Key] = entry.Tbl
if rowKeyMap[entry.Tbl] == nil {
rowKeyMap[entry.Tbl] = make(map[int64]bool)
}
rowKeyMap[entry.Tbl][entry.Handle] = true
}
}
for k, t := range rowKeyMap {
handle, err := tables.DecodeRecordKeyHandle(kv.Key(k))
if err != nil {
return nil, errors.Trace(err)
}
data, err := t.Row(e.ctx, handle)
if err != nil {
return nil, errors.Trace(err)
}
err = e.removeRow(e.ctx, t, handle, data)
if err != nil {
return nil, errors.Trace(err)
for t, handleMap := range rowKeyMap {
for handle := range handleMap {
data, err := t.Row(e.ctx, handle)
if err != nil {
return nil, errors.Trace(err)
}
err = e.removeRow(e.ctx, t, handle, data)
if err != nil {
return nil, errors.Trace(err)
}
}
}
return nil, nil
Expand Down Expand Up @@ -530,7 +528,7 @@ func (e *InsertValues) checkValueCount(insertValueCount, valueCount, num int, co
func (e *InsertValues) getColumnDefaultValues(cols []*column.Col) (map[string]interface{}, error) {
defaultValMap := map[string]interface{}{}
for _, col := range cols {
if value, ok, err := tables.GetColDefaultValue(e.ctx, &col.ColumnInfo); ok {
if value, ok, err := table.GetColDefaultValue(e.ctx, &col.ColumnInfo); ok {
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -662,7 +660,7 @@ func (e *InsertValues) initDefaultValues(row []interface{}, marked map[int]struc
}
} else {
var value interface{}
value, _, err := tables.GetColDefaultValue(e.ctx, &c.ColumnInfo)
value, _, err := table.GetColDefaultValue(e.ctx, &c.ColumnInfo)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion plan/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,5 @@ type RowKeyEntry struct {
// The table which this row come from.
Tbl table.Table
// Row handle.
Key string
Handle int64
}
8 changes: 4 additions & 4 deletions plan/plans/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,8 @@ func (r *TableDefaultPlan) Next(ctx context.Context) (row *plan.Row, err error)
}
// Put rowKey to the tail of record row
rke := &plan.RowKeyEntry{
Tbl: r.T,
Key: string(rowKey),
Tbl: r.T,
Handle: handle,
}
row.RowKeys = append(row.RowKeys, rke)

Expand Down Expand Up @@ -444,8 +444,8 @@ func (r *TableDefaultPlan) rangeNext(ctx context.Context) (*plan.Row, error) {
}
// Put rowKey to the tail of record row
rke := &plan.RowKeyEntry{
Tbl: r.T,
Key: string(rowKey),
Tbl: r.T,
Handle: handle,
}
row.RowKeys = append(row.RowKeys, rke)
return row, nil
Expand Down
4 changes: 2 additions & 2 deletions plan/plans/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,8 @@ func (r *indexPlan) lookupRow(ctx context.Context, h int64) (*plan.Row, error) {
return nil, errors.Trace(err)
}
rowKey := &plan.RowKeyEntry{
Tbl: r.src,
Key: string(r.src.RecordKey(h, nil)),
Tbl: r.src,
Handle: h,
}
row.RowKeys = append(row.RowKeys, rowKey)
return row, nil
Expand Down
7 changes: 1 addition & 6 deletions plan/plans/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/field"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/coldef"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/sessionctx/forupdate"
Expand Down Expand Up @@ -65,12 +64,8 @@ func (r *SelectLockPlan) Next(ctx context.Context) (row *plan.Row, err error) {
}
if len(row.RowKeys) != 0 && r.Lock == coldef.SelectLockForUpdate {
forupdate.SetForUpdate(ctx)
txn, err := ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
for _, k := range row.RowKeys {
err = txn.LockKeys(kv.Key(k.Key))
err = k.Tbl.LockRow(ctx, k.Handle, true)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 3 additions & 9 deletions stmt/stmts/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import (
"github.com/ngaut/log"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/rset"
"github.com/pingcap/tidb/rset/rsets"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/stmt"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/format"
)

Expand Down Expand Up @@ -154,7 +152,7 @@ func (s *DeleteStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
tblIDMap[tbl.Meta().ID] = true
}
}
rowKeyMap := make(map[string]table.Table)
rowKeyMap := make(map[int64]table.Table)
for {
row, err1 := p.Next(ctx)
if err1 != nil {
Expand All @@ -171,15 +169,11 @@ func (s *DeleteStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) {
continue
}
}
rowKeyMap[entry.Key] = entry.Tbl
rowKeyMap[entry.Handle] = entry.Tbl
}
}

for k, t := range rowKeyMap {
handle, err := tables.DecodeRecordKeyHandle(kv.Key(k))
if err != nil {
return nil, errors.Trace(err)
}
for handle, t := range rowKeyMap {
data, err := t.Row(ctx, handle)
if err != nil {
return nil, errors.Trace(err)
Expand Down
5 changes: 2 additions & 3 deletions stmt/stmts/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/stmt"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/format"
)
Expand Down Expand Up @@ -137,7 +136,7 @@ func (s *InsertValues) getColumns(tableCols []*column.Col) ([]*column.Col, error
func (s *InsertValues) getColumnDefaultValues(ctx context.Context, cols []*column.Col) (map[interface{}]interface{}, error) {
defaultValMap := map[interface{}]interface{}{}
for _, col := range cols {
if value, ok, err := tables.GetColDefaultValue(ctx, &col.ColumnInfo); ok {
if value, ok, err := table.GetColDefaultValue(ctx, &col.ColumnInfo); ok {
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -404,7 +403,7 @@ func (s *InsertValues) initDefaultValues(ctx context.Context, t table.Table, row
}
} else {
var value interface{}
value, _, err := tables.GetColDefaultValue(ctx, &c.ColumnInfo)
value, _, err := table.GetColDefaultValue(ctx, &c.ColumnInfo)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit b47cf73

Please sign in to comment.