Skip to content

Commit

Permalink
Use ringpop for membership
Browse files Browse the repository at this point in the history
Summary:
This revision creates a layer on top of ringpop primitives.
The plan is to use one ringpop ring for membership of all services,
and use this layer to shard keys for every individual service.

Reviewers: nx, maxim, sivakk, aravindv, samar

Reviewed By: sivakk, aravindv, samar

Subscribers: jenkins

Differential Revision: https://code.uberinternal.com/D690749
  • Loading branch information
Tamer Eldeeb committed Jan 11, 2017
1 parent 51d479e commit 07738de
Show file tree
Hide file tree
Showing 12 changed files with 802 additions and 57 deletions.
71 changes: 58 additions & 13 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,74 +7,119 @@ import (

h "code.uber.internal/devexp/minions/.gen/go/history"
workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/common/membership"
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
)

const historyServiceName = "uber-minions-history"
const historyServiceName = "cadence-history"
const shardID = "1" // TODO: actually derive shardID from request

var _ Client = (*clientImpl)(nil)

type clientImpl struct {
connection *tchannel.Channel
client h.TChanHistoryService
resolver membership.ServiceResolver
}

// NewClient creates a new history service TChannel client
func NewClient(ch *tchannel.Channel) (Client, error) {
tClient := thrift.NewClient(ch, historyServiceName, nil)
func NewClient(ch *tchannel.Channel, monitor membership.Monitor) (Client, error) {
sResolver, err := monitor.GetResolver(historyServiceName)
if err != nil {
return nil, err
}

client := &clientImpl{
connection: ch,
client: h.NewTChanHistoryServiceClient(tClient),
resolver: sResolver,
}
return client, nil
}

func (c *clientImpl) getHostForRequest(key string) (h.TChanHistoryService, error) {
host, err := c.resolver.Lookup(key)
if err != nil {
return nil, err
}
// TODO: build client cache
tClient := thrift.NewClient(c.connection, historyServiceName, &thrift.ClientOptions{
HostPort: host.GetAddress(),
})
return h.NewTChanHistoryServiceClient(tClient), nil
}

func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) {
// TODO: make timeout configurable
return thrift.NewContext(time.Second * 5)
return thrift.NewContext(time.Second * 10)
}

func (c *clientImpl) StartWorkflowExecution(request *workflow.StartWorkflowExecutionRequest) (*workflow.StartWorkflowExecutionResponse, error) {
client, err := c.getHostForRequest(shardID)
if err != nil {
return nil, err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.StartWorkflowExecution(ctx, request)
return client.StartWorkflowExecution(ctx, request)
}

func (c *clientImpl) GetWorkflowExecutionHistory(
request *workflow.GetWorkflowExecutionHistoryRequest) (*workflow.GetWorkflowExecutionHistoryResponse, error) {
client, err := c.getHostForRequest(shardID)
if err != nil {
return nil, err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.GetWorkflowExecutionHistory(ctx, request)
return client.GetWorkflowExecutionHistory(ctx, request)
}

func (c *clientImpl) RecordDecisionTaskStarted(request *h.RecordDecisionTaskStartedRequest) (*h.RecordDecisionTaskStartedResponse, error) {
client, err := c.getHostForRequest(shardID)
if err != nil {
return nil, err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.RecordDecisionTaskStarted(ctx, request)
return client.RecordDecisionTaskStarted(ctx, request)
}

func (c *clientImpl) RecordActivityTaskStarted(request *h.RecordActivityTaskStartedRequest) (*h.RecordActivityTaskStartedResponse, error) {
client, err := c.getHostForRequest(shardID)
if err != nil {
return nil, err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.RecordActivityTaskStarted(ctx, request)
return client.RecordActivityTaskStarted(ctx, request)
}

func (c *clientImpl) RespondDecisionTaskCompleted(request *workflow.RespondDecisionTaskCompletedRequest) error {
client, err := c.getHostForRequest(shardID)
if err != nil {
return err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.RespondDecisionTaskCompleted(ctx, request)
return client.RespondDecisionTaskCompleted(ctx, request)
}

func (c *clientImpl) RespondActivityTaskCompleted(request *workflow.RespondActivityTaskCompletedRequest) error {
client, err := c.getHostForRequest(shardID)
if err != nil {
return err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.RespondActivityTaskCompleted(ctx, request)
return client.RespondActivityTaskCompleted(ctx, request)
}

func (c *clientImpl) RespondActivityTaskFailed(request *workflow.RespondActivityTaskFailedRequest) error {
client, err := c.getHostForRequest(shardID)
if err != nil {
return err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.RespondActivityTaskFailed(ctx, request)
return client.RespondActivityTaskFailed(ctx, request)
}
52 changes: 42 additions & 10 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,55 +7,87 @@ import (

m "code.uber.internal/devexp/minions/.gen/go/matching"
workflow "code.uber.internal/devexp/minions/.gen/go/shared"
"code.uber.internal/devexp/minions/common/membership"
tchannel "github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
)

const matchingServiceName = "uber-minions-matching"
const matchingServiceName = "cadence-matching"

var _ Client = (*clientImpl)(nil)

type clientImpl struct {
connection *tchannel.Channel
client m.TChanMatchingService
resolver membership.ServiceResolver
}

// NewClient creates a new history service TChannel client
func NewClient(ch *tchannel.Channel) (Client, error) {
tClient := thrift.NewClient(ch, matchingServiceName, nil)
func NewClient(ch *tchannel.Channel, monitor membership.Monitor) (Client, error) {
sResolver, err := monitor.GetResolver(matchingServiceName)
if err != nil {
return nil, err
}

client := &clientImpl{
connection: ch,
client: m.NewTChanMatchingServiceClient(tClient),
resolver: sResolver,
}
return client, nil
}

func (c *clientImpl) getHostForRequest(key string) (m.TChanMatchingService, error) {
host, err := c.resolver.Lookup(key)
if err != nil {
return nil, err
}
// TODO: build client cache
tClient := thrift.NewClient(c.connection, matchingServiceName, &thrift.ClientOptions{
HostPort: host.GetAddress(),
})
return m.NewTChanMatchingServiceClient(tClient), nil
}

func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) {
// TODO: make timeout configurable
return thrift.NewContext(time.Second * 5)
return thrift.NewContext(time.Second * 10)
}

func (c *clientImpl) AddActivityTask(addRequest *m.AddActivityTaskRequest) error {
client, err := c.getHostForRequest(addRequest.GetTaskList().GetName())
if err != nil {
return err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.AddActivityTask(ctx, addRequest)
return client.AddActivityTask(ctx, addRequest)
}

func (c *clientImpl) AddDecisionTask(addRequest *m.AddDecisionTaskRequest) error {
client, err := c.getHostForRequest(addRequest.GetTaskList().GetName())
if err != nil {
return err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.AddDecisionTask(ctx, addRequest)
return client.AddDecisionTask(ctx, addRequest)
}

func (c *clientImpl) PollForActivityTask(pollRequest *workflow.PollForActivityTaskRequest) (*workflow.PollForActivityTaskResponse, error) {
client, err := c.getHostForRequest(pollRequest.GetTaskList().GetName())
if err != nil {
return nil, err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.PollForActivityTask(ctx, pollRequest)
return client.PollForActivityTask(ctx, pollRequest)
}

func (c *clientImpl) PollForDecisionTask(pollRequest *workflow.PollForDecisionTaskRequest) (*workflow.PollForDecisionTaskResponse, error) {
client, err := c.getHostForRequest(pollRequest.GetTaskList().GetName())
if err != nil {
return nil, err
}
ctx, cancel := c.createContext()
defer cancel()
return c.client.PollForDecisionTask(ctx, pollRequest)
return client.PollForDecisionTask(ctx, pollRequest)
}
13 changes: 8 additions & 5 deletions common/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"code.uber.internal/devexp/minions/client/history"
"code.uber.internal/devexp/minions/client/matching"
"code.uber.internal/devexp/minions/common/membership"
tchannel "github.com/uber/tchannel-go"
)

Expand All @@ -13,19 +14,21 @@ type ClientFactory interface {
}

type tchannelClientFactory struct {
ch *tchannel.Channel
ch *tchannel.Channel
monitor membership.Monitor
}

func newTChannelClientFactory(ch *tchannel.Channel) ClientFactory {
func newTChannelClientFactory(ch *tchannel.Channel, monitor membership.Monitor) ClientFactory {
return &tchannelClientFactory{
ch: ch,
ch: ch,
monitor: monitor,
}
}

func (cf *tchannelClientFactory) NewHistoryClient() (history.Client, error) {
return history.NewClient(cf.ch)
return history.NewClient(cf.ch, cf.monitor)
}

func (cf *tchannelClientFactory) NewMatchingClient() (matching.Client, error) {
return matching.NewClient(cf.ch)
return matching.NewClient(cf.ch, cf.monitor)
}
40 changes: 40 additions & 0 deletions common/membership/hostinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package membership

// HostInfo is a type that contains the info about a cadence host
type HostInfo struct {
addr string // ip:port
labels map[string]string
}

// NewHostInfo creates a new HostInfo instance
func NewHostInfo(addr string, labels map[string]string) *HostInfo {
if labels == nil {
labels = make(map[string]string)
}
return &HostInfo{
addr: addr,
labels: labels,
}
}

// GetAddress returns the ip:port address
func (hi *HostInfo) GetAddress() string {
return hi.addr
}

// Identity implements ringpop's Membership interface
func (hi *HostInfo) Identity() string {
// for now we just use the address as the identity
return hi.addr
}

// Label implements ringpop's Membership interface
func (hi *HostInfo) Label(key string) (value string, has bool) {
value, has = hi.labels[key]
return
}

// SetLabel sets the label.
func (hi *HostInfo) SetLabel(key string, value string) {
hi.labels[key] = value
}
53 changes: 53 additions & 0 deletions common/membership/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package membership

import "errors"

// ErrUnknownService is thrown for a service that is not tracked by this instance
var ErrUnknownService = errors.New("Service not tracked by Oracle")

// ErrInsufficientHosts is thrown when there are not enough hosts to serve the request
var ErrInsufficientHosts = errors.New("Not enough hosts to serve the request")

// ErrListenerAlreadyExist is thrown on a duplicate AddListener call from the same listener
var ErrListenerAlreadyExist = errors.New("Listener already exist for the service")

type (

// ChangedEvent describes a change in membership
ChangedEvent struct {
HostsAdded []*HostInfo
HostsUpdated []*HostInfo
HostsRemoved []*HostInfo
}

// Monitor provides membership information for all cadence services.
// It can be used to query which member host of a service is responsible for serving a given key.
Monitor interface {
Start() error
Stop()
Lookup(service string, key string) (*HostInfo, error)
GetResolver(service string) (ServiceResolver, error)
// AddListener adds a listener for this service.
// The listener will get notified on the given
// channel, whenever there is a membership change.
// @service: The service to be listened on
// @name: The name for identifying the listener
// @notifyChannel: The channel on which the caller receives notifications
AddListener(service string, name string, notifyChannel chan<- *ChangedEvent) error
// RemoveListener removes a listener for this service.
RemoveListener(service string, name string) error
}

// ServiceResolver provides membership information for a specific cadence service.
// It can be used to resolve which member host is responsible for serving a given key.
ServiceResolver interface {
Lookup(key string) (*HostInfo, error)
// AddListener adds a listener which will get notified on the given
// channel, whenever membership changes.
// @name: The name for identifying the listener
// @notifyChannel: The channel on which the caller receives notifications
AddListener(name string, notifyChannel chan<- *ChangedEvent) error
// RemoveListener removes a listener for this service.
RemoveListener(name string) error
}
)
Loading

0 comments on commit 07738de

Please sign in to comment.