Skip to content

Commit

Permalink
tablecodec: Add EncodeRow and DecodeRow (pingcap#1341)
Browse files Browse the repository at this point in the history
Add EncodeRow and DecodeRow
  • Loading branch information
shenli authored Jun 23, 2016
1 parent 1f08055 commit 02aa787
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
51 changes: 51 additions & 0 deletions tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,26 @@ func EncodeValue(raw types.Datum) ([]byte, error) {
return b, errors.Trace(err)
}

// EncodeRow encode row data and column ids into a slice of byte.
// Row layout: colID1, value1, colID2, value2, .....
func EncodeRow(row []types.Datum, colIDs []int64) ([]byte, error) {
if len(row) != len(colIDs) {
return nil, errors.Errorf("EncodeRow error: data and columnID count not match %d vs %d", len(row), len(colIDs))
}
values := make([]types.Datum, 2*len(row))
for i, c := range row {
id := colIDs[i]
idv := types.NewIntDatum(id)
values[2*i] = idv
fc, err := flatten(c)
if err != nil {
return nil, errors.Trace(err)
}
values[2*i+1] = fc
}
return codec.EncodeValue(nil, values...)
}

func flatten(data types.Datum) (types.Datum, error) {
switch data.Kind() {
case types.KindMysqlTime:
Expand Down Expand Up @@ -193,6 +213,37 @@ func DecodeColumnValue(data []byte, ft *types.FieldType) (types.Datum, error) {
return colDatum, nil
}

// DecodeRow decodes a byte slice into datums.
// TODO: We should only decode columns in the cols map.
// Row layout: colID1, value1, colID2, value2, .....
func DecodeRow(data []byte, cols map[int64]*types.FieldType) (map[int64]types.Datum, error) {
if data == nil {
return nil, nil
}
values, err := codec.Decode(data)
if err != nil {
return nil, errors.Trace(err)
}
if len(values)%2 != 0 {
return nil, errors.New("Decoded row value length is not even number!")
}
row := make(map[int64]types.Datum, len(cols))
for i := 0; i < len(values); i += 2 {
cid := values[i]
id := cid.GetInt64()
ft, ok := cols[id]
if ok {
v := values[i+1]
v, err = Unflatten(v, ft)
if err != nil {
return nil, errors.Trace(err)
}
row[id] = v
}
}
return row, nil
}

// Unflatten converts a raw datum to a column datum.
func Unflatten(datum types.Datum, ft *types.FieldType) (types.Datum, error) {
if datum.IsNull() {
Expand Down
80 changes: 80 additions & 0 deletions tablecodec/tablecodec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ import (
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
)

func TestT(t *testing.T) {
Expand All @@ -42,3 +44,81 @@ func (s *testTableCodecSuite) TestTableCodec(c *C) {
c.Assert(err, IsNil)
c.Assert(h, Equals, int64(2))
}

// column is a structure used for test
type column struct {
id int64
tp *types.FieldType
}

func (s *testTableCodecSuite) TestRowCodec(c *C) {
defer testleak.AfterTest(c)()

c1 := &column{id: 1, tp: types.NewFieldType(mysql.TypeLonglong)}
c2 := &column{id: 2, tp: types.NewFieldType(mysql.TypeVarchar)}
c3 := &column{id: 3, tp: types.NewFieldType(mysql.TypeNewDecimal)}
cols := []*column{c1, c2, c3}

row := make([]types.Datum, 3)
row[0] = types.NewIntDatum(100)
row[1] = types.NewBytesDatum([]byte("abc"))
row[2] = types.NewDecimalDatum(mysql.NewDecimalFromInt(1, 1))
// Encode
colIDs := make([]int64, 0, 3)
for _, col := range cols {
colIDs = append(colIDs, col.id)
}
bs, err := EncodeRow(row, colIDs)
c.Assert(err, IsNil)
c.Assert(bs, NotNil)

// Decode
colMap := make(map[int64]*types.FieldType, 3)
for _, col := range cols {
colMap[col.id] = col.tp
}
r, err := DecodeRow(bs, colMap)
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r, HasLen, 3)
// Compare decoded row and original row
for i, col := range cols {
v, ok := r[col.id]
c.Assert(ok, IsTrue)
equal, err1 := v.CompareDatum(row[i])
c.Assert(err1, IsNil)
c.Assert(equal, Equals, 0)
}

// colMap may contains more columns than encoded row.
colMap[4] = types.NewFieldType(mysql.TypeFloat)
r, err = DecodeRow(bs, colMap)
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r, HasLen, 3)
for i, col := range cols {
v, ok := r[col.id]
c.Assert(ok, IsTrue)
equal, err1 := v.CompareDatum(row[i])
c.Assert(err1, IsNil)
c.Assert(equal, Equals, 0)
}

// colMap may contains less columns than encoded row.
delete(colMap, 3)
delete(colMap, 4)
r, err = DecodeRow(bs, colMap)
c.Assert(err, IsNil)
c.Assert(r, NotNil)
c.Assert(r, HasLen, 2)
for i, col := range cols {
if i > 1 {
break
}
v, ok := r[col.id]
c.Assert(ok, IsTrue)
equal, err1 := v.CompareDatum(row[i])
c.Assert(err1, IsNil)
c.Assert(equal, Equals, 0)
}
}

0 comments on commit 02aa787

Please sign in to comment.