Skip to content

Commit

Permalink
hbase: add remote tso support
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Nov 18, 2015
1 parent 5491086 commit 549dc3b
Showing 1 changed file with 49 additions and 19 deletions.
68 changes: 49 additions & 19 deletions store/hbase/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/ngaut/log"
"github.com/pingcap/go-hbase"
"github.com/pingcap/go-themis"
"github.com/pingcap/go-themis/oracle"
"github.com/pingcap/go-themis/oracle/oracles"
"github.com/pingcap/tidb/kv"
)

Expand All @@ -47,8 +49,8 @@ var (
)

var (
// ErrZkInvalid is returned when zookeeper info is invalid.
ErrZkInvalid = errors.New("zk info invalid")
// ErrDsnInvalid is returned when store dsn is invalid.
ErrDsnInvalid = errors.New("dsn invalid")
)

type storeCache struct {
Expand All @@ -65,8 +67,9 @@ func init() {

type hbaseStore struct {
mu sync.Mutex
zkInfo string
dsn string
storeName string
oracle oracle.Oracle
conns []hbase.HBaseClient
}

Expand All @@ -79,22 +82,28 @@ func (s *hbaseStore) Begin() (kv.Transaction, error) {
s.mu.Lock()
defer s.mu.Unlock()
hbaseCli := s.getHBaseClient()
t := themis.NewTxn(hbaseCli)
t, err := themis.NewTxn(hbaseCli, s.oracle)
if err != nil {
return nil, errors.Trace(err)
}
txn := newHbaseTxn(t, s.storeName)
return txn, nil
}

func (s *hbaseStore) GetSnapshot(ver kv.Version) (kv.MvccSnapshot, error) {
hbaseCli := s.getHBaseClient()
t := themis.NewTxn(hbaseCli)
t, err := themis.NewTxn(hbaseCli, s.oracle)
if err != nil {
return nil, errors.Trace(err)
}
return newHbaseSnapshot(t, s.storeName), nil
}

func (s *hbaseStore) Close() error {
mc.mu.Lock()
defer mc.mu.Unlock()

delete(mc.cache, s.zkInfo)
delete(mc.cache, s.dsn)

var err error
for _, conn := range s.conns {
Expand All @@ -108,12 +117,15 @@ func (s *hbaseStore) Close() error {
}

func (s *hbaseStore) UUID() string {
return "hbase." + s.storeName + "." + s.zkInfo
return "hbase." + s.storeName + "." + s.dsn
}

func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
hbaseCli := s.getHBaseClient()
t := themis.NewTxn(hbaseCli)
t, err := themis.NewTxn(hbaseCli, s.oracle)
if err != nil {
return kv.Version{Ver: 0}, errors.Trace(err)
}
defer t.Release()

return kv.Version{Ver: t.GetStartTS()}, nil
Expand All @@ -123,22 +135,32 @@ func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
type Driver struct {
}

// Open opens or creates a storage database with given path.
func (d Driver) Open(zkInfo string) (kv.Storage, error) {
// Open opens or creates an HBase storage with given dsn, format should be 'zk1,zk2,zk3|tsoaddr:port/tblName'.
// If tsoAddr is not provided, it will use a local oracle instead.
func (d Driver) Open(dsn string) (kv.Storage, error) {
mc.mu.Lock()
defer mc.mu.Unlock()

if len(zkInfo) == 0 {
return nil, errors.Trace(ErrZkInvalid)
if len(dsn) == 0 {
return nil, errors.Trace(ErrDsnInvalid)
}
pos := strings.LastIndex(zkInfo, "/")
pos := strings.LastIndex(dsn, "/")
if pos == -1 {
return nil, errors.Trace(ErrZkInvalid)
return nil, errors.Trace(ErrDsnInvalid)
}
tableName := dsn[pos+1:]
addrs := dsn[:pos]

var tsoAddr string
pos = strings.LastIndex(addrs, "|")
if pos != -1 {
tsoAddr = addrs[pos+1:]
addrs = addrs[:pos]
}
tableName := zkInfo[pos+1:]
zks := strings.Split(zkInfo[:pos], ",")

if store, ok := mc.cache[zkInfo]; ok {
zks := strings.Split(addrs, ",")

if store, ok := mc.cache[dsn]; ok {
// TODO: check the cache store has the same engine with this Driver.
return store, nil
}
Expand Down Expand Up @@ -167,11 +189,19 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) {
}
}

var ora oracle.Oracle
if tsoAddr == "" {
ora = &oracles.LocalOracle{}
} else {
ora = oracles.NewTsoOracle(tsoAddr)
}

s := &hbaseStore{
zkInfo: zkInfo,
dsn: dsn,
storeName: tableName,
oracle: ora,
conns: conns,
}
mc.cache[zkInfo] = s
mc.cache[dsn] = s
return s, nil
}

0 comments on commit 549dc3b

Please sign in to comment.