Skip to content

Commit

Permalink
Merge pull request influxdata#5403 from influxdata/meta-service2
Browse files Browse the repository at this point in the history
refactor meta into separate meta client & service
  • Loading branch information
dgnorton committed Jan 23, 2016
2 parents 865b267 + e5ac1b7 commit 58e0eed
Show file tree
Hide file tree
Showing 81 changed files with 6,278 additions and 7,337 deletions.
2 changes: 1 addition & 1 deletion cluster/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package cluster
import (
"math/rand"

"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/meta"
)

// Balancer represents a load-balancing algorithm for a set of nodes
Expand Down
2 changes: 1 addition & 1 deletion cluster/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"testing"

"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/meta"
)

func NewNodes() []meta.NodeInfo {
Expand Down
17 changes: 9 additions & 8 deletions cluster/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/tsdb"
)

Expand Down Expand Up @@ -88,11 +88,12 @@ type PointsWriter struct {
WriteTimeout time.Duration
Logger *log.Logger

MetaStore interface {
NodeID() uint64
Node *influxdb.Node

MetaClient interface {
Database(name string) (di *meta.DatabaseInfo, err error)
RetentionPolicy(database, policy string) (*meta.RetentionPolicyInfo, error)
CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
}

Expand Down Expand Up @@ -187,7 +188,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
// holds the start time ranges for required shard groups
timeRanges := map[time.Time]*meta.ShardGroupInfo{}

rp, err := w.MetaStore.RetentionPolicy(wp.Database, wp.RetentionPolicy)
rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)
if err != nil {
return nil, err
}
Expand All @@ -201,7 +202,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)

// holds all the shard groups and shards that are required for writes
for t := range timeRanges {
sg, err := w.MetaStore.CreateShardGroupIfNotExists(wp.Database, wp.RetentionPolicy, t)
sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -235,7 +236,7 @@ func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
w.statMap.Add(statPointWriteReq, int64(len(p.Points)))

if p.RetentionPolicy == "" {
db, err := w.MetaStore.Database(p.Database)
db, err := w.MetaClient.Database(p.Database)
if err != nil {
return err
} else if db == nil {
Expand Down Expand Up @@ -309,7 +310,7 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo

for _, owner := range shard.Owners {
go func(shardID uint64, owner meta.ShardOwner, points []models.Point) {
if w.MetaStore.NodeID() == owner.NodeID {
if w.Node.ID == owner.NodeID {
w.statMap.Add(statPointWriteReqLocal, int64(len(points)))

err := w.TSDBStore.WriteToShard(shardID, points)
Expand Down
32 changes: 17 additions & 15 deletions cluster/points_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import (
"testing"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/services/meta"
)

// Ensures the points writer maps a single point to a single shard.
func TestPointsWriter_MapShards_One(t *testing.T) {
ms := MetaStore{}
ms := MetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)

ms.NodeIDFn = func() uint64 { return 1 }
Expand All @@ -26,7 +27,7 @@ func TestPointsWriter_MapShards_One(t *testing.T) {
return &rp.ShardGroups[0], nil
}

c := cluster.PointsWriter{MetaStore: ms}
c := cluster.PointsWriter{MetaClient: ms}
pr := &cluster.WritePointsRequest{
Database: "mydb",
RetentionPolicy: "myrp",
Expand All @@ -49,7 +50,7 @@ func TestPointsWriter_MapShards_One(t *testing.T) {

// Ensures the points writer maps a multiple points across shard group boundaries.
func TestPointsWriter_MapShards_Multiple(t *testing.T) {
ms := MetaStore{}
ms := MetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
Expand All @@ -76,7 +77,7 @@ func TestPointsWriter_MapShards_Multiple(t *testing.T) {
panic("should not get here")
}

c := cluster.PointsWriter{MetaStore: ms}
c := cluster.PointsWriter{MetaClient: ms}
pr := &cluster.WritePointsRequest{
Database: "mydb",
RetentionPolicy: "myrp",
Expand Down Expand Up @@ -303,7 +304,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
},
}

ms := NewMetaStore()
ms := NewMetaClient()
ms.DatabaseFn = func(database string) (*meta.DatabaseInfo, error) {
return nil, nil
}
Expand All @@ -316,11 +317,12 @@ func TestPointsWriter_WritePoints(t *testing.T) {
}

c := cluster.NewPointsWriter()
c.MetaStore = ms
c.MetaClient = ms
c.ShardWriter = sw
c.TSDBStore = store
c.HintedHandoff = hh
c.Subscriber = sub
c.Node = &influxdb.Node{ID: 1}

c.Open()
defer c.Close()
Expand Down Expand Up @@ -372,8 +374,8 @@ func (f *fakeStore) CreateShard(database, retentionPolicy string, shardID uint64
return f.CreateShardfn(database, retentionPolicy, shardID)
}

func NewMetaStore() *MetaStore {
ms := &MetaStore{}
func NewMetaClient() *MetaClient {
ms := &MetaClient{}
rp := NewRetentionPolicy("myp", time.Hour, 3)
AttachShardGroupInfo(rp, []meta.ShardOwner{
{NodeID: 1},
Expand Down Expand Up @@ -401,29 +403,29 @@ func NewMetaStore() *MetaStore {
return ms
}

type MetaStore struct {
type MetaClient struct {
NodeIDFn func() uint64
RetentionPolicyFn func(database, name string) (*meta.RetentionPolicyInfo, error)
CreateShardGroupIfNotExistsFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
DatabaseFn func(database string) (*meta.DatabaseInfo, error)
ShardOwnerFn func(shardID uint64) (string, string, *meta.ShardGroupInfo)
}

func (m MetaStore) NodeID() uint64 { return m.NodeIDFn() }
func (m MetaClient) NodeID() uint64 { return m.NodeIDFn() }

func (m MetaStore) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
func (m MetaClient) RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) {
return m.RetentionPolicyFn(database, name)
}

func (m MetaStore) CreateShardGroupIfNotExists(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
func (m MetaClient) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
return m.CreateShardGroupIfNotExistsFn(database, policy, timestamp)
}

func (m MetaStore) Database(database string) (*meta.DatabaseInfo, error) {
func (m MetaClient) Database(database string) (*meta.DatabaseInfo, error) {
return m.DatabaseFn(database)
}

func (m MetaStore) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) {
func (m MetaClient) ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo) {
return m.ShardOwnerFn(shardID)
}

Expand Down
6 changes: 3 additions & 3 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/tsdb"
)

Expand Down Expand Up @@ -43,7 +43,7 @@ type Service struct {

Listener net.Listener

MetaStore interface {
MetaClient interface {
ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
}

Expand Down Expand Up @@ -198,7 +198,7 @@ func (s *Service) processWriteShardRequest(buf []byte) error {
if err == tsdb.ErrShardNotFound {

// Query the metastore for the owner of this shard
database, retentionPolicy, sgi := s.MetaStore.ShardOwner(req.ShardID())
database, retentionPolicy, sgi := s.MetaClient.ShardOwner(req.ShardID())
if sgi == nil {
// If we can't find it, then we need to drop this request
// as it is no longer valid. This could happen if writes were queued via
Expand Down
10 changes: 5 additions & 5 deletions cluster/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,20 @@ import (

"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
)

type metaStore struct {
type metaClient struct {
host string
}

func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
func (m *metaClient) DataNode(nodeID uint64) (*meta.NodeInfo, error) {
return &meta.NodeInfo{
ID: nodeID,
Host: m.host,
ID: nodeID,
TCPHost: m.host,
}, nil
}

Expand Down
16 changes: 9 additions & 7 deletions cluster/shard_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"net"
"time"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/services/meta"
"github.com/influxdb/influxdb/tsdb"
)

Expand All @@ -18,9 +19,10 @@ import (
type ShardMapper struct {
ForceRemoteMapping bool // All shards treated as remote. Useful for testing.

MetaStore interface {
NodeID() uint64
Node(id uint64) (ni *meta.NodeInfo, err error)
Node *influxdb.Node

MetaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
}

TSDBStore interface {
Expand All @@ -42,7 +44,7 @@ func NewShardMapper(timeout time.Duration) *ShardMapper {
// CreateMapper returns a Mapper for the given shard ID.
func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, chunkSize int) (tsdb.Mapper, error) {
// Create a remote mapper if the local node doesn't own the shard.
if !sh.OwnedBy(s.MetaStore.NodeID()) || s.ForceRemoteMapping {
if !sh.OwnedBy(s.Node.ID) || s.ForceRemoteMapping {
// Pick a node in a pseudo-random manner.
conn, err := s.dial(sh.Owners[rand.Intn(len(sh.Owners))].NodeID)
if err != nil {
Expand All @@ -63,11 +65,11 @@ func (s *ShardMapper) CreateMapper(sh meta.ShardInfo, stmt influxql.Statement, c
}

func (s *ShardMapper) dial(nodeID uint64) (net.Conn, error) {
ni, err := s.MetaStore.Node(nodeID)
ni, err := s.MetaClient.DataNode(nodeID)
if err != nil {
return nil, err
}
conn, err := net.Dial("tcp", ni.Host)
conn, err := net.Dial("tcp", ni.TCPHost)
if err != nil {
return nil, err
}
Expand Down
16 changes: 8 additions & 8 deletions cluster/shard_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"net"
"time"

"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/models"
"github.com/influxdb/influxdb/services/meta"
"gopkg.in/fatih/pool.v2"
)

Expand All @@ -22,8 +22,8 @@ type ShardWriter struct {
pool *clientPool
timeout time.Duration

MetaStore interface {
Node(id uint64) (ni *meta.NodeInfo, err error)
MetaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
}
}

Expand Down Expand Up @@ -94,7 +94,7 @@ func (w *ShardWriter) dial(nodeID uint64) (net.Conn, error) {
_, ok := w.pool.getPool(nodeID)
if !ok {
factory := &connFactory{nodeID: nodeID, clientPool: w.pool, timeout: w.timeout}
factory.metaStore = w.MetaStore
factory.metaClient = w.MetaClient

p, err := pool.NewChannelPool(1, 3, factory.dial)
if err != nil {
Expand Down Expand Up @@ -130,8 +130,8 @@ type connFactory struct {
size() int
}

metaStore interface {
Node(id uint64) (ni *meta.NodeInfo, err error)
metaClient interface {
DataNode(id uint64) (ni *meta.NodeInfo, err error)
}
}

Expand All @@ -140,7 +140,7 @@ func (c *connFactory) dial() (net.Conn, error) {
return nil, errMaxConnectionsExceeded
}

ni, err := c.metaStore.Node(c.nodeID)
ni, err := c.metaClient.DataNode(c.nodeID)
if err != nil {
return nil, err
}
Expand All @@ -149,7 +149,7 @@ func (c *connFactory) dial() (net.Conn, error) {
return nil, fmt.Errorf("node %d does not exist", c.nodeID)
}

conn, err := net.DialTimeout("tcp", ni.Host, c.timeout)
conn, err := net.DialTimeout("tcp", ni.TCPHost, c.timeout)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 58e0eed

Please sign in to comment.