Skip to content

Commit

Permalink
*: improve performance of DecodeBytes in DecodeOneToChunk (pingcap#6135)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored and zhexuany committed Mar 29, 2018
1 parent 4a42252 commit c398110
Show file tree
Hide file tree
Showing 12 changed files with 75 additions and 37 deletions.
4 changes: 2 additions & 2 deletions distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,10 @@ func (r *selectResult) getSelectResp() error {
func (r *selectResult) readRowsData(chk *chunk.Chunk) (err error) {
rowsData := r.selectResp.Chunks[r.respChkIdx].RowsData
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
timeZone := r.ctx.GetSessionVars().GetTimeZone()
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().GetTimeZone())
for chk.NumRows() < maxChunkSize && len(rowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
rowsData, err = codec.DecodeOneToChunk(rowsData, chk, i, r.fieldTypes[i], timeZone)
rowsData, err = decoder.DecodeOne(rowsData, i, r.fieldTypes[i])
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions distsql/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ func (r *streamResult) readDataIfNecessary(ctx context.Context) error {
func (r *streamResult) flushToChunk(chk *chunk.Chunk) (err error) {
remainRowsData := r.curr.RowsData
maxChunkSize := r.ctx.GetSessionVars().MaxChunkSize
timeZone := r.ctx.GetSessionVars().GetTimeZone()
decoder := codec.NewDecoder(chk, r.ctx.GetSessionVars().GetTimeZone())
for chk.NumRows() < maxChunkSize && len(remainRowsData) > 0 {
for i := 0; i < r.rowLen; i++ {
remainRowsData, err = codec.DecodeOneToChunk(remainRowsData, chk, i, r.fieldTypes[i], timeZone)
remainRowsData, err = decoder.DecodeOne(remainRowsData, i, r.fieldTypes[i])
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ func (ts *HTTPHandlerTestSuite) TestGetTableMVCC(c *C) {
c.Assert(bytes.Equal(v2, expect.Value), IsTrue)
}

_, key, err := codec.DecodeBytes(p1.Key)
_, key, err := codec.DecodeBytes(p1.Key, nil)
c.Assert(err, IsNil)
hexKey := hex.EncodeToString(key)
resp, err = http.Get("http://127.0.0.1:10090/mvcc/hex/" + hexKey)
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,7 +857,7 @@ func (key MvccKey) Raw() []byte {
if len(key) == 0 {
return nil
}
_, k, err := codec.DecodeBytes(key)
_, k, err := codec.DecodeBytes(key, nil)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func mvccEncode(key []byte, ver uint64) []byte {
// just returns the origin key.
func mvccDecode(encodedKey []byte) ([]byte, uint64, error) {
// Skip DataPrefix
remainBytes, key, err := codec.DecodeBytes(encodedKey)
remainBytes, key, err := codec.DecodeBytes(encodedKey, nil)
if err != nil {
// should never happen
return nil, 0, errors.Trace(err)
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/pd_codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ func processRegionResult(region *metapb.Region, peer *metapb.Peer, err error) (*

func decodeRegionMetaKey(r *metapb.Region) error {
if len(r.StartKey) != 0 {
_, decoded, err := codec.DecodeBytes(r.StartKey)
_, decoded, err := codec.DecodeBytes(r.StartKey, nil)
if err != nil {
return errors.Trace(err)
}
r.StartKey = decoded
}
if len(r.EndKey) != 0 {
_, decoded, err := codec.DecodeBytes(r.EndKey)
_, decoded, err := codec.DecodeBytes(r.EndKey, nil)
if err != nil {
return errors.Trace(err)
}
Expand Down
4 changes: 2 additions & 2 deletions structure/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (t *TxStructure) decodeHashDataKey(ek kv.Key) ([]byte, []byte, error) {

ek = ek[len(t.prefix):]

ek, key, err = codec.DecodeBytes(ek)
ek, key, err = codec.DecodeBytes(ek, nil)
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand All @@ -89,7 +89,7 @@ func (t *TxStructure) decodeHashDataKey(ek kv.Key) ([]byte, []byte, error) {
return nil, nil, errInvalidHashKeyFlag.Gen("invalid encoded hash data key flag %c", byte(tp))
}

_, field, err = codec.DecodeBytes(ek)
_, field, err = codec.DecodeBytes(ek, nil)
return key, field, errors.Trace(err)
}

Expand Down
16 changes: 16 additions & 0 deletions util/codec/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package codec
import (
"testing"

"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

var valueCnt = 100
Expand Down Expand Up @@ -71,3 +73,17 @@ func BenchmarkDecodeDecimal(b *testing.B) {
DecodeDecimal(raw)
}
}

func BenchmarkDecodeOneToChunk(b *testing.B) {
str := new(types.Datum)
*str = types.NewStringDatum("a")
var raw []byte
raw = append(raw, bytesFlag)
raw = EncodeBytes(raw, str.GetBytes())
intType := types.NewFieldType(mysql.TypeLonglong)
b.ResetTimer()
decoder := NewDecoder(chunk.NewChunk([]*types.FieldType{intType}), nil)
for i := 0; i < b.N; i++ {
decoder.DecodeOne(raw, 0, intType)
}
}
22 changes: 13 additions & 9 deletions util/codec/bytes.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ func EncodeBytes(b []byte, data []byte) []byte {
return result
}

func decodeBytes(b []byte, reverse bool) ([]byte, []byte, error) {
data := make([]byte, 0, len(b))
func decodeBytes(b []byte, buf []byte, reverse bool) ([]byte, []byte, error) {
if buf == nil {
buf = make([]byte, 0, len(b))
}
buf = buf[:0]
for {
if len(b) < encGroupSize+1 {
return nil, nil, errors.New("insufficient bytes to decode value")
Expand All @@ -91,7 +94,7 @@ func decodeBytes(b []byte, reverse bool) ([]byte, []byte, error) {
}

realGroupSize := encGroupSize - padCount
data = append(data, group[:realGroupSize]...)
buf = append(buf, group[:realGroupSize]...)
b = b[encGroupSize+1:]

if padCount != 0 {
Expand All @@ -109,15 +112,16 @@ func decodeBytes(b []byte, reverse bool) ([]byte, []byte, error) {
}
}
if reverse {
reverseBytes(data)
reverseBytes(buf)
}
return b, data, nil
return b, buf, nil
}

// DecodeBytes decodes bytes which is encoded by EncodeBytes before,
// returns the leftover bytes and decoded value if no error.
func DecodeBytes(b []byte) ([]byte, []byte, error) {
return decodeBytes(b, false)
// `buf` is used to buffer data to avoid the cost of makeslice in decodeBytes when DecodeBytes is called by Decoder.DecodeOne.
func DecodeBytes(b []byte, buf []byte) ([]byte, []byte, error) {
return decodeBytes(b, buf, false)
}

// EncodeBytesDesc first encodes bytes using EncodeBytes, then bitwise reverses
Expand All @@ -131,8 +135,8 @@ func EncodeBytesDesc(b []byte, data []byte) []byte {

// DecodeBytesDesc decodes bytes which is encoded by EncodeBytesDesc before,
// returns the leftover bytes and decoded value if no error.
func DecodeBytesDesc(b []byte) ([]byte, []byte, error) {
return decodeBytes(b, true)
func DecodeBytesDesc(b []byte, buf []byte) ([]byte, []byte, error) {
return decodeBytes(b, buf, true)
}

// EncodeCompactBytes joins bytes with its length into a byte slice. It is more
Expand Down
6 changes: 3 additions & 3 deletions util/codec/bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ func (s *testBytesSuite) TestBytesCodec(c *C) {
if input.desc {
b := EncodeBytesDesc(nil, input.enc)
c.Assert(b, BytesEquals, input.dec)
_, d, err := DecodeBytesDesc(b)
_, d, err := DecodeBytesDesc(b, nil)
c.Assert(err, IsNil)
c.Assert(d, BytesEquals, input.enc)
} else {
b := EncodeBytes(nil, input.enc)
c.Assert(b, BytesEquals, input.dec)
_, d, err := DecodeBytes(b)
_, d, err := DecodeBytes(b, nil)
c.Assert(err, IsNil)
c.Assert(d, BytesEquals, input.enc)
}
Expand All @@ -78,7 +78,7 @@ func (s *testBytesSuite) TestBytesCodec(c *C) {
}

for _, input := range errInputs {
_, _, err := DecodeBytes(input)
_, _, err := DecodeBytes(input, nil)
c.Assert(err, NotNil)
}
}
35 changes: 26 additions & 9 deletions util/codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func DecodeOne(b []byte) (remain []byte, d types.Datum, err error) {
d.SetFloat64(v)
case bytesFlag:
var v []byte
b, v, err = DecodeBytes(b)
b, v, err = DecodeBytes(b, nil)
d.SetBytes(v)
case compactBytesFlag:
var v []byte
Expand Down Expand Up @@ -481,11 +481,29 @@ func peekUvarint(b []byte) (int, error) {
return n, nil
}

// DecodeOneToChunk decodes one value to chunk and returns the remained bytes.
func DecodeOneToChunk(b []byte, chk *chunk.Chunk, colIdx int, ft *types.FieldType, loc *time.Location) (remain []byte, err error) {
// Decoder is used to decode value to chunk.
type Decoder struct {
chk *chunk.Chunk
timezone *time.Location

// buf is only used for DecodeBytes to avoid the cost of makeslice.
buf []byte
}

// NewDecoder creates a Decoder.
func NewDecoder(chk *chunk.Chunk, timezone *time.Location) *Decoder {
return &Decoder{
chk: chk,
timezone: timezone,
}
}

// DecodeOne decodes one value to chunk and returns the remained bytes.
func (decoder *Decoder) DecodeOne(b []byte, colIdx int, ft *types.FieldType) (remain []byte, err error) {
if len(b) < 1 {
return nil, errors.New("invalid encoded key")
}
chk := decoder.chk
flag := b[0]
b = b[1:]
switch flag {
Expand All @@ -502,7 +520,7 @@ func DecodeOneToChunk(b []byte, chk *chunk.Chunk, colIdx int, ft *types.FieldTyp
if err != nil {
return nil, errors.Trace(err)
}
err = appendUintToChunk(v, chk, colIdx, ft, loc)
err = appendUintToChunk(v, chk, colIdx, ft, decoder.timezone)
case varintFlag:
var v int64
b, v, err = DecodeVarint(b)
Expand All @@ -516,7 +534,7 @@ func DecodeOneToChunk(b []byte, chk *chunk.Chunk, colIdx int, ft *types.FieldTyp
if err != nil {
return nil, errors.Trace(err)
}
err = appendUintToChunk(v, chk, colIdx, ft, loc)
err = appendUintToChunk(v, chk, colIdx, ft, decoder.timezone)
case floatFlag:
var v float64
b, v, err = DecodeFloat(b)
Expand All @@ -525,12 +543,11 @@ func DecodeOneToChunk(b []byte, chk *chunk.Chunk, colIdx int, ft *types.FieldTyp
}
appendFloatToChunk(v, chk, colIdx, ft)
case bytesFlag:
var v []byte
b, v, err = DecodeBytes(b)
b, decoder.buf, err = DecodeBytes(b, decoder.buf)
if err != nil {
return nil, errors.Trace(err)
}
chk.AppendBytes(colIdx, v)
chk.AppendBytes(colIdx, decoder.buf)
case compactBytesFlag:
var v []byte
b, v, err = DecodeCompactBytes(b)
Expand Down Expand Up @@ -569,7 +586,7 @@ func DecodeOneToChunk(b []byte, chk *chunk.Chunk, colIdx int, ft *types.FieldTyp
if err != nil {
return nil, errors.Trace(err)
}
return b, nil
return b, errors.Trace(err)
}

func appendIntToChunk(val int64, chk *chunk.Chunk, colIdx int, ft *types.FieldType) {
Expand Down
11 changes: 6 additions & 5 deletions util/codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,12 @@ func (s *testCodecSuite) TestBytes(c *C) {

for _, t := range tblBytes {
b := EncodeBytes(nil, t)
_, v, err := DecodeBytes(b)
_, v, err := DecodeBytes(b, nil)
c.Assert(err, IsNil)
c.Assert(t, DeepEquals, v, Commentf("%v - %v - %v", t, b, v))

b = EncodeBytesDesc(nil, t)
_, v, err = DecodeBytesDesc(b)
_, v, err = DecodeBytesDesc(b, nil)
c.Assert(err, IsNil)
c.Assert(t, DeepEquals, v, Commentf("%v - %v - %v", t, b, v))

Expand Down Expand Up @@ -912,20 +912,21 @@ func (s *testCodecSuite) TestDecodeOneToChunk(c *C) {
tps = append(tps, t.tp)
datums = append(datums, types.NewDatum(t.value))
}
chk := chunk.NewChunk(tps)
rowCount := 3
decoder := NewDecoder(chunk.NewChunk(tps), time.Local)
for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
encoded, err := EncodeValue(sc, nil, datums...)
c.Assert(err, IsNil)
decoder.buf = make([]byte, 0, len(encoded))
for colIdx, t := range table {
encoded, err = DecodeOneToChunk(encoded, chk, colIdx, t.tp, time.Local)
encoded, err = decoder.DecodeOne(encoded, colIdx, t.tp)
c.Assert(err, IsNil)
}
}

for colIdx, t := range table {
for rowIdx := 0; rowIdx < rowCount; rowIdx++ {
got := chk.GetRow(rowIdx).GetDatum(colIdx, t.tp)
got := decoder.chk.GetRow(rowIdx).GetDatum(colIdx, t.tp)
expect := datums[colIdx]
if got.IsNull() {
c.Assert(expect.IsNull(), IsTrue)
Expand Down

0 comments on commit c398110

Please sign in to comment.