Skip to content

Commit

Permalink
*: use less memory when many rows inserted in one transaction (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
jackysp authored Feb 13, 2019
1 parent 9bdba3a commit ba68605
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 31 deletions.
61 changes: 33 additions & 28 deletions executor/union_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,37 +33,13 @@ type DirtyDB struct {
tables map[int64]*DirtyTable
}

// AddRow adds a row to the DirtyDB.
func (udb *DirtyDB) AddRow(tid, handle int64, row []types.Datum) {
dt := udb.GetDirtyTable(tid)
for i := range row {
if row[i].Kind() == types.KindString {
row[i].SetBytes(row[i].GetBytes())
}
}
dt.addedRows[handle] = row
}

// DeleteRow deletes a row from the DirtyDB.
func (udb *DirtyDB) DeleteRow(tid int64, handle int64) {
dt := udb.GetDirtyTable(tid)
delete(dt.addedRows, handle)
dt.deletedRows[handle] = struct{}{}
}

// TruncateTable truncates a table.
func (udb *DirtyDB) TruncateTable(tid int64) {
dt := udb.GetDirtyTable(tid)
dt.addedRows = make(map[int64][]types.Datum)
dt.truncated = true
}

// GetDirtyTable gets the DirtyTable by id from the DirtyDB.
func (udb *DirtyDB) GetDirtyTable(tid int64) *DirtyTable {
dt, ok := udb.tables[tid]
if !ok {
dt = &DirtyTable{
addedRows: make(map[int64][]types.Datum),
tid: tid,
addedRows: make(map[int64]struct{}),
deletedRows: make(map[int64]struct{}),
}
udb.tables[tid] = dt
Expand All @@ -73,13 +49,31 @@ func (udb *DirtyDB) GetDirtyTable(tid int64) *DirtyTable {

// DirtyTable stores uncommitted write operation for a transaction.
type DirtyTable struct {
tid int64
// addedRows ...
// the key is handle.
addedRows map[int64][]types.Datum
addedRows map[int64]struct{}
deletedRows map[int64]struct{}
truncated bool
}

// AddRow adds a row to the DirtyDB.
func (dt *DirtyTable) AddRow(handle int64, row []types.Datum) {
dt.addedRows[handle] = struct{}{}
}

// DeleteRow deletes a row from the DirtyDB.
func (dt *DirtyTable) DeleteRow(handle int64) {
delete(dt.addedRows, handle)
dt.deletedRows[handle] = struct{}{}
}

// TruncateTable truncates a table.
func (dt *DirtyTable) TruncateTable() {
dt.addedRows = make(map[int64]struct{})
dt.truncated = true
}

// GetDirtyDB returns the DirtyDB bind to the context.
func GetDirtyDB(ctx sessionctx.Context) *DirtyDB {
var udb *DirtyDB
Expand Down Expand Up @@ -276,8 +270,19 @@ func (us *UnionScanExec) compare(a, b []types.Datum) (int, error) {
func (us *UnionScanExec) buildAndSortAddedRows() error {
us.addedRows = make([][]types.Datum, 0, len(us.dirty.addedRows))
mutableRow := chunk.MutRowFromTypes(us.retTypes())
for h, data := range us.dirty.addedRows {
t, found := GetInfoSchema(us.ctx).TableByID(us.dirty.tid)
if !found {
// t is got from a snapshot InfoSchema, so it should be found, this branch should not happen.
return errors.Errorf("table not found (tid: %d, schema version: %d)",
us.dirty.tid, GetInfoSchema(us.ctx).SchemaMetaVersion())
}
cols := t.WritableCols()
for h := range us.dirty.addedRows {
newData := make([]types.Datum, 0, us.schema.Len())
data, err := t.RowWithCols(us.ctx, h, cols)
if err != nil {
return err
}
for _, col := range us.columns {
if col.ID == model.ExtraHandleID {
newData = append(newData, types.NewIntDatum(h))
Expand Down
7 changes: 4 additions & 3 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,14 @@ func mergeToMutation(m1, m2 *binlog.TableMutation) {
}

func mergeToDirtyDB(dirtyDB *executor.DirtyDB, op dirtyTableOperation) {
dt := dirtyDB.GetDirtyTable(op.tid)
switch op.kind {
case table.DirtyTableAddRow:
dirtyDB.AddRow(op.tid, op.handle, op.row)
dt.AddRow(op.handle, op.row)
case table.DirtyTableDeleteRow:
dirtyDB.DeleteRow(op.tid, op.handle)
dt.DeleteRow(op.handle)
case table.DirtyTableTruncate:
dirtyDB.TruncateTable(op.tid)
dt.TruncateTable()
}
}

Expand Down

0 comments on commit ba68605

Please sign in to comment.