Skip to content

Commit

Permalink
matching: support for task list partitions (cadence-workflow#2229)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Jul 26, 2019
1 parent 6ca71a6 commit 2cc59b5
Show file tree
Hide file tree
Showing 29 changed files with 2,130 additions and 208 deletions.
262 changes: 241 additions & 21 deletions .gen/go/matching/matching.go

Large diffs are not rendered by default.

36 changes: 26 additions & 10 deletions client/clientBean.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"fmt"
"net"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/uber/cadence/client/admin"
Expand All @@ -49,7 +51,7 @@ type (
// Bean in an collection of clients
Bean interface {
GetHistoryClient() history.Client
GetMatchingClient() matching.Client
GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
GetFrontendClient() frontend.Client
GetRemoteAdminClient(cluster string) admin.Client
GetRemoteFrontendClient(cluster string) frontend.Client
Expand All @@ -61,11 +63,13 @@ type (
}

clientBeanImpl struct {
sync.Mutex
historyClient history.Client
matchingClient matching.Client
matchingClient atomic.Value
frontendClient frontend.Client
remoteAdminClients map[string]admin.Client
remoteFrontendClients map[string]frontend.Client
factory Factory
}

dnsDispatcherProvider struct {
Expand Down Expand Up @@ -98,11 +102,6 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
return nil, err
}

matchingClient, err := factory.NewMatchingClient()
if err != nil {
return nil, err
}

frontendClient, err := factory.NewFrontendClient()
if err != nil {
return nil, err
Expand Down Expand Up @@ -140,8 +139,8 @@ func NewClientBean(factory Factory, dispatcherProvider DispatcherProvider, clust
}

return &clientBeanImpl{
factory: factory,
historyClient: historyClient,
matchingClient: matchingClient,
frontendClient: frontendClient,
remoteAdminClients: remoteAdminClients,
remoteFrontendClients: remoteFrontendClients,
Expand All @@ -152,8 +151,11 @@ func (h *clientBeanImpl) GetHistoryClient() history.Client {
return h.historyClient
}

func (h *clientBeanImpl) GetMatchingClient() matching.Client {
return h.matchingClient
func (h *clientBeanImpl) GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
if client := h.matchingClient.Load(); client != nil {
return client.(matching.Client), nil
}
return h.lazyInitMatchingClient(domainIDToName)
}

func (h *clientBeanImpl) GetFrontendClient() frontend.Client {
Expand Down Expand Up @@ -184,6 +186,20 @@ func (h *clientBeanImpl) GetRemoteFrontendClient(cluster string) frontend.Client
return client
}

func (h *clientBeanImpl) lazyInitMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
h.Lock()
defer h.Unlock()
if cached := h.matchingClient.Load(); cached != nil {
return cached.(matching.Client), nil
}
client, err := h.factory.NewMatchingClient(domainIDToName)
if err != nil {
return nil, err
}
h.matchingClient.Store(client)
return client, nil
}

// NewDNSYarpcDispatcherProvider create a dispatcher provider which handles with IP address
func NewDNSYarpcDispatcherProvider(logger log.Logger, interval time.Duration) DispatcherProvider {
if interval <= 0 {
Expand Down
19 changes: 13 additions & 6 deletions client/clientBean_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,20 +51,27 @@ func (_m *MockClientBean) GetHistoryClient() history.Client {
return r0
}

// GetMatchingClient provides a mock function with given fields:
func (_m *MockClientBean) GetMatchingClient() matching.Client {
ret := _m.Called()
// GetMatchingClient provides a mock function with given fields: domainIDToName
func (_m *MockClientBean) GetMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
ret := _m.Called(domainIDToName)

var r0 matching.Client
if rf, ok := ret.Get(0).(func() matching.Client); ok {
r0 = rf()
if rf, ok := ret.Get(0).(func(DomainIDToNameFunc) matching.Client); ok {
r0 = rf(domainIDToName)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(matching.Client)
}
}

return r0
var r1 error
if rf, ok := ret.Get(1).(func(DomainIDToNameFunc) error); ok {
r1 = rf(domainIDToName)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// GetFrontendClient provides a mock function with given fields:
Expand Down
64 changes: 42 additions & 22 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/service/dynamicconfig"
)

const (
Expand All @@ -49,34 +50,46 @@ const (
clientKeyDispatcher = "client-key-dispatcher"
)

// Factory can be used to create RPC clients for cadence services
type Factory interface {
NewHistoryClient() (history.Client, error)
NewMatchingClient() (matching.Client, error)
NewFrontendClient() (frontend.Client, error)
type (
// Factory can be used to create RPC clients for cadence services
Factory interface {
NewHistoryClient() (history.Client, error)
NewMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error)
NewFrontendClient() (frontend.Client, error)

NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error)
NewMatchingClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (matching.Client, error)
NewFrontendClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error)
NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error)
NewMatchingClientWithTimeout(domainIDToName DomainIDToNameFunc, timeout time.Duration, longPollTimeout time.Duration) (matching.Client, error)
NewFrontendClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (frontend.Client, error)

NewAdminClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, dispatcher *yarpc.Dispatcher) (admin.Client, error)
NewFrontendClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, longPollTimeout time.Duration, dispatcher *yarpc.Dispatcher) (frontend.Client, error)
}
NewAdminClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, dispatcher *yarpc.Dispatcher) (admin.Client, error)
NewFrontendClientWithTimeoutAndDispatcher(rpcName string, timeout time.Duration, longPollTimeout time.Duration, dispatcher *yarpc.Dispatcher) (frontend.Client, error)
}

type rpcClientFactory struct {
rpcFactory common.RPCFactory
monitor membership.Monitor
metricsClient metrics.Client
numberOfHistoryShards int
}
// DomainIDToNameFunc maps a domainID to domain name. Returns error when mapping is not possible.
DomainIDToNameFunc func(string) (string, error)

rpcClientFactory struct {
rpcFactory common.RPCFactory
monitor membership.Monitor
metricsClient metrics.Client
dynConfig *dynamicconfig.Collection
numberOfHistoryShards int
}
)

// NewRPCClientFactory creates an instance of client factory that knows how to dispatch RPC calls.
func NewRPCClientFactory(rpcFactory common.RPCFactory, monitor membership.Monitor,
metricsClient metrics.Client, numberOfHistoryShards int) Factory {
func NewRPCClientFactory(
rpcFactory common.RPCFactory,
monitor membership.Monitor,
metricsClient metrics.Client,
dc *dynamicconfig.Collection,
numberOfHistoryShards int,
) Factory {
return &rpcClientFactory{
rpcFactory: rpcFactory,
monitor: monitor,
metricsClient: metricsClient,
dynConfig: dc,
numberOfHistoryShards: numberOfHistoryShards,
}
}
Expand All @@ -85,8 +98,8 @@ func (cf *rpcClientFactory) NewHistoryClient() (history.Client, error) {
return cf.NewHistoryClientWithTimeout(history.DefaultTimeout)
}

func (cf *rpcClientFactory) NewMatchingClient() (matching.Client, error) {
return cf.NewMatchingClientWithTimeout(matching.DefaultTimeout, matching.DefaultLongPollTimeout)
func (cf *rpcClientFactory) NewMatchingClient(domainIDToName DomainIDToNameFunc) (matching.Client, error) {
return cf.NewMatchingClientWithTimeout(domainIDToName, matching.DefaultTimeout, matching.DefaultLongPollTimeout)
}

func (cf *rpcClientFactory) NewFrontendClient() (frontend.Client, error) {
Expand Down Expand Up @@ -120,6 +133,7 @@ func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (
}

func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
domainIDToName DomainIDToNameFunc,
timeout time.Duration,
longPollTimeout time.Duration,
) (matching.Client, error) {
Expand All @@ -141,7 +155,13 @@ func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
return matchingserviceclient.New(dispatcher.ClientConfig(common.MatchingServiceName)), nil
}

client := matching.NewClient(timeout, longPollTimeout, common.NewClientCache(keyResolver, clientProvider))
client := matching.NewClient(
timeout,
longPollTimeout,
common.NewClientCache(keyResolver, clientProvider),
matching.NewLoadBalancer(domainIDToName, cf.dynConfig),
)

if cf.metricsClient != nil {
client = matching.NewMetricClient(client, cf.metricsClient)
}
Expand Down
69 changes: 54 additions & 15 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/uber/cadence/.gen/go/matching/matchingserviceclient"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"go.uber.org/yarpc"
)

Expand All @@ -44,86 +45,124 @@ type clientImpl struct {
timeout time.Duration
longPollTimeout time.Duration
clients common.ClientCache
loadBalancer LoadBalancer
}

// NewClient creates a new history service TChannel client
func NewClient(
timeout time.Duration,
longPollTimeout time.Duration,
clients common.ClientCache,
lb LoadBalancer,
) Client {
return &clientImpl{
timeout: timeout,
longPollTimeout: longPollTimeout,
clients: clients,
loadBalancer: lb,
}
}

func (c *clientImpl) AddActivityTask(
ctx context.Context,
addRequest *m.AddActivityTaskRequest,
request *m.AddActivityTaskRequest,
opts ...yarpc.CallOption) error {
opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getClientForTasklist(addRequest.TaskList.GetName())
partition := c.loadBalancer.PickWritePartition(
request.GetDomainUUID(),
*request.GetTaskList(),
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
)
request.TaskList.Name = &partition
client, err := c.getClientForTasklist(partition)
if err != nil {
return err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.AddActivityTask(ctx, addRequest, opts...)
return client.AddActivityTask(ctx, request, opts...)
}

func (c *clientImpl) AddDecisionTask(
ctx context.Context,
addRequest *m.AddDecisionTaskRequest,
request *m.AddDecisionTaskRequest,
opts ...yarpc.CallOption) error {
opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getClientForTasklist(addRequest.TaskList.GetName())
partition := c.loadBalancer.PickWritePartition(
request.GetDomainUUID(),
*request.GetTaskList(),
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
)
request.TaskList.Name = &partition
client, err := c.getClientForTasklist(request.TaskList.GetName())
if err != nil {
return err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.AddDecisionTask(ctx, addRequest, opts...)
return client.AddDecisionTask(ctx, request, opts...)
}

func (c *clientImpl) PollForActivityTask(
ctx context.Context,
pollRequest *m.PollForActivityTaskRequest,
request *m.PollForActivityTaskRequest,
opts ...yarpc.CallOption) (*workflow.PollForActivityTaskResponse, error) {
opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getClientForTasklist(pollRequest.PollRequest.TaskList.GetName())
partition := c.loadBalancer.PickReadPartition(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeActivity,
request.GetForwardedFrom(),
)
request.PollRequest.TaskList.Name = &partition
client, err := c.getClientForTasklist(request.PollRequest.TaskList.GetName())
if err != nil {
return nil, err
}
ctx, cancel := c.createLongPollContext(ctx)
defer cancel()
return client.PollForActivityTask(ctx, pollRequest, opts...)
return client.PollForActivityTask(ctx, request, opts...)
}

func (c *clientImpl) PollForDecisionTask(
ctx context.Context,
pollRequest *m.PollForDecisionTaskRequest,
request *m.PollForDecisionTaskRequest,
opts ...yarpc.CallOption) (*m.PollForDecisionTaskResponse, error) {
opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getClientForTasklist(pollRequest.PollRequest.TaskList.GetName())
partition := c.loadBalancer.PickReadPartition(
request.GetDomainUUID(),
*request.PollRequest.GetTaskList(),
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
)
request.PollRequest.TaskList.Name = &partition
client, err := c.getClientForTasklist(request.PollRequest.TaskList.GetName())
if err != nil {
return nil, err
}
ctx, cancel := c.createLongPollContext(ctx)
defer cancel()
return client.PollForDecisionTask(ctx, pollRequest, opts...)
return client.PollForDecisionTask(ctx, request, opts...)
}

func (c *clientImpl) QueryWorkflow(ctx context.Context, queryRequest *m.QueryWorkflowRequest, opts ...yarpc.CallOption) (*workflow.QueryWorkflowResponse, error) {
func (c *clientImpl) QueryWorkflow(ctx context.Context, request *m.QueryWorkflowRequest, opts ...yarpc.CallOption) (*workflow.QueryWorkflowResponse, error) {
opts = common.AggregateYarpcOptions(ctx, opts...)
client, err := c.getClientForTasklist(queryRequest.TaskList.GetName())
partition := c.loadBalancer.PickReadPartition(
request.GetDomainUUID(),
*request.GetTaskList(),
persistence.TaskListTypeDecision,
request.GetForwardedFrom(),
)
request.TaskList.Name = &partition
client, err := c.getClientForTasklist(request.TaskList.GetName())
if err != nil {
return nil, err
}
ctx, cancel := c.createContext(ctx)
defer cancel()
return client.QueryWorkflow(ctx, queryRequest, opts...)
return client.QueryWorkflow(ctx, request, opts...)
}

func (c *clientImpl) RespondQueryTaskCompleted(ctx context.Context, request *m.RespondQueryTaskCompletedRequest, opts ...yarpc.CallOption) error {
Expand Down
Loading

0 comments on commit 2cc59b5

Please sign in to comment.