Skip to content

Commit

Permalink
store/tikv: add raw kv api. (pingcap#2101)
Browse files Browse the repository at this point in the history
* store/tikv: add raw kv api.
  • Loading branch information
disksing authored and ngaut committed Nov 28, 2016
1 parent 2c34a75 commit a2da8fe
Show file tree
Hide file tree
Showing 11 changed files with 2,583 additions and 938 deletions.
3,055 changes: 2,197 additions & 858 deletions _vendor/src/github.com/pingcap/kvproto/pkg/kvrpcpb/kvrpcpb.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ import:
- go-binlog
- go-tipb
- package: github.com/pingcap/kvproto
version: f5305cef0defaec3307623b7f7394d9fb3fd99b5
version: cdfae5d0daef51ca9ad28d7983c58323715bd9be
- package: github.com/gogo/protobuf
version: ^0.3.0
subpackages:
Expand Down
1 change: 1 addition & 0 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ const (
cleanupMaxBackoff = 10000
gcMaxBackoff = 100000
gcResolveLockMaxBackoff = 100000
rawkvMaxBackoff = 5000
)

// Backoffer is a utility for retrying queries.
Expand Down
74 changes: 1 addition & 73 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/kvproto/pkg/errorpb"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/pd/pd-client"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -183,79 +182,8 @@ func (s *tikvStore) GetClient() kv.Client {
}
}

// sendKVReq sends req to tikv server. It will retry internally to find the right
// region leader if i) fails to establish a connection to server or ii) server
// returns `NotLeader`.
func (s *tikvStore) SendKVReq(bo *Backoffer, req *pb.Request, regionID RegionVerID, timeout time.Duration) (*pb.Response, error) {
for {
select {
case <-bo.ctx.Done():
return nil, errors.Trace(bo.ctx.Err())
default:
}

region := s.regionCache.GetRegionByVerID(regionID)
if region == nil {
// If the region is not found in cache, it must be out
// of date and already be cleaned up. We can skip the
// RPC by returning RegionError directly.
return &pb.Response{
Type: req.GetType(),
RegionError: &errorpb.Error{StaleEpoch: &errorpb.StaleEpoch{}},
}, nil
}
req.Context = region.GetContext()
resp, err := s.client.SendKVReq(region.GetAddress(), req, timeout)
if err != nil {
s.regionCache.NextPeer(region.VerID())
err = bo.Backoff(boTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %s, try next peer later", err, req.Context))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
if regionErr := resp.GetRegionError(); regionErr != nil {
reportRegionError(regionErr)
// Retry if error is `NotLeader`.
if notLeader := regionErr.GetNotLeader(); notLeader != nil {
log.Warnf("tikv reports `NotLeader`: %s, ctx: %s, retry later", notLeader, req.Context)
s.regionCache.UpdateLeader(region.VerID(), notLeader.GetLeader().GetId())
if notLeader.GetLeader() == nil {
err = bo.Backoff(boRegionMiss, errors.Errorf("not leader: %v, ctx: %s", notLeader, req.Context))
if err != nil {
return nil, errors.Trace(err)
}
}
continue
}
if staleEpoch := regionErr.GetStaleEpoch(); staleEpoch != nil {
log.Warnf("tikv reports `StaleEpoch`, ctx: %s, retry later", req.Context)
err = s.regionCache.OnRegionStale(region, staleEpoch.NewRegions)
if err != nil {
return nil, errors.Trace(err)
}
continue
}
// Retry if the error is `ServerIsBusy`.
if regionErr.GetServerIsBusy() != nil {
log.Warnf("tikv reports `ServerIsBusy`, ctx: %s, retry later", req.Context)
err = bo.Backoff(boServerBusy, errors.Errorf("server is busy"))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
// For other errors, we only drop cache here.
// Because caller may need to re-split the request.
log.Warnf("tikv reports region error: %s, ctx: %s", resp.GetRegionError(), req.Context)
s.regionCache.DropRegion(region.VerID())
return resp, nil
}
if resp.GetType() != req.GetType() {
return nil, errors.Trace(errMismatch(resp, req))
}
return resp, nil
}
return sendKVReq(s.regionCache, s.client, bo, req, regionID, timeout)
}

func parsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
Expand Down
30 changes: 28 additions & 2 deletions store/tikv/mock-tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,15 @@ func (e *mvccEntry) Rollback(startTS uint64) error {
// MvccStore is an in-memory, multi-versioned, transaction-supported kv storage.
type MvccStore struct {
sync.RWMutex
tree *llrb.LLRB
tree *llrb.LLRB
rawkv map[string][]byte
}

// NewMvccStore creates a MvccStore.
func NewMvccStore() *MvccStore {
return &MvccStore{
tree: llrb.New(),
tree: llrb.New(),
rawkv: make(map[string][]byte),
}
}

Expand Down Expand Up @@ -435,3 +437,27 @@ func (s *MvccStore) ResolveLock(startKey, endKey []byte, startTS, commitTS uint6
s.submit(ents...)
return nil
}

// RawGet queries value with the key.
func (s *MvccStore) RawGet(key []byte) []byte {
s.RLock()
defer s.RUnlock()
return s.rawkv[string(key)]
}

// RawPut stores a key-value pair.
func (s *MvccStore) RawPut(key, value []byte) {
s.Lock()
defer s.Unlock()
if value == nil {
value = []byte{}
}
s.rawkv[string(key)] = value
}

// RawDelete deletes a key-value pair.
func (s *MvccStore) RawDelete(key []byte) {
s.Lock()
defer s.Unlock()
delete(s.rawkv, string(key))
}
26 changes: 25 additions & 1 deletion store/tikv/mock-tikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ type rpcHandler struct {
}

func newRPCHandler(cluster *Cluster, mvccStore *MvccStore, storeID uint64) *rpcHandler {
return &rpcHandler{
h := &rpcHandler{
cluster: cluster,
mvccStore: mvccStore,
storeID: storeID,
}
return h
}

func (h *rpcHandler) handleRequest(req *kvrpcpb.Request) *kvrpcpb.Response {
Expand Down Expand Up @@ -66,6 +67,13 @@ func (h *rpcHandler) handleRequest(req *kvrpcpb.Request) *kvrpcpb.Response {
resp.CmdResolveLockResp = h.onResolveLock(req.CmdResolveLockReq)
case kvrpcpb.MessageType_CmdResolveLock:
resp.CmdResolveLockResp = h.onResolveLock(req.CmdResolveLockReq)

case kvrpcpb.MessageType_CmdRawGet:
resp.CmdRawGetResp = h.onRawGet(req.CmdRawGetReq)
case kvrpcpb.MessageType_CmdRawPut:
resp.CmdRawPutResp = h.onRawPut(req.CmdRawPutReq)
case kvrpcpb.MessageType_CmdRawDelete:
resp.CmdRawDeleteResp = h.onRawDelete(req.CmdRawDeleteReq)
}
return resp
}
Expand Down Expand Up @@ -242,6 +250,22 @@ func (h *rpcHandler) onResolveLock(req *kvrpcpb.CmdResolveLockRequest) *kvrpcpb.
return &kvrpcpb.CmdResolveLockResponse{}
}

func (h *rpcHandler) onRawGet(req *kvrpcpb.CmdRawGetRequest) *kvrpcpb.CmdRawGetResponse {
return &kvrpcpb.CmdRawGetResponse{
Value: h.mvccStore.RawGet(req.GetKey()),
}
}

func (h *rpcHandler) onRawPut(req *kvrpcpb.CmdRawPutRequest) *kvrpcpb.CmdRawPutResponse {
h.mvccStore.RawPut(req.GetKey(), req.GetValue())
return &kvrpcpb.CmdRawPutResponse{}
}

func (h *rpcHandler) onRawDelete(req *kvrpcpb.CmdRawDeleteRequest) *kvrpcpb.CmdRawDeleteResponse {
h.mvccStore.RawDelete(req.GetKey())
return &kvrpcpb.CmdRawDeleteResponse{}
}

func convertToKeyError(err error) *kvrpcpb.KeyError {
if locked, ok := err.(*ErrLocked); ok {
return &kvrpcpb.KeyError{
Expand Down
138 changes: 138 additions & 0 deletions store/tikv/rawkv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tikv

import (
"context"

"github.com/juju/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/pd/pd-client"
)

// RawKVClient is a client of TiKV server which is used as a key-value storage,
// only GET/PUT/DELETE commands are supported.
type RawKVClient struct {
clusterID uint64
regionCache *RegionCache
rpcClient Client
}

// NewRawKVClient creates a client with PD cluster addrs.
func NewRawKVClient(pdAddrs []string) (*RawKVClient, error) {
pdCli, err := pd.NewClient(pdAddrs)
if err != nil {
return nil, errors.Trace(err)
}
return &RawKVClient{
clusterID: pdCli.GetClusterID(),
regionCache: NewRegionCache(pdCli),
rpcClient: newRPCClient(),
}, nil
}

// ClusterID returns the TiKV cluster ID.
func (c *RawKVClient) ClusterID() uint64 {
return c.clusterID
}

// Get queries value with the key. When the key does not exist, it returns
// `nil, nil`, while `[]byte{}, nil` means an empty value.
func (c *RawKVClient) Get(key []byte) ([]byte, error) {
req := &kvrpcpb.Request{
Type: kvrpcpb.MessageType_CmdRawGet,
CmdRawGetReq: &kvrpcpb.CmdRawGetRequest{
Key: key,
},
}
resp, err := c.sendKVReq(key, req)
if err != nil {
return nil, errors.Trace(err)
}
cmdResp := resp.GetCmdRawGetResp()
if cmdResp == nil {
return nil, errors.Trace(errBodyMissing)
}
if cmdResp.GetError() != "" {
return nil, errors.New(cmdResp.GetError())
}
return cmdResp.Value, nil
}

// Put stores a key-value pair to TiKV.
func (c *RawKVClient) Put(key, value []byte) error {
req := &kvrpcpb.Request{
Type: kvrpcpb.MessageType_CmdRawPut,
CmdRawPutReq: &kvrpcpb.CmdRawPutRequest{
Key: key,
Value: value,
},
}
resp, err := c.sendKVReq(key, req)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.GetCmdRawPutResp()
if cmdResp == nil {
return errors.Trace(errBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
return nil
}

// Delete deletes a key-value pair from TiKV.
func (c *RawKVClient) Delete(key []byte) error {
req := &kvrpcpb.Request{
Type: kvrpcpb.MessageType_CmdRawDelete,
CmdRawDeleteReq: &kvrpcpb.CmdRawDeleteRequest{
Key: key,
},
}
resp, err := c.sendKVReq(key, req)
if err != nil {
return errors.Trace(err)
}
cmdResp := resp.GetCmdRawDeleteResp()
if cmdResp == nil {
return errors.Trace(errBodyMissing)
}
if cmdResp.GetError() != "" {
return errors.New(cmdResp.GetError())
}
return nil
}

func (c *RawKVClient) sendKVReq(key []byte, req *kvrpcpb.Request) (*kvrpcpb.Response, error) {
bo := NewBackoffer(rawkvMaxBackoff, context.Background())
for {
region, err := c.regionCache.GetRegion(bo, key)
if err != nil {
return nil, errors.Trace(err)
}
resp, err := sendKVReq(c.regionCache, c.rpcClient, bo, req, region.VerID(), readTimeoutShort)
if err != nil {
return nil, errors.Trace(err)
}
if regionErr := resp.GetRegionError(); regionErr != nil {
err := bo.Backoff(boRegionMiss, errors.New(regionErr.String()))
if err != nil {
return nil, errors.Trace(err)
}
continue
}
return resp, nil
}
}
Loading

0 comments on commit a2da8fe

Please sign in to comment.