Skip to content

Commit

Permalink
ddl: Speed up adding index phase (pingcap#2309)
Browse files Browse the repository at this point in the history
* ddl: speed up adding index phase
  • Loading branch information
zimulala authored Dec 23, 2016
1 parent e84c442 commit 7414bed
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 104 deletions.
11 changes: 10 additions & 1 deletion ddl/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
colMeta := &columnMeta{
colID: columnInfo.ID,
oldColMap: make(map[int64]*types.FieldType)}
handles := make([]int64, 0, defaultBatchCnt)
// Get column default value.
var err error
if columnInfo.DefaultValue != nil {
Expand All @@ -317,7 +318,15 @@ func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI

for {
startTime := time.Now()
handles, err := d.getSnapshotRows(t, version, seekHandle)
handles = handles[:0]
err = d.iterateSnapshotRows(t, version, seekHandle,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
handles = append(handles, h)
if len(handles) == defaultBatchCnt {
return false, nil
}
return true, nil
})
if err != nil {
return errors.Trace(err)
} else if len(handles) == 0 {
Expand Down
195 changes: 92 additions & 103 deletions ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package ddl

import (
"math"
"time"

"github.com/juju/errors"
Expand Down Expand Up @@ -325,54 +324,39 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
return errors.Trace(err)
}

func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, handles []int64, indexInfo *model.IndexInfo) (
[]*indexRecord, error) {
// Through handles access to get all row keys.
handlesLen := len(handles)
rowKeys := make([]kv.Key, 0, handlesLen)
for _, h := range handles {
rowKey := tablecodec.EncodeRecordKey(t.RecordPrefix(), h)
rowKeys = append(rowKeys, rowKey)
}
func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, batchOpInfo *indexBatchOpInfo, seekHandle int64) error {
cols := t.Cols()
idxInfo := batchOpInfo.tblIndex.Meta()

// Get corresponding raw values for rowKeys.
ver := kv.Version{Ver: txn.StartTS()}
snap, err := d.store.GetSnapshot(ver)
if err != nil {
return nil, errors.Trace(err)
}
pairMap, err := snap.BatchGet(rowKeys)
err := d.iterateSnapshotRows(t, txn.StartTS(), seekHandle,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
rowMap, err := tablecodec.DecodeRow(rawRecord, batchOpInfo.colMap)
if err != nil {
return false, errors.Trace(err)
}
idxVal := make([]types.Datum, 0, len(idxInfo.Columns))
for _, v := range idxInfo.Columns {
col := cols[v.Offset]
idxVal = append(idxVal, rowMap[col.ID])
}

indexRecord := &indexRecord{handle: h, key: rowKey, vals: idxVal}
batchOpInfo.idxRecords = append(batchOpInfo.idxRecords, indexRecord)
if len(batchOpInfo.idxRecords) == defaultSmallBatchCnt {
return false, nil
}
return true, nil
})
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
} else if len(batchOpInfo.idxRecords) == 0 {
return nil
}

// Get corresponding values for pairMap.
cols := t.Cols()
colMap := make(map[int64]*types.FieldType)
for _, v := range indexInfo.Columns {
col := cols[v.Offset]
colMap[col.ID] = &col.FieldType
}
idxRecords := make([]*indexRecord, 0, handlesLen)
for i, rowKey := range rowKeys {
rawVal, ok := pairMap[string(rowKey)]
if !ok {
// Row doesn't exist, skip it.
continue
}
row, err := tablecodec.DecodeRow(rawVal, colMap)
if err != nil {
return nil, errors.Trace(err)
}
rowVal := make([]types.Datum, 0, len(indexInfo.Columns))
for _, v := range indexInfo.Columns {
col := cols[v.Offset]
rowVal = append(rowVal, row[col.ID])
}
idxRecord := &indexRecord{handle: handles[i], key: rowKey, vals: rowVal}
idxRecords = append(idxRecords, idxRecord)
}
return idxRecords, nil
count := len(batchOpInfo.idxRecords)
batchOpInfo.addedCount += int64(count)
batchOpInfo.handle = batchOpInfo.idxRecords[count-1].handle
return nil
}

const defaultBatchCnt = 1024
Expand All @@ -385,50 +369,71 @@ const defaultSmallBatchCnt = 128
// 4. If not deleted, check whether index has existed, if existed, skip to next row.
// 5. If index doesn't exist, create the index and then continue to handle next row.
func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo *reorgInfo, job *model.Job) error {
seekHandle := reorgInfo.Handle
version := reorgInfo.SnapshotVer
kvIdx := tables.NewIndex(t.Meta(), indexInfo)
count := job.GetRowCount()
cols := t.Cols()
colMap := make(map[int64]*types.FieldType)
for _, v := range indexInfo.Columns {
col := cols[v.Offset]
colMap[col.ID] = &col.FieldType
}
batchCnt := defaultSmallBatchCnt
batchOpInfo := &indexBatchOpInfo{
tblIndex: tables.NewIndex(t.Meta(), indexInfo),
addedCount: job.GetRowCount(),
colMap: colMap,
handle: reorgInfo.Handle,
idxRecords: make([]*indexRecord, 0, batchCnt),
}

seekHandle := reorgInfo.Handle
for {
startTime := time.Now()
handles, err := d.getSnapshotRows(t, version, seekHandle)
if err != nil {
return errors.Trace(err)
} else if len(handles) == 0 {
return nil
}

count += int64(len(handles))
seekHandle = handles[len(handles)-1] + 1
err = d.backfillTableIndex(t, kvIdx, handles, reorgInfo)
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
err1 := d.isReorgRunnable(txn, ddlJobFlag)
if err1 != nil {
return errors.Trace(err1)
}
batchOpInfo.idxRecords = batchOpInfo.idxRecords[:0]
err1 = d.backfillIndexInTxn(t, txn, batchOpInfo, seekHandle)
if err1 != nil {
return errors.Trace(err1)
}
// Update the reorg handle that has been processed.
return errors.Trace(reorgInfo.UpdateHandle(txn, batchOpInfo.handle))
})
sub := time.Since(startTime).Seconds()
if err != nil {
log.Warnf("[ddl] added index for %v rows failed, take time %v", count, sub)
log.Warnf("[ddl] added index for %v rows failed, take time %v", batchOpInfo.addedCount, sub)
return errors.Trace(err)
}

job.SetRowCount(count)
job.SetRowCount(batchOpInfo.addedCount)
batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
log.Infof("[ddl] added index for %v rows, take time %v", count, sub)
log.Infof("[ddl] added index for %v rows, take time %v", batchOpInfo.addedCount, sub)

if len(batchOpInfo.idxRecords) < batchCnt {
return nil
}
seekHandle = batchOpInfo.handle + 1
}
}

func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) ([]int64, error) {
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

func (d *ddl) iterateSnapshotRows(t table.Table, version uint64, seekHandle int64, fn recordIterFunc) error {
ver := kv.Version{Ver: version}
snap, err := d.store.GetSnapshot(ver)
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}

firstKey := t.RecordKey(seekHandle)
it, err := snap.Seek(firstKey)
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
defer it.Close()

handles := make([]int64, 0, defaultBatchCnt)
for it.Valid() {
if !it.Key().HasPrefix(t.RecordPrefix()) {
break
Expand All @@ -437,24 +442,32 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) (
var handle int64
handle, err = tablecodec.DecodeRowKey(it.Key())
if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
rk := t.RecordKey(handle)

handles = append(handles, handle)
if len(handles) == defaultBatchCnt {
break
more, err := fn(handle, rk, it.Value())
if !more || err != nil {
return errors.Trace(err)
}

rk := t.RecordKey(handle)
err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
if terror.ErrorEqual(err, kv.ErrNotExist) {
break
} else if err != nil {
return nil, errors.Trace(err)
return errors.Trace(err)
}
}

return handles, nil
return nil
}

type indexBatchOpInfo struct {
tblIndex table.Index
addedCount int64
handle int64 // This is the last reorg handle that has been processed.
colMap map[int64]*types.FieldType
idxRecords []*indexRecord
}

type indexRecord struct {
Expand All @@ -465,53 +478,29 @@ type indexRecord struct {

// backfillIndexInTxn deals with a part of backfilling index data in a Transaction.
// This part of the index data rows is defaultSmallBatchCnt.
func (d *ddl) backfillIndexInTxn(t table.Table, kvIdx table.Index, handles []int64, txn kv.Transaction) (int64, error) {
idxRecords, err := d.fetchRowColVals(txn, t, handles, kvIdx.Meta())
func (d *ddl) backfillIndexInTxn(t table.Table, txn kv.Transaction, batchOpInfo *indexBatchOpInfo, seekHandle int64) error {
err := d.fetchRowColVals(txn, t, batchOpInfo, seekHandle)
if err != nil {
return 0, errors.Trace(err)
return errors.Trace(err)
}

for _, idxRecord := range idxRecords {
for _, idxRecord := range batchOpInfo.idxRecords {
log.Debug("[ddl] backfill index...", idxRecord.handle)
err = txn.LockKeys(idxRecord.key)
if err != nil {
return 0, errors.Trace(err)
return errors.Trace(err)
}

// Create the index.
handle, err := kvIdx.Create(txn, idxRecord.vals, idxRecord.handle)
handle, err := batchOpInfo.tblIndex.Create(txn, idxRecord.vals, idxRecord.handle)
if err != nil {
if terror.ErrorEqual(err, kv.ErrKeyExists) && idxRecord.handle == handle {
// Index already exists, skip it.
continue
}
return 0, errors.Trace(err)
}
}
return idxRecords[len(idxRecords)-1].handle, nil
}

func (d *ddl) backfillTableIndex(t table.Table, kvIdx table.Index, handles []int64, reorgInfo *reorgInfo) error {
for len(handles) > 0 {
endIdx := int(math.Min(float64(defaultSmallBatchCnt), float64(len(handles))))
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
if err1 := d.isReorgRunnable(txn, ddlJobFlag); err1 != nil {
return errors.Trace(err1)
}
nextHandle, err1 := d.backfillIndexInTxn(t, kvIdx, handles[:endIdx], txn)
if err1 != nil {
return errors.Trace(err1)
}
// Update reorg next handle.
return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
})
if err != nil {
return errors.Trace(err)
}

handles = handles[endIdx:]
}

return nil
}

Expand Down

0 comments on commit 7414bed

Please sign in to comment.