Skip to content

Commit

Permalink
store/tikv: remove cluster ID
Browse files Browse the repository at this point in the history
  • Loading branch information
huachaohuang committed Nov 4, 2016
1 parent 4019a9e commit e1cb126
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 40 deletions.
7 changes: 2 additions & 5 deletions sessionctx/binloginfo/binloginfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ import (
// shared by all sessions.
var PumpClient binlog.PumpClient

// ClusterID is set by command line argument, if not set, use default value 1.
var ClusterID uint64 = 1

// keyType is a dummy type to avoid naming collision in context.
type keyType int

Expand Down Expand Up @@ -71,9 +68,9 @@ func GetPrewriteValue(ctx context.Context, createIfNotExists bool) *binlog.Prewr
}

// WriteBinlog writes a binlog to Pump.
func WriteBinlog(bin *binlog.Binlog) error {
func WriteBinlog(bin *binlog.Binlog, clusterID uint64) error {
commitData, _ := bin.Marshal()
req := &binlog.WriteBinlogReq{ClusterID: ClusterID, Payload: commitData}
req := &binlog.WriteBinlogReq{ClusterID: clusterID, Payload: commitData}
resp, err := PumpClient.WriteBinlog(goctx.Background(), req)
if err == nil && resp.Errmsg != "" {
err = errors.New(resp.Errmsg)
Expand Down
30 changes: 14 additions & 16 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"fmt"
"math/rand"
"net/url"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -50,21 +49,24 @@ func (d Driver) Open(path string) (kv.Storage, error) {
mc.Lock()
defer mc.Unlock()

etcdAddrs, clusterID, disableGC, err := parsePath(path)
etcdAddrs, disableGC, err := parsePath(path)
if err != nil {
return nil, errors.Trace(err)
}

pdCli, err := pd.NewClient(etcdAddrs)
if err != nil {
return nil, errors.Trace(err)
}
clusterID := pdCli.GetClusterID()

// FIXME: uuid will be a very long and ugly string, simplify it.
uuid := fmt.Sprintf("tikv-%v-%v", etcdAddrs, clusterID)
if store, ok := mc.cache[uuid]; ok {
return store, nil
}

pdCli, err := pd.NewClient(etcdAddrs, clusterID)
if err != nil {
return nil, errors.Trace(err)
}
s, err := newTikvStore(uuid, &codecPDClient{pdCli}, newRPCClient(), !disableGC)
s, err := newTikvStore(clusterID, uuid, &codecPDClient{pdCli}, newRPCClient(), !disableGC)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -76,6 +78,7 @@ func (d Driver) Open(path string) (kv.Storage, error) {
var oracleUpdateInterval = 2000

type tikvStore struct {
clusterID uint64
uuid string
oracle oracle.Oracle
client Client
Expand All @@ -84,13 +87,14 @@ type tikvStore struct {
gcWorker *GCWorker
}

func newTikvStore(uuid string, pdClient pd.Client, client Client, enableGC bool) (*tikvStore, error) {
func newTikvStore(clusterID uint64, uuid string, pdClient pd.Client, client Client, enableGC bool) (*tikvStore, error) {
oracle, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond)
if err != nil {
return nil, errors.Trace(err)
}

store := &tikvStore{
clusterID: clusterID,
uuid: uuid,
oracle: oracle,
client: client,
Expand All @@ -113,7 +117,7 @@ func NewMockTikvStore() (kv.Storage, error) {
mvccStore := mocktikv.NewMvccStore()
client := mocktikv.NewRPCClient(cluster, mvccStore)
uuid := fmt.Sprintf("mock-tikv-store-:%v", time.Now().Unix())
return newTikvStore(uuid, mocktikv.NewPDClient(cluster), client, false)
return newTikvStore(1, uuid, mocktikv.NewPDClient(cluster), client, false)
}

func (s *tikvStore) Begin() (kv.Transaction, error) {
Expand Down Expand Up @@ -248,7 +252,7 @@ func (s *tikvStore) SendKVReq(bo *Backoffer, req *pb.Request, regionID RegionVer
}
}

func parsePath(path string) (etcdAddrs []string, clusterID uint64, disableGC bool, err error) {
func parsePath(path string) (etcdAddrs []string, disableGC bool, err error) {
var u *url.URL
u, err = url.Parse(path)
if err != nil {
Expand All @@ -260,12 +264,6 @@ func parsePath(path string) (etcdAddrs []string, clusterID uint64, disableGC boo
log.Error(err)
return
}
clusterID, err = strconv.ParseUint(u.Query().Get("cluster"), 10, 64)
if err != nil {
log.Errorf("Parse clusterID error [%s]", err)
err = errors.Trace(err)
return
}
switch strings.ToLower(u.Query().Get("disableGC")) {
case "true":
disableGC = true
Expand Down
9 changes: 5 additions & 4 deletions store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ func newLockResolver(store *tikvStore) *LockResolver {
}

// NewLockResolver creates a LockResolver.
func NewLockResolver(etcdAddrs []string, clusterID uint64) (*LockResolver, error) {
uuid := fmt.Sprintf("tikv-%v-%v", etcdAddrs, clusterID)
pdCli, err := pd.NewClient(etcdAddrs, clusterID)
func NewLockResolver(etcdAddrs []string) (*LockResolver, error) {
pdCli, err := pd.NewClient(etcdAddrs)
if err != nil {
return nil, errors.Trace(err)
}
s, err := newTikvStore(uuid, &codecPDClient{pdCli}, newRPCClient(), false)
clusterID := pdCli.GetClusterID()
uuid := fmt.Sprintf("tikv-%v-%v", etcdAddrs, clusterID)
s, err := newTikvStore(clusterID, uuid, &codecPDClient{pdCli}, newRPCClient(), false)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
4 changes: 4 additions & 0 deletions store/tikv/mock-tikv/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func NewPDClient(cluster *Cluster) pd.Client {
}
}

func (c *pdClient) GetClusterID() uint64 {
return 1
}

func (c *pdClient) GetTS() (int64, int64, error) {
tsMu.Lock()
defer tsMu.Unlock()
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *testSplitSuite) SetUpTest(c *C) {
mocktikv.BootstrapWithSingleStore(s.cluster)
mvccStore := mocktikv.NewMvccStore()
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
store, err := newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), client, false)
store, err := newTikvStore(1, "mock-tikv-store", mocktikv.NewPDClient(s.cluster), client, false)
c.Assert(err, IsNil)
s.store = store
s.bo = NewBackoffer(5000)
Expand Down
15 changes: 8 additions & 7 deletions store/tikv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,23 +41,20 @@ func (s *testStoreSuite) SetUpTest(c *C) {
mocktikv.BootstrapWithSingleStore(s.cluster)
mvccStore := mocktikv.NewMvccStore()
clientFactory := mocktikv.NewRPCClient(s.cluster, mvccStore)
store, err := newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), clientFactory, false)
store, err := newTikvStore(1, "mock-tikv-store", mocktikv.NewPDClient(s.cluster), clientFactory, false)
c.Assert(err, IsNil)
s.store = store
}

func (s *testStoreSuite) TestParsePath(c *C) {
etcdAddrs, clusterID, disableGC, err := parsePath("tikv://node1:2379,node2:2379?cluster=1")
etcdAddrs, disableGC, err := parsePath("tikv://node1:2379,node2:2379")
c.Assert(err, IsNil)
c.Assert(etcdAddrs, DeepEquals, []string{"node1:2379", "node2:2379"})
c.Assert(clusterID, Equals, uint64(1))
c.Assert(disableGC, IsFalse)

_, _, _, err = parsePath("tikv://node1:2379")
_, _, err = parsePath("tikv://node1:2379")
c.Assert(err, NotNil)
_, _, _, err = parsePath("tidb://node1:2379?cluster=1")
c.Assert(err, NotNil)
_, _, disableGC, err = parsePath("tikv://node1:2379?cluster=1&disableGC=true")
_, disableGC, err = parsePath("tikv://node1:2379?disableGC=true")
c.Assert(err, IsNil)
c.Assert(disableGC, IsTrue)
}
Expand Down Expand Up @@ -291,6 +288,10 @@ func (c *mockPDClient) disable() {
c.stop = true
}

func (c *mockPDClient) GetClusterID() uint64 {
return 1
}

func (c *mockPDClient) GetTS() (int64, int64, error) {
c.RLock()
defer c.RUnlock()
Expand Down
7 changes: 3 additions & 4 deletions store/tikv/ticlient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import (
)

var (
withTiKV = flag.Bool("with-tikv", false, "run tests with TiKV cluster started. (not use the mock server)")
pdAddrs = flag.String("pd-addrs", "127.0.0.1:2379", "pd addrs")
clusterID = flag.Int("cluster", 1, "cluster ID")
withTiKV = flag.Bool("with-tikv", false, "run tests with TiKV cluster started. (not use the mock server)")
pdAddrs = flag.String("pd-addrs", "127.0.0.1:2379", "pd addrs")
)

func newTestStore(c *C) *tikvStore {
Expand All @@ -35,7 +34,7 @@ func newTestStore(c *C) *tikvStore {

if *withTiKV {
var d Driver
store, err := d.Open(fmt.Sprintf("tikv://%s?cluster=%d", *pdAddrs, *clusterID))
store, err := d.Open(fmt.Sprintf("tikv://%s", *pdAddrs))
c.Assert(err, IsNil)
return store.(*tikvStore)
}
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/txn_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ func (c *txnCommitter) prewriteBinlog() chan error {
if bin.Tp == binlog.BinlogType_Prewrite {
bin.PrewriteKey = c.keys[0]
}
err := binloginfo.WriteBinlog(bin)
err := binloginfo.WriteBinlog(bin, c.store.clusterID)
ch <- errors.Trace(err)
}()
return ch
Expand All @@ -409,7 +409,7 @@ func (c *txnCommitter) writeFinishBinlog(tp binlog.BinlogType, commitTS int64) {
bin.Tp = tp
bin.CommitTs = commitTS
go func() {
err := binloginfo.WriteBinlog(bin)
err := binloginfo.WriteBinlog(bin, c.store.clusterID)
if err != nil {
log.Errorf("failed to write binlog: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/txn_committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) {
mocktikv.BootstrapWithMultiRegions(s.cluster, []byte("a"), []byte("b"), []byte("c"))
mvccStore := mocktikv.NewMvccStore()
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
store, err := newTikvStore("mock-tikv-store", mocktikv.NewPDClient(s.cluster), client, false)
store, err := newTikvStore(1, "mock-tikv-store", mocktikv.NewPDClient(s.cluster), client, false)
c.Assert(err, IsNil)
s.store = store
}
Expand Down

0 comments on commit e1cb126

Please sign in to comment.