Skip to content

Commit

Permalink
Merge pull request pingcap#604 from pingcap/disksing/tso
Browse files Browse the repository at this point in the history
hbase: add remote tso support
  • Loading branch information
siddontang committed Nov 19, 2015
2 parents 5821277 + c461574 commit 7a6a71b
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 23 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ godep:
go get github.com/tools/godep
go get github.com/pingcap/go-hbase
go get github.com/pingcap/go-themis
go get github.com/ngaut/tso/client

build:
$(GO) build
Expand Down
38 changes: 38 additions & 0 deletions store/hbase/hbase_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package hbasekv

import . "github.com/pingcap/check"

var _ = Suite(&testHBaseSuite{})

type testHBaseSuite struct {
}

func (t *testHBaseSuite) TestParseDSN(c *C) {
zks, oracle, table, err := parseDSN("zk1.com,zk2,192.168.0.1|localhost:1234/tidb")
c.Assert(zks, DeepEquals, []string{"zk1.com", "zk2", "192.168.0.1"})
c.Assert(oracle, Equals, "localhost:1234")
c.Assert(table, Equals, "tidb")
c.Assert(err, IsNil)

zks, oracle, table, err = parseDSN("zk1,zk2/tidb")
c.Assert(zks, DeepEquals, []string{"zk1", "zk2"})
c.Assert(oracle, Equals, "")
c.Assert(table, Equals, "tidb")
c.Assert(err, IsNil)

_, _, _, err = parseDSN("zk1,zk2|localhost:1234")
c.Assert(err, NotNil)
}
81 changes: 58 additions & 23 deletions store/hbase/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package hbasekv

import (
"fmt"
"math/rand"
"strings"
"sync"
Expand All @@ -23,6 +24,8 @@ import (
"github.com/ngaut/log"
"github.com/pingcap/go-hbase"
"github.com/pingcap/go-themis"
"github.com/pingcap/go-themis/oracle"
"github.com/pingcap/go-themis/oracle/oracles"
"github.com/pingcap/tidb/kv"
)

Expand All @@ -47,8 +50,8 @@ var (
)

var (
// ErrZkInvalid is returned when zookeeper info is invalid.
ErrZkInvalid = errors.New("zk info invalid")
// ErrInvalidDSN is returned when store dsn is invalid.
ErrInvalidDSN = errors.New("invalid dsn")
)

type storeCache struct {
Expand All @@ -65,8 +68,9 @@ func init() {

type hbaseStore struct {
mu sync.Mutex
zkInfo string
dsn string
storeName string
oracle oracle.Oracle
conns []hbase.HBaseClient
}

Expand All @@ -79,22 +83,28 @@ func (s *hbaseStore) Begin() (kv.Transaction, error) {
s.mu.Lock()
defer s.mu.Unlock()
hbaseCli := s.getHBaseClient()
t := themis.NewTxn(hbaseCli)
t, err := themis.NewTxn(hbaseCli, s.oracle)
if err != nil {
return nil, errors.Trace(err)
}
txn := newHbaseTxn(t, s.storeName)
return txn, nil
}

func (s *hbaseStore) GetSnapshot(ver kv.Version) (kv.MvccSnapshot, error) {
hbaseCli := s.getHBaseClient()
t := themis.NewTxn(hbaseCli)
t, err := themis.NewTxn(hbaseCli, s.oracle)
if err != nil {
return nil, errors.Trace(err)
}
return newHbaseSnapshot(t, s.storeName), nil
}

func (s *hbaseStore) Close() error {
mc.mu.Lock()
defer mc.mu.Unlock()

delete(mc.cache, s.zkInfo)
delete(mc.cache, s.dsn)

var err error
for _, conn := range s.conns {
Expand All @@ -108,12 +118,15 @@ func (s *hbaseStore) Close() error {
}

func (s *hbaseStore) UUID() string {
return "hbase." + s.storeName + "." + s.zkInfo
return fmt.Sprintf("hbase.%s.%s", s.storeName, s.dsn)
}

func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
hbaseCli := s.getHBaseClient()
t := themis.NewTxn(hbaseCli)
t, err := themis.NewTxn(hbaseCli, s.oracle)
if err != nil {
return kv.Version{Ver: 0}, errors.Trace(err)
}
defer t.Release()

return kv.Version{Ver: t.GetStartTS()}, nil
Expand All @@ -123,26 +136,22 @@ func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
type Driver struct {
}

// Open opens or creates a storage database with given path.
func (d Driver) Open(zkInfo string) (kv.Storage, error) {
// Open opens or creates an HBase storage with given dsn, format should be 'zk1,zk2,zk3|tsoaddr:port/tblName'.
// If tsoAddr is not provided, it will use a local oracle instead.
func (d Driver) Open(dsn string) (kv.Storage, error) {
mc.mu.Lock()
defer mc.mu.Unlock()

if len(zkInfo) == 0 {
return nil, errors.Trace(ErrZkInvalid)
}
pos := strings.LastIndex(zkInfo, "/")
if pos == -1 {
return nil, errors.Trace(ErrZkInvalid)
}
tableName := zkInfo[pos+1:]
zks := strings.Split(zkInfo[:pos], ",")

if store, ok := mc.cache[zkInfo]; ok {
if store, ok := mc.cache[dsn]; ok {
// TODO: check the cache store has the same engine with this Driver.
return store, nil
}

zks, oracleAddr, tableName, err := parseDSN(dsn)
if err != nil {
return nil, errors.Trace(err)
}

// create buffered HBase connections, HBaseClient is goroutine-safe, so
// it's OK to redistribute to transactions.
conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize)
Expand All @@ -167,11 +176,37 @@ func (d Driver) Open(zkInfo string) (kv.Storage, error) {
}
}

var ora oracle.Oracle
if len(oracleAddr) == 0 {
ora = oracles.NewLocalOracle()
} else {
ora = oracles.NewRemoteOracle(oracleAddr)
}

s := &hbaseStore{
zkInfo: zkInfo,
dsn: dsn,
storeName: tableName,
oracle: ora,
conns: conns,
}
mc.cache[zkInfo] = s
mc.cache[dsn] = s
return s, nil
}

func parseDSN(dsn string) (zks []string, oracleAddr, tableName string, err error) {
pos := strings.LastIndex(dsn, "/")
if pos == -1 {
err = errors.Trace(ErrInvalidDSN)
return
}
tableName = dsn[pos+1:]
addrs := dsn[:pos]

pos = strings.LastIndex(addrs, "|")
if pos != -1 {
oracleAddr = addrs[pos+1:]
addrs = addrs[:pos]
}
zks = strings.Split(addrs, ",")
return
}

0 comments on commit 7a6a71b

Please sign in to comment.