Skip to content

Commit

Permalink
Make client timeouts configurable and increase frontend timeout (cade…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Nov 16, 2018
1 parent 948ee2a commit eecdbba
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 27 deletions.
31 changes: 26 additions & 5 deletions client/clientfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,18 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/membership"
"github.com/uber/cadence/common/metrics"
"time"
)

// Factory can be used to create RPC clients for cadence services
type Factory interface {
NewHistoryClient() (history.Client, error)
NewMatchingClient() (matching.Client, error)
NewFrontendClient() (frontend.Client, error)

NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error)
NewMatchingClientWithTimeout(timeout time.Duration, longPollTimeout time.Duration) (matching.Client, error)
NewFrontendClientWithTimeout(timeout time.Duration) (frontend.Client, error)
}

type rpcClientFactory struct {
Expand All @@ -55,7 +60,19 @@ func NewRPCClientFactory(df common.RPCFactory,
}

func (cf *rpcClientFactory) NewHistoryClient() (history.Client, error) {
client, err := history.NewClient(cf.df, cf.monitor, cf.numberOfHistoryShards)
return cf.NewHistoryClientWithTimeout(history.DefaultTimeout)
}

func (cf *rpcClientFactory) NewMatchingClient() (matching.Client, error) {
return cf.NewMatchingClientWithTimeout(matching.DefaultTimeout, matching.DefaultLongPollTimeout)
}

func (cf *rpcClientFactory) NewFrontendClient() (frontend.Client, error) {
return cf.NewFrontendClientWithTimeout(frontend.DefaultTimeout)
}

func (cf *rpcClientFactory) NewHistoryClientWithTimeout(timeout time.Duration) (history.Client, error) {
client, err := history.NewClient(cf.df, cf.monitor, cf.numberOfHistoryShards, timeout)
if err != nil {
return nil, err
}
Expand All @@ -65,8 +82,12 @@ func (cf *rpcClientFactory) NewHistoryClient() (history.Client, error) {
return client, nil
}

func (cf *rpcClientFactory) NewMatchingClient() (matching.Client, error) {
client, err := matching.NewClient(cf.df, cf.monitor)
func (cf *rpcClientFactory) NewMatchingClientWithTimeout(
timeout time.Duration,
longPollTimeout time.Duration,
) (matching.Client, error) {

client, err := matching.NewClient(cf.df, cf.monitor, timeout, longPollTimeout)
if err != nil {
return nil, err
}
Expand All @@ -76,8 +97,8 @@ func (cf *rpcClientFactory) NewMatchingClient() (matching.Client, error) {
return client, nil
}

func (cf *rpcClientFactory) NewFrontendClient() (frontend.Client, error) {
client, err := frontend.NewClient(cf.df, cf.monitor)
func (cf *rpcClientFactory) NewFrontendClientWithTimeout(timeout time.Duration) (frontend.Client, error) {
client, err := frontend.NewClient(cf.df, cf.monitor, timeout)
if err != nil {
return nil, err
}
Expand Down
15 changes: 10 additions & 5 deletions client/frontend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@ import (

var _ Client = (*clientImpl)(nil)

const (
// DefaultTimeout is the default timeout used to make calls
DefaultTimeout = time.Minute * 3
)

type clientImpl struct {
resolver membership.ServiceResolver
thriftCacheLock sync.RWMutex
thriftCache map[string]workflowserviceclient.Interface
rpcFactory common.RPCFactory
timeout time.Duration
}

// NewClient creates a new frontend service TChannel client
func NewClient(d common.RPCFactory, monitor membership.Monitor) (Client, error) {
func NewClient(d common.RPCFactory, monitor membership.Monitor, timeout time.Duration) (Client, error) {
sResolver, err := monitor.GetResolver(common.FrontendServiceName)
if err != nil {
return nil, err
Expand All @@ -54,6 +60,7 @@ func NewClient(d common.RPCFactory, monitor membership.Monitor) (Client, error)
rpcFactory: d,
resolver: sResolver,
thriftCache: make(map[string]workflowserviceclient.Interface),
timeout: timeout,
}
return client, nil
}
Expand Down Expand Up @@ -539,12 +546,10 @@ func (c *clientImpl) UpdateDomain(
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
// TODO: make timeout configurable
timeout := time.Minute * 1
if parent == nil {
return context.WithTimeout(context.Background(), timeout)
return context.WithTimeout(context.Background(), c.timeout)
}
return context.WithTimeout(parent, timeout)
return context.WithTimeout(parent, c.timeout)
}

func (c *clientImpl) getRandomHost() (workflowserviceclient.Interface, error) {
Expand Down
15 changes: 10 additions & 5 deletions client/history/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ import (

var _ Client = (*clientImpl)(nil)

const (
// DefaultTimeout is the default timeout used to make calls
DefaultTimeout = time.Second * 30
)

type clientImpl struct {
resolver membership.ServiceResolver
tokenSerializer common.TaskTokenSerializer
Expand All @@ -43,10 +48,11 @@ type clientImpl struct {
thriftCacheLock sync.RWMutex
thriftCache map[string]historyserviceclient.Interface
rpcFactory common.RPCFactory
timeout time.Duration
}

// NewClient creates a new history service TChannel client
func NewClient(d common.RPCFactory, monitor membership.Monitor, numberOfShards int) (Client, error) {
func NewClient(d common.RPCFactory, monitor membership.Monitor, numberOfShards int, timeout time.Duration) (Client, error) {
sResolver, err := monitor.GetResolver(common.HistoryServiceName)
if err != nil {
return nil, err
Expand All @@ -58,6 +64,7 @@ func NewClient(d common.RPCFactory, monitor membership.Monitor, numberOfShards i
tokenSerializer: common.NewJSONTaskTokenSerializer(),
numberOfShards: numberOfShards,
thriftCache: make(map[string]historyserviceclient.Interface),
timeout: timeout,
}
return client, nil
}
Expand Down Expand Up @@ -614,12 +621,10 @@ func (c *clientImpl) getHostForRequest(workflowID string) (historyserviceclient.
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
// TODO: make timeout configurable
timeout := time.Second * 30
if parent == nil {
return context.WithTimeout(context.Background(), timeout)
return context.WithTimeout(context.Background(), c.timeout)
}
return context.WithTimeout(parent, timeout)
return context.WithTimeout(parent, c.timeout)
}

func (c *clientImpl) getThriftClient(hostPort string) historyserviceclient.Interface {
Expand Down
36 changes: 24 additions & 12 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,40 @@ import (

var _ Client = (*clientImpl)(nil)

const (
// DefaultTimeout is the default timeout used to make calls
DefaultTimeout = time.Minute
// DefaultLongPollTimeout is the long poll default timeout used to make calls
DefaultLongPollTimeout = time.Minute * 2
)

type clientImpl struct {
resolver membership.ServiceResolver
thriftCacheLock sync.RWMutex
thriftCache map[string]matchingserviceclient.Interface
rpcFactory common.RPCFactory
timeout time.Duration
longPollTimeout time.Duration
}

// NewClient creates a new history service TChannel client
func NewClient(d common.RPCFactory, monitor membership.Monitor) (Client, error) {
func NewClient(
d common.RPCFactory,
monitor membership.Monitor,
timeout time.Duration,
longPollTimeout time.Duration,
) (Client, error) {
sResolver, err := monitor.GetResolver(common.MatchingServiceName)
if err != nil {
return nil, err
}

client := &clientImpl{
rpcFactory: d,
resolver: sResolver,
thriftCache: make(map[string]matchingserviceclient.Interface),
rpcFactory: d,
resolver: sResolver,
thriftCache: make(map[string]matchingserviceclient.Interface),
timeout: timeout,
longPollTimeout: longPollTimeout,
}
return client, nil
}
Expand Down Expand Up @@ -166,21 +182,17 @@ func (c *clientImpl) getHostForRequest(key string) (matchingserviceclient.Interf
}

func (c *clientImpl) createContext(parent context.Context) (context.Context, context.CancelFunc) {
// TODO: make timeout configurable
timeout := time.Minute * 1
if parent == nil {
return context.WithTimeout(context.Background(), timeout)
return context.WithTimeout(context.Background(), c.timeout)
}
return context.WithTimeout(parent, timeout)
return context.WithTimeout(parent, c.timeout)
}

func (c *clientImpl) createLongPollContext(parent context.Context) (context.Context, context.CancelFunc) {
// TODO: make timeout configurable
timeout := time.Minute * 2
if parent == nil {
return context.WithTimeout(context.Background(), timeout)
return context.WithTimeout(context.Background(), c.longPollTimeout)
}
return context.WithTimeout(parent, timeout)
return context.WithTimeout(parent, c.longPollTimeout)
}

func (c *clientImpl) getThriftClient(hostPort string) matchingserviceclient.Interface {
Expand Down
10 changes: 10 additions & 0 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
persistencefactory "github.com/uber/cadence/common/persistence/persistence-factory"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/dynamicconfig"
"go.uber.org/cadence/worker"
)

const (
Expand Down Expand Up @@ -93,9 +94,18 @@ func (s *Service) Start() {
if s.params.ClusterMetadata.IsGlobalDomainEnabled() {
s.startReplicator(params, base, log)
}

frontendClient := s.getFrontendClient(base, log)
w := worker.New(frontendClient, SystemWorkflowDomain, SystemTaskList, worker.Options{})
if err := w.Start(); err != nil {
w.Stop()
log.Fatalf("failed to start worker: %v", err)
}

log.Infof("%v started", common.WorkerServiceName)

<-s.stopC
w.Stop()
base.Stop()
}

Expand Down

0 comments on commit eecdbba

Please sign in to comment.