Skip to content

Commit

Permalink
server/http_handler: use func in helper instead of use tikvrpc direct…
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored May 25, 2021
1 parent ad72d38 commit 0199b40
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 76 deletions.
73 changes: 4 additions & 69 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -198,71 +197,7 @@ func (t *tikvHandlerTool) getHandle(tb table.PhysicalTable, params map[string]st
return handle, nil
}

func (t *tikvHandlerTool) getMvccByStartTs(startTS uint64, startKey, endKey kv.Key) (*mvccKV, error) {
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
for {
curRegion, err := t.RegionCache.LocateKey(bo, startKey)
if err != nil {
logutil.BgLogger().Error("get MVCC by startTS failed", zap.Uint64("txnStartTS", startTS),
zap.Stringer("startKey", startKey), zap.Error(err))
return nil, errors.Trace(err)
}

tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByStartTs, &kvrpcpb.MvccGetByStartTsRequest{
StartTs: startTS,
})
tikvReq.Context.Priority = kvrpcpb.CommandPri_Low
kvResp, err := t.Store.SendReq(bo, tikvReq, curRegion.Region, time.Hour)
if err != nil {
logutil.BgLogger().Error("get MVCC by startTS failed",
zap.Uint64("txnStartTS", startTS),
zap.Stringer("startKey", startKey),
zap.Reflect("region", curRegion.Region),
zap.Stringer("curRegion", curRegion),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
}
data := kvResp.Resp.(*kvrpcpb.MvccGetByStartTsResponse)
if err := data.GetRegionError(); err != nil {
logutil.BgLogger().Warn("get MVCC by startTS failed",
zap.Uint64("txnStartTS", startTS),
zap.Stringer("startKey", startKey),
zap.Reflect("region", curRegion.Region),
zap.Stringer("curRegion", curRegion),
zap.Reflect("kvResp", kvResp),
zap.Stringer("error", err))
continue
}

if len(data.GetError()) > 0 {
logutil.BgLogger().Error("get MVCC by startTS failed",
zap.Uint64("txnStartTS", startTS),
zap.Stringer("startKey", startKey),
zap.Reflect("region", curRegion.Region),
zap.Stringer("curRegion", curRegion),
zap.Reflect("kvResp", kvResp),
zap.String("error", data.GetError()))
return nil, errors.New(data.GetError())
}

key := data.GetKey()
if len(key) > 0 {
resp := &kvrpcpb.MvccGetByKeyResponse{Info: data.Info, RegionError: data.RegionError, Error: data.Error}
return &mvccKV{Key: strings.ToUpper(hex.EncodeToString(key)), Value: resp, RegionID: curRegion.Region.GetID()}, nil
}

if len(endKey) > 0 && curRegion.Contains(endKey) {
return nil, nil
}
if len(curRegion.EndKey) == 0 {
return nil, nil
}
startKey = kv.Key(curRegion.EndKey)
}
}

func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values, idxCols []*model.ColumnInfo, handle kv.Handle) (*mvccKV, error) {
func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values, idxCols []*model.ColumnInfo, handle kv.Handle) (*helper.MvccKV, error) {
sc := new(stmtctx.StatementContext)
// HTTP request is not a database session, set timezone to UTC directly here.
// See https://github.com/pingcap/tidb/blob/master/docs/tidb_http_api.md for more details.
Expand All @@ -283,7 +218,7 @@ func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values,
if err != nil {
return nil, err
}
return &mvccKV{strings.ToUpper(hex.EncodeToString(encodedKey)), regionID, data}, err
return &helper.MvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), RegionID: regionID, Value: data}, err
}

// formValue2DatumRow converts URL query string to a Datum Row.
Expand Down Expand Up @@ -1646,7 +1581,7 @@ func (h mvccTxnHandler) handleMvccGetByKey(params map[string]string, values url.
if err != nil {
return nil, err
}
resp := &mvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), Value: data, RegionID: regionID}
resp := &helper.MvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), Value: data, RegionID: regionID}
if len(values.Get("decode")) == 0 {
return resp, nil
}
Expand Down Expand Up @@ -1713,7 +1648,7 @@ func (h *mvccTxnHandler) handleMvccGetByTxn(params map[string]string) (interface
}
startKey := tablecodec.EncodeTablePrefix(tableID)
endKey := tablecodec.EncodeRowKeyWithHandle(tableID, kv.IntHandle(math.MaxInt64))
return h.getMvccByStartTs(uint64(startTS), startKey, endKey)
return h.GetMvccByStartTs(uint64(startTS), startKey, endKey)
}

// serverInfo is used to report the servers info when do http request.
Expand Down
14 changes: 7 additions & 7 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ partition by range (a)

func decodeKeyMvcc(closer io.ReadCloser, c *C, valid bool) {
decoder := json.NewDecoder(closer)
var data mvccKV
var data helper.MvccKV
err := decoder.Decode(&data)
c.Assert(err, IsNil)
if valid {
Expand All @@ -580,7 +580,7 @@ func (ts *HTTPHandlerTestSuite) TestGetTableMVCC(c *C) {
resp, err := ts.fetchStatus("/mvcc/key/tidb/test/1")
c.Assert(err, IsNil)
decoder := json.NewDecoder(resp.Body)
var data mvccKV
var data helper.MvccKV
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(data.Value, NotNil)
Expand All @@ -601,7 +601,7 @@ func (ts *HTTPHandlerTestSuite) TestGetTableMVCC(c *C) {

resp, err = ts.fetchStatus(fmt.Sprintf("/mvcc/txn/%d/tidb/test", startTs))
c.Assert(err, IsNil)
var p2 mvccKV
var p2 helper.MvccKV
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&p2)
c.Assert(err, IsNil)
Expand All @@ -615,7 +615,7 @@ func (ts *HTTPHandlerTestSuite) TestGetTableMVCC(c *C) {
resp, err = ts.fetchStatus("/mvcc/hex/" + hexKey)
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
var data2 mvccKV
var data2 helper.MvccKV
err = decoder.Decode(&data2)
c.Assert(err, IsNil)
c.Assert(data2, DeepEquals, data)
Expand Down Expand Up @@ -669,7 +669,7 @@ func (ts *HTTPHandlerTestSuite) TestGetMVCCNotFound(c *C) {
resp, err := ts.fetchStatus("/mvcc/key/tidb/test/1234")
c.Assert(err, IsNil)
decoder := json.NewDecoder(resp.Body)
var data mvccKV
var data helper.MvccKV
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(data.Value.Info.Lock, IsNil)
Expand Down Expand Up @@ -974,14 +974,14 @@ func (ts *HTTPHandlerTestSuite) TestGetIndexMVCC(c *C) {
resp, err = ts.fetchStatus("/mvcc/index/tidb/test/idx1/1?a=1")
c.Assert(err, IsNil)
decoder := json.NewDecoder(resp.Body)
var data1 mvccKV
var data1 helper.MvccKV
err = decoder.Decode(&data1)
c.Assert(err, NotNil)

resp, err = ts.fetchStatus("/mvcc/index/tidb/test/idx2/1?a=1")
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
var data2 mvccKV
var data2 helper.MvccKV
err = decoder.Decode(&data2)
c.Assert(err, NotNil)

Expand Down
72 changes: 72 additions & 0 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,78 @@ func (h *Helper) GetMvccByEncodedKey(encodedKey kv.Key) (*kvrpcpb.MvccGetByKeyRe
return kvResp.Resp.(*kvrpcpb.MvccGetByKeyResponse), nil
}

// MvccKV wraps the key's mvcc info in tikv.
type MvccKV struct {
Key string `json:"key"`
RegionID uint64 `json:"region_id"`
Value *kvrpcpb.MvccGetByKeyResponse `json:"value"`
}

// GetMvccByStartTs gets Mvcc info by startTS from tikv.
func (h *Helper) GetMvccByStartTs(startTS uint64, startKey, endKey kv.Key) (*MvccKV, error) {
bo := tikv.NewBackofferWithVars(context.Background(), 5000, nil)
for {
curRegion, err := h.RegionCache.LocateKey(bo, startKey)
if err != nil {
logutil.BgLogger().Error("get MVCC by startTS failed", zap.Uint64("txnStartTS", startTS),
zap.Stringer("startKey", startKey), zap.Error(err))
return nil, errors.Trace(err)
}

tikvReq := tikvrpc.NewRequest(tikvrpc.CmdMvccGetByStartTs, &kvrpcpb.MvccGetByStartTsRequest{
StartTs: startTS,
})
tikvReq.Context.Priority = kvrpcpb.CommandPri_Low
kvResp, err := h.Store.SendReq(bo, tikvReq, curRegion.Region, time.Hour)
if err != nil {
logutil.BgLogger().Error("get MVCC by startTS failed",
zap.Uint64("txnStartTS", startTS),
zap.Stringer("startKey", startKey),
zap.Reflect("region", curRegion.Region),
zap.Stringer("curRegion", curRegion),
zap.Reflect("kvResp", kvResp),
zap.Error(err))
return nil, errors.Trace(err)
}
data := kvResp.Resp.(*kvrpcpb.MvccGetByStartTsResponse)
if err := data.GetRegionError(); err != nil {
logutil.BgLogger().Warn("get MVCC by startTS failed",
zap.Uint64("txnStartTS", startTS),
zap.Stringer("startKey", startKey),
zap.Reflect("region", curRegion.Region),
zap.Stringer("curRegion", curRegion),
zap.Reflect("kvResp", kvResp),
zap.Stringer("error", err))
continue
}

if len(data.GetError()) > 0 {
logutil.BgLogger().Error("get MVCC by startTS failed",
zap.Uint64("txnStartTS", startTS),
zap.Stringer("startKey", startKey),
zap.Reflect("region", curRegion.Region),
zap.Stringer("curRegion", curRegion),
zap.Reflect("kvResp", kvResp),
zap.String("error", data.GetError()))
return nil, errors.New(data.GetError())
}

key := data.GetKey()
if len(key) > 0 {
resp := &kvrpcpb.MvccGetByKeyResponse{Info: data.Info, RegionError: data.RegionError, Error: data.Error}
return &MvccKV{Key: strings.ToUpper(hex.EncodeToString(key)), Value: resp, RegionID: curRegion.Region.GetID()}, nil
}

if len(endKey) > 0 && curRegion.Contains(endKey) {
return nil, nil
}
if len(curRegion.EndKey) == 0 {
return nil, nil
}
startKey = kv.Key(curRegion.EndKey)
}
}

// StoreHotRegionInfos records all hog region stores.
// it's the response of PD.
type StoreHotRegionInfos struct {
Expand Down

0 comments on commit 0199b40

Please sign in to comment.