Skip to content

Commit

Permalink
table/tables: fix IterRecords. (pingcap#1356)
Browse files Browse the repository at this point in the history
  • Loading branch information
coocood authored Jun 29, 2016
1 parent a0e3c2e commit 83a6c94
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 28 deletions.
6 changes: 5 additions & 1 deletion inspectkv/inspectkv.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,11 @@ func iterRecords(retriever kv.Retriever, t table.Table, startKey kv.Key, cols []
}
data := make([]types.Datum, 0, len(cols))
for _, col := range cols {
data = append(data, rowMap[col.ID])
if col.IsPKHandleColumn(t.Meta()) {
data = append(data, types.NewIntDatum(handle))
} else {
data = append(data, rowMap[col.ID])
}
}
more, err := fn(handle, data, cols)
if !more || err != nil {
Expand Down
61 changes: 35 additions & 26 deletions inspectkv/inspectkv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,40 +68,49 @@ func (s *testSuite) SetUpSuite(c *C) {
}
err = t.CreateDatabase(s.dbInfo)
c.Assert(err, IsNil)

pkFieldType := types.NewFieldType(mysql.TypeLong)
pkFieldType.Flag = mysql.PriKeyFlag | mysql.NotNullFlag
pkCol := &model.ColumnInfo{
Name: model.NewCIStr("pk"),
ID: 1,
Offset: 0,
State: model.StatePublic,
FieldType: *pkFieldType,
}
col := &model.ColumnInfo{
Name: model.NewCIStr("c"),
ID: 0,
Offset: 0,
ID: 2,
Offset: 1,
DefaultValue: 1,
State: model.StatePublic,
FieldType: *types.NewFieldType(mysql.TypeLong),
}
col1 := &model.ColumnInfo{
Name: model.NewCIStr("c1"),
ID: 1,
Offset: 1,
ID: 3,
Offset: 2,
DefaultValue: 1,
State: model.StatePublic,
FieldType: *types.NewFieldType(mysql.TypeLong),
}
idx := &model.IndexInfo{
Name: model.NewCIStr("c"),
ID: 1,
ID: 4,
Unique: true,
Columns: []*model.IndexColumn{{
Name: model.NewCIStr("c"),
Offset: 0,
Offset: 1,
Length: 255,
}},
State: model.StatePublic,
}
s.tbInfo = &model.TableInfo{
ID: 1,
Name: model.NewCIStr("t"),
State: model.StatePublic,
Columns: []*model.ColumnInfo{col, col1},
Indices: []*model.IndexInfo{idx},
ID: 5,
Name: model.NewCIStr("t"),
State: model.StatePublic,
Columns: []*model.ColumnInfo{pkCol, col, col1},
Indices: []*model.IndexInfo{idx},
PKIsHandle: true,
}
err = t.CreateTable(s.dbInfo.ID, s.tbInfo)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -185,12 +194,12 @@ func (s *testSuite) TestScan(c *C) {
tb, err := tables.TableFromMeta(alloc, s.tbInfo)
c.Assert(err, IsNil)
indices := tb.Indices()
_, err = tb.AddRecord(s.ctx, types.MakeDatums(10, 11))
_, err = tb.AddRecord(s.ctx, types.MakeDatums(1, 10, 11))
c.Assert(err, IsNil)
s.ctx.CommitTxn()

record1 := &RecordData{Handle: int64(1), Values: types.MakeDatums(int64(10), int64(11))}
record2 := &RecordData{Handle: int64(2), Values: types.MakeDatums(int64(20), int64(21))}
record1 := &RecordData{Handle: int64(1), Values: types.MakeDatums(int64(1), int64(10), int64(11))}
record2 := &RecordData{Handle: int64(2), Values: types.MakeDatums(int64(2), int64(20), int64(21))}
ver, err := s.store.CurrentVersion()
c.Assert(err, IsNil)
records, _, err := ScanSnapshotTableRecord(s.store, ver, tb, int64(1), 1)
Expand Down Expand Up @@ -307,9 +316,9 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
c.Assert(err, IsNil)
c.Assert(cnt, Equals, int64(2))

// current index data:
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30)
// index col data (handle, data): (1, 10), (2, 20), (4, 40)
// table data (handle, data): (1, 10), (2, 20), (4, 40)
err = idx.Create(txn, types.MakeDatums(int64(30)), 3)
c.Assert(err, IsNil)
key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4))
Expand All @@ -325,9 +334,9 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
diffMsg := newDiffRetError("index", record1, nil)
c.Assert(err.Error(), DeepEquals, diffMsg)

// current index data:
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40)
// index col data (handle, data): (1, 10), (2, 20), (4, 40), (3, 31)
// table data (handle, data): (1, 10), (2, 20), (4, 40), (3, 31)
err = idx.Create(txn, types.MakeDatums(int64(40)), 4)
c.Assert(err, IsNil)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3))
Expand All @@ -343,9 +352,9 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
diffMsg = newDiffRetError("index", record1, record2)
c.Assert(err.Error(), DeepEquals, diffMsg)

// current index data:
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40)
// index col data (handle, data): (1, 10), (2, 20), (4, 40), (5, 30)
// table data (handle, data): (1, 10), (2, 20), (4, 40), (5, 30)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3))
txn.Delete(key)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 5))
Expand All @@ -361,9 +370,9 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
diffMsg = newDiffRetError("index", record1, record2)
c.Assert(err.Error(), DeepEquals, diffMsg)

// current index data:
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40)
// index col data (handle, data): (1, 10), (2, 20), (3, 30)
// table data (handle, data): (1, 10), (2, 20), (3, 30)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4))
txn.Delete(key)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3))
Expand All @@ -379,9 +388,9 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
diffMsg = newDiffRetError("index", record1, nil)
c.Assert(err.Error(), DeepEquals, diffMsg)

// current index data:
// set data to:
// index data (handle, data): (1, 10), (2, 20), (3, 30)
// index col data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40)
// table data (handle, data): (1, 10), (2, 20), (3, 30), (4, 40)
err = idx.Delete(txn, types.MakeDatums(int64(40)), 4)
c.Assert(err, IsNil)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4))
Expand All @@ -399,7 +408,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {

func setColValue(c *C, txn kv.Transaction, key kv.Key, v types.Datum) {
row := []types.Datum{v, {}}
colIDs := []int64{0, 1}
colIDs := []int64{2, 3}
value, err := tablecodec.EncodeRow(row, colIDs)
c.Assert(err, IsNil)
err = txn.Set(key, value)
Expand Down
6 changes: 5 additions & 1 deletion table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,11 @@ func (t *Table) IterRecords(ctx context.Context, startKey kv.Key, cols []*table.
}
data := make([]types.Datum, 0, len(cols))
for _, col := range cols {
data = append(data, rowMap[col.ID])
if col.IsPKHandleColumn(t.Meta()) {
data = append(data, types.NewIntDatum(handle))
} else {
data = append(data, rowMap[col.ID])
}
}
more, err := fn(handle, data, cols)
if !more || err != nil {
Expand Down
20 changes: 20 additions & 0 deletions table/tables/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,23 @@ func (ts *testSuite) TestUnsignedPK(c *C) {
c.Assert(len(row), Equals, 2)
c.Assert(row[0].Kind(), Equals, types.KindUint64)
}

func (ts *testSuite) TestIterRecords(c *C) {
defer testleak.AfterTest(c)()
_, err := ts.se.Execute("CREATE TABLE test.tIter (a int primary key, b int)")
c.Assert(err, IsNil)
_, err = ts.se.Execute("INSERT test.tIter VALUES (1, 2), (2, NULL)")
c.Assert(err, IsNil)
ctx := ts.se.(context.Context)
dom := sessionctx.GetDomain(ctx)
tb, err := dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("tIter"))
c.Assert(err, IsNil)
totalCount := 0
err = tb.IterRecords(ctx, tb.FirstKey(), tb.Cols(), func(h int64, rec []types.Datum, cols []*table.Column) (bool, error) {
totalCount++
c.Assert(rec[0].IsNull(), IsFalse)
return true, nil
})
c.Assert(err, IsNil)
c.Assert(totalCount, Equals, 2)
}

0 comments on commit 83a6c94

Please sign in to comment.