Skip to content

Commit

Permalink
store: implement MVCCDebugger Interface for MVCCLevelDB (pingcap#11754)
Browse files Browse the repository at this point in the history
  • Loading branch information
Deardrops authored and sre-bot committed Sep 5, 2019
1 parent 5199e21 commit 161ff3c
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 49 deletions.
67 changes: 24 additions & 43 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@
package server

import (
"bytes"
"database/sql"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand All @@ -30,7 +28,6 @@ import (

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
zaplog "github.com/pingcap/log"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
Expand Down Expand Up @@ -329,14 +326,16 @@ func (ts *HTTPHandlerTestSuite) prepareData(c *C) {

func decodeKeyMvcc(closer io.ReadCloser, c *C, valid bool) {
decoder := json.NewDecoder(closer)
var data kvrpcpb.MvccGetByKeyResponse
var data mvccKV
err := decoder.Decode(&data)
c.Assert(err, IsNil)
if valid {
c.Assert(data.Info, NotNil)
c.Assert(len(data.Info.Writes), Greater, 0)
c.Assert(data.Value.Info, NotNil)
c.Assert(len(data.Value.Info.Writes), Greater, 0)
} else {
c.Assert(data.Info, IsNil)
c.Assert(data.Value.Info.Lock, IsNil)
c.Assert(data.Value.Info.Writes, IsNil)
c.Assert(data.Value.Info.Values, IsNil)
}
}

Expand All @@ -345,45 +344,35 @@ func (ts *HTTPHandlerTestSuite) TestGetTableMVCC(c *C) {
ts.prepareData(c)
defer ts.stopServer(c)

c.Skip("MVCCLevelDB doesn't implement MVCCDebugger interface.")
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/key/tidb/test/1"))
c.Assert(err, IsNil)
decoder := json.NewDecoder(resp.Body)
var data kvrpcpb.MvccGetByKeyResponse
var data mvccKV
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(data.Info, NotNil)
c.Assert(len(data.Info.Writes), Greater, 0)
startTs := data.Info.Writes[0].StartTs

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/txn/%d", startTs))
c.Assert(err, IsNil)
var p1 kvrpcpb.MvccGetByStartTsResponse
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&p1)
c.Assert(err, IsNil)
c.Assert(data.Value, NotNil)
info := data.Value.Info
c.Assert(info, NotNil)
c.Assert(len(info.Writes), Greater, 0)
startTs := info.Writes[0].StartTs

resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/txn/%d/tidb/test", startTs))
c.Assert(err, IsNil)
var p2 kvrpcpb.MvccGetByStartTsResponse
var p2 mvccKV
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&p2)
c.Assert(err, IsNil)

for id, expect := range data.Info.Values {
v1 := p1.Info.Values[id].Value
v2 := p2.Info.Values[id].Value
c.Assert(bytes.Equal(v1, expect.Value), IsTrue)
c.Assert(bytes.Equal(v2, expect.Value), IsTrue)
for i, expect := range info.Values {
v2 := p2.Value.Info.Values[i].Value
c.Assert(v2, BytesEquals, expect.Value)
}

_, key, err := codec.DecodeBytes(p1.Key, nil)
c.Assert(err, IsNil)
hexKey := hex.EncodeToString(key)
hexKey := p2.Key
resp, err = http.Get("http://127.0.0.1:10090/mvcc/hex/" + hexKey)
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
var data2 kvrpcpb.MvccGetByKeyResponse
var data2 mvccKV
err = decoder.Decode(&data2)
c.Assert(err, IsNil)
c.Assert(data2, DeepEquals, data)
Expand All @@ -396,19 +385,12 @@ func (ts *HTTPHandlerTestSuite) TestGetMVCCNotFound(c *C) {
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/key/tidb/test/1234"))
c.Assert(err, IsNil)
decoder := json.NewDecoder(resp.Body)
var data kvrpcpb.MvccGetByKeyResponse
var data mvccKV
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(data.Info, IsNil)

c.Skip("MVCCLevelDB doesn't implement MVCCDebugger interface.")
resp, err = http.Get(fmt.Sprintf("http://127.0.0.1:10090/mvcc/txn/0"))
c.Assert(err, IsNil)
var p kvrpcpb.MvccGetByStartTsResponse
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&p)
c.Assert(err, IsNil)
c.Assert(p.Info, IsNil)
c.Assert(data.Value.Info.Lock, IsNil)
c.Assert(data.Value.Info.Writes, IsNil)
c.Assert(data.Value.Info.Values, IsNil)
}

func (ts *HTTPHandlerTestSuite) TestDecodeColumnValue(c *C) {
Expand Down Expand Up @@ -479,7 +461,6 @@ func (ts *HTTPHandlerTestSuite) TestGetIndexMVCC(c *C) {
ts.prepareData(c)
defer ts.stopServer(c)

c.Skip("MVCCLevelDB doesn't implement MVCCDebugger interface.")
// tests for normal index key
resp, err := http.Get("http://127.0.0.1:10090/mvcc/index/tidb/test/idx1/1?a=1&b=2")
c.Assert(err, IsNil)
Expand Down Expand Up @@ -520,14 +501,14 @@ func (ts *HTTPHandlerTestSuite) TestGetIndexMVCC(c *C) {
resp, err = http.Get("http://127.0.0.1:10090/mvcc/index/tidb/test/idx1/1?a=1")
c.Assert(err, IsNil)
decoder := json.NewDecoder(resp.Body)
var data1 kvrpcpb.MvccGetByKeyResponse
var data1 mvccKV
err = decoder.Decode(&data1)
c.Assert(err, NotNil)

resp, err = http.Get("http://127.0.0.1:10090/mvcc/index/tidb/test/idx2/1?a=1")
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
var data2 kvrpcpb.MvccGetByKeyResponse
var data2 mvccKV
err = decoder.Decode(&data2)
c.Assert(err, NotNil)
}
Expand Down
11 changes: 7 additions & 4 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,14 @@ func (s *Server) startHTTPServer() {
router.Handle("/regions/meta", regionHandler{tikvHandlerTool}).Name("RegionsMeta")
router.Handle("/regions/hot", regionHandler{tikvHandlerTool}).Name("RegionHot")
router.Handle("/regions/{regionID}", regionHandler{tikvHandlerTool})
router.Handle("/mvcc/key/{db}/{table}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByKey})
router.Handle("/mvcc/txn/{startTS}/{db}/{table}", mvccTxnHandler{tikvHandlerTool, opMvccGetByTxn})
router.Handle("/mvcc/hex/{hexKey}", mvccTxnHandler{tikvHandlerTool, opMvccGetByHex})
router.Handle("/mvcc/index/{db}/{table}/{index}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByIdx})
}

// HTTP path for get MVCC info
router.Handle("/mvcc/key/{db}/{table}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByKey})
router.Handle("/mvcc/txn/{startTS}/{db}/{table}", mvccTxnHandler{tikvHandlerTool, opMvccGetByTxn})
router.Handle("/mvcc/hex/{hexKey}", mvccTxnHandler{tikvHandlerTool, opMvccGetByHex})
router.Handle("/mvcc/index/{db}/{table}/{index}/{handle}", mvccTxnHandler{tikvHandlerTool, opMvccGetByIdx})

addr := fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, s.cfg.Status.StatusPort)
if s.cfg.Status.StatusPort == 0 {
addr = fmt.Sprintf("%s:%d", s.cfg.Status.StatusHost, defaultStatusPort)
Expand Down
16 changes: 16 additions & 0 deletions store/mockstore/mocktikv/mock_tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,3 +650,19 @@ func (s *testMVCCLevelDB) TestErrors(c *C) {
c.Assert(ErrAlreadyCommitted(0).Error(), Equals, "txn already committed")
c.Assert((&ErrConflict{}).Error(), Equals, "write conflict")
}

func (s *testMVCCLevelDB) TestMvccGetByKey(c *C) {
s.mustPrewriteOK(c, putMutations("q1", "v5"), "p1", 5)
debugger, ok := s.store.(MVCCDebugger)
c.Assert(ok, IsTrue)
mvccInfo := debugger.MvccGetByKey([]byte("q1"))
except := &kvrpcpb.MvccInfo{
Lock: &kvrpcpb.MvccLock{
Type: kvrpcpb.Op_Put,
StartTs: 5,
Primary: []byte("p1"),
ShortValue: []byte("v5"),
},
}
c.Assert(mvccInfo, DeepEquals, except)
}
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ type RawKV interface {

// MVCCDebugger is for debugging.
type MVCCDebugger interface {
MvccGetByStartTS(startKey, endKey []byte, starTS uint64) (*kvrpcpb.MvccInfo, []byte)
MvccGetByStartTS(starTS uint64) (*kvrpcpb.MvccInfo, []byte)
MvccGetByKey(key []byte) *kvrpcpb.MvccInfo
}

Expand Down
104 changes: 104 additions & 0 deletions store/mockstore/mocktikv/mvcc_leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,3 +1273,107 @@ func (mvcc *MVCCLevelDB) doRawDeleteRange(startKey, endKey []byte) error {

return mvcc.db.Write(batch, nil)
}

// MvccGetByStartTS implements the MVCCDebugger interface.
func (mvcc *MVCCLevelDB) MvccGetByStartTS(starTS uint64) (*kvrpcpb.MvccInfo, []byte) {
mvcc.mu.RLock()
defer mvcc.mu.RUnlock()

var key []byte
iter := newIterator(mvcc.db, nil)
defer iter.Release()

// find the first committed key for which `start_ts` equals to `ts`
for iter.Valid() {
var value mvccValue
err := value.UnmarshalBinary(iter.Value())
if err == nil && value.startTS == starTS {
_, key, _ = codec.DecodeBytes(iter.Key(), nil)
break
}
iter.Next()
}
if key == nil {
return nil, nil
}

return mvcc.MvccGetByKey(key), key
}

var valueTypeOpMap = [...]kvrpcpb.Op{
typePut: kvrpcpb.Op_Put,
typeDelete: kvrpcpb.Op_Del,
typeRollback: kvrpcpb.Op_Rollback,
}

// MvccGetByKey implements the MVCCDebugger interface.
func (mvcc *MVCCLevelDB) MvccGetByKey(key []byte) *kvrpcpb.MvccInfo {
mvcc.mu.RLock()
defer mvcc.mu.RUnlock()

info := &kvrpcpb.MvccInfo{}

startKey := mvccEncode(key, lockVer)
iter := newIterator(mvcc.db, &util.Range{
Start: startKey,
})
defer iter.Release()

dec1 := lockDecoder{expectKey: key}
ok, err := dec1.Decode(iter)
if err != nil {
return nil
}
if ok {
var shortValue []byte
if isShortValue(dec1.lock.value) {
shortValue = dec1.lock.value
}
info.Lock = &kvrpcpb.MvccLock{
Type: dec1.lock.op,
StartTs: dec1.lock.startTS,
Primary: dec1.lock.primary,
ShortValue: shortValue,
}
}

dec2 := valueDecoder{expectKey: key}
var writes []*kvrpcpb.MvccWrite
var values []*kvrpcpb.MvccValue
for iter.Valid() {
ok, err := dec2.Decode(iter)
if err != nil {
return nil
}
if !ok {
iter.Next()
break
}
var shortValue []byte
if isShortValue(dec2.value.value) {
shortValue = dec2.value.value
}
write := &kvrpcpb.MvccWrite{
Type: valueTypeOpMap[dec2.value.valueType],
StartTs: dec2.value.startTS,
CommitTs: dec2.value.commitTS,
ShortValue: shortValue,
}
writes = append(writes, write)
value := &kvrpcpb.MvccValue{
StartTs: dec2.value.startTS,
Value: dec2.value.value,
}
values = append(values, value)
}
info.Writes = writes
info.Values = values

return info
}

const shortValueMaxLen = 64

func isShortValue(value []byte) bool {
return len(value) <= shortValueMaxLen
}
2 changes: 1 addition & 1 deletion store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (h *rpcHandler) handleMvccGetByStartTS(req *kvrpcpb.MvccGetByStartTsRequest
}
}
var resp kvrpcpb.MvccGetByStartTsResponse
resp.Info, resp.Key = debugger.MvccGetByStartTS(h.startKey, h.endKey, req.StartTs)
resp.Info, resp.Key = debugger.MvccGetByStartTS(req.StartTs)
return &resp
}

Expand Down

0 comments on commit 161ff3c

Please sign in to comment.