Skip to content

Commit

Permalink
hbase: update for new tso
Browse files Browse the repository at this point in the history
  • Loading branch information
disksing committed Dec 28, 2015
1 parent b601e3b commit 58ffe22
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 42 deletions.
2 changes: 1 addition & 1 deletion driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func parseDriverDSN(dsn string) (storePath, dbName string, err error) {
// Examples:
// goleveldb://relative/path/test
// boltdb:///absolute/path/test
// hbase://zk1,zk2,zk3/hbasetbl/test?tso=127.0.0.1:1234
// hbase://zk1,zk2,zk3/hbasetbl/test?tso=zk
//
// Open may return a cached connection (one previously closed), but doing so is
// unnecessary; the sql package maintains a pool of idle connections for
Expand Down
22 changes: 11 additions & 11 deletions store/hbase/hbase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ type testHBaseSuite struct {

func (t *testHBaseSuite) TestParsePath(c *C) {
tbl := []struct {
dsn string
ok bool
zks []string
oracle string
table string
dsn string
ok bool
zks string
tso string
table string
}{
{"hbase://z,k,zk/tbl", true, []string{"z", "k", "zk"}, "", "tbl"},
{"hbase://z:80,k:80/tbl?tso=127.0.0.1:1234", true, []string{"z:80", "k:80"}, "127.0.0.1:1234", "tbl"},
{"goleveldb://zk/tbl", false, nil, "", ""},
{"hbase://zk/path/tbl", false, nil, "", ""},
{"hbase:///zk/tbl", false, nil, "", ""},
{"hbase://z,k,zk/tbl", true, "z,k,zk", tsoTypeLocal, "tbl"},
{"hbase://z:80,k:80/tbl?tso=zk", true, "z:80,k:80", tsoTypeZK, "tbl"},
{"goleveldb://zk/tbl", false, "", "", ""},
{"hbase://zk/path/tbl", false, "", "", ""},
{"hbase:///zk/tbl", false, "", "", ""},
}

for _, t := range tbl {
zks, oracle, table, err := parsePath(t.dsn)
if t.ok {
c.Assert(err, IsNil, Commentf("dsn=%v", t.dsn))
c.Assert(zks, DeepEquals, t.zks, Commentf("dsn=%v", t.dsn))
c.Assert(oracle, Equals, t.oracle, Commentf("dsn=%v", t.dsn))
c.Assert(oracle, Equals, t.tso, Commentf("dsn=%v", t.dsn))
c.Assert(table, Equals, t.table, Commentf("dsn=%v", t.dsn))
} else {
c.Assert(err, NotNil, Commentf("dsn=%v", t.dsn))
Expand Down
69 changes: 39 additions & 30 deletions store/hbase/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,11 @@ func init() {
}

type hbaseStore struct {
mu sync.Mutex
uuid string
storeName string
oracleAddr string
oracle oracle.Oracle
conns []hbase.HBaseClient
mu sync.Mutex
uuid string
storeName string
oracle oracle.Oracle
conns []hbase.HBaseClient
}

func (s *hbaseStore) getHBaseClient() hbase.HBaseClient {
Expand Down Expand Up @@ -139,26 +138,34 @@ func (s *hbaseStore) CurrentVersion() (kv.Version, error) {
type Driver struct {
}

const (
tsoTypeLocal = "local"
tsoTypeZK = "zk"

tsoZKPath = "/zk/tso"
)

// Open opens or creates an HBase storage with given path.
//
// The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=host:port]'.
// The format of path should be 'hbase://zk1,zk2,zk3/table[?tso=local|zk]'.
// If tso is not provided, it will use a local oracle instead. (for test only)
func (d Driver) Open(path string) (kv.Storage, error) {
mc.mu.Lock()
defer mc.mu.Unlock()

zks, oracleAddr, tableName, err := parsePath(path)
zks, tso, tableName, err := parsePath(path)
if err != nil {
return nil, errors.Trace(err)
}
if tso != tsoTypeLocal && tso != tsoTypeZK {
return nil, errors.Trace(ErrInvalidDSN)
}

uuid := fmt.Sprintf("hbase-%v-%v", zks, tableName)
if tso == tsoTypeLocal {
log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid)
}
if store, ok := mc.cache[uuid]; ok {
if oracleAddr != store.oracleAddr {
err = errors.Errorf("hbase: store(%s) is opened with a different tso, old: %v, new: %v", uuid, store.oracleAddr, oracleAddr)
log.Warn(errors.ErrorStack(err))
return nil, err
}
return store, nil
}

Expand All @@ -167,7 +174,7 @@ func (d Driver) Open(path string) (kv.Storage, error) {
conns := make([]hbase.HBaseClient, 0, hbaseConnPoolSize)
for i := 0; i < hbaseConnPoolSize; i++ {
var c hbase.HBaseClient
c, err = hbase.NewClient(zks, "/hbase")
c, err = hbase.NewClient(strings.Split(zks, ","), "/hbase")
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -193,37 +200,39 @@ func (d Driver) Open(path string) (kv.Storage, error) {
}

var ora oracle.Oracle
if len(oracleAddr) == 0 {
log.Warnf("hbase: store(%s) is using local oracle(for test only)", uuid)
switch tso {
case tsoTypeLocal:
ora = oracles.NewLocalOracle()
} else {
ora = oracles.NewRemoteOracle(oracleAddr)
case tsoTypeZK:
ora = oracles.NewRemoteOracle(zks, tsoZKPath)
}

s := &hbaseStore{
uuid: uuid,
storeName: tableName,
oracleAddr: oracleAddr,
oracle: ora,
conns: conns,
uuid: uuid,
storeName: tableName,
oracle: ora,
conns: conns,
}
mc.cache[uuid] = s
return s, nil
}

func parsePath(path string) (zks []string, oracleAddr, tableName string, err error) {
func parsePath(path string) (zks, tso, tableName string, err error) {
u, err := url.Parse(path)
if err != nil {
return nil, "", "", errors.Trace(err)
return "", "", "", errors.Trace(err)
}
if strings.ToLower(u.Scheme) != "hbase" {
return nil, "", "", errors.Trace(ErrInvalidDSN)
return "", "", "", errors.Trace(ErrInvalidDSN)
}
p, tableName := filepath.Split(u.Path)
if p != "/" {
return nil, "", "", errors.Trace(ErrInvalidDSN)
return "", "", "", errors.Trace(ErrInvalidDSN)
}
zks = u.Host
tso = u.Query().Get("tso")
if tso == "" {
tso = tsoTypeLocal
}
zks = strings.Split(u.Host, ",")
oracleAddr = u.Query().Get("tso")
return zks, oracleAddr, tableName, nil
return zks, tso, tableName, nil
}

0 comments on commit 58ffe22

Please sign in to comment.