Skip to content

Commit

Permalink
Pass config to cadence matching and frontend services during initiali…
Browse files Browse the repository at this point in the history
…zation (cadence-workflow#296)

Replace constant config with a config struct that can be passed to the service during initialization.
Issue cadence-workflow#286
  • Loading branch information
Tamer Eldeeb authored Aug 4, 2017
1 parent 76ddae6 commit 719bb1c
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 100 deletions.
9 changes: 5 additions & 4 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
package main

import (
"log"
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/service/frontend"
"github.com/uber/cadence/service/history"
"github.com/uber/cadence/service/matching"
"log"
"time"
)

type (
Expand Down Expand Up @@ -107,11 +108,11 @@ func (s *server) startService() common.Daemon {

switch s.name {
case frontendService:
daemon = frontend.NewService(&params)
daemon = frontend.NewService(&params, frontend.NewConfig())
case historyService:
daemon = history.NewService(&params)
case matchingService:
daemon = matching.NewService(&params)
daemon = matching.NewService(&params, matching.NewConfig())
}

go execute(daemon, s.doneC)
Expand Down
4 changes: 2 additions & 2 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW
params.CassandraConfig.Hosts = "127.0.0.1"
service := service.New(params)
var thriftServices []thrift.TChanServer
c.frontendHandler, thriftServices = frontend.NewWorkflowHandler(service, c.metadataMgr, c.historyMgr, c.visibilityMgr)
c.frontendHandler, thriftServices = frontend.NewWorkflowHandler(service, frontend.NewConfig(), c.metadataMgr, c.historyMgr, c.visibilityMgr)
err := c.frontendHandler.Start(thriftServices)
if err != nil {
c.logger.WithField("error", err).Fatal("Failed to start frontend")
Expand Down Expand Up @@ -203,7 +203,7 @@ func (c *cadenceImpl) startMatching(logger bark.Logger, taskMgr persistence.Task
params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards
service := service.New(params)
var thriftServices []thrift.TChanServer
c.matchingHandler, thriftServices = matching.NewHandler(taskMgr, service)
c.matchingHandler, thriftServices = matching.NewHandler(service, matching.NewConfig(), taskMgr)
c.matchingHandler.Start(thriftServices)
startWG.Done()
<-c.shutdownCh
Expand Down
20 changes: 8 additions & 12 deletions service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type (
metricsClient metrics.Client
startWG sync.WaitGroup
rateLimiter common.TokenBucket
config *Config
service.Service
}

Expand All @@ -69,12 +70,6 @@ type (
}
)

const (
defaultVisibilityMaxPageSize = 1000
defaultHistoryMaxPageSize = 1000
defaultRPS = 1200 // This limit is based on experimental runs.
)

var (
errDomainNotSet = &gen.BadRequestError{Message: "Domain not set on request."}
errTaskTokenNotSet = &gen.BadRequestError{Message: "Task token not set on request."}
Expand All @@ -89,17 +84,18 @@ var (

// NewWorkflowHandler creates a thrift handler for the cadence service
func NewWorkflowHandler(
sVice service.Service, metadataMgr persistence.MetadataManager,
sVice service.Service, config *Config, metadataMgr persistence.MetadataManager,
historyMgr persistence.HistoryManager, visibilityMgr persistence.VisibilityManager) (*WorkflowHandler, []thrift.TChanServer) {
handler := &WorkflowHandler{
Service: sVice,
config: config,
metadataMgr: metadataMgr,
historyMgr: historyMgr,
visibitiltyMgr: visibilityMgr,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
hSerializerFactory: persistence.NewHistorySerializerFactory(),
domainCache: cache.NewDomainCache(metadataMgr, sVice.GetLogger()),
rateLimiter: common.NewTokenBucket(defaultRPS, common.NewRealTimeSource()),
rateLimiter: common.NewTokenBucket(config.RPS, common.NewRealTimeSource()),
}
// prevent us from trying to serve requests before handler's Start() is complete
handler.startWG.Add(1)
Expand Down Expand Up @@ -380,7 +376,7 @@ func (wh *WorkflowHandler) PollForDecisionTask(
if matchingResp.IsSetWorkflowExecution() {
// Non-empty response. Get the history
history, persistenceToken, err = wh.getHistory(
info.ID, *matchingResp.GetWorkflowExecution(), matchingResp.GetStartedEventId()+1, defaultHistoryMaxPageSize, nil)
info.ID, *matchingResp.GetWorkflowExecution(), matchingResp.GetStartedEventId()+1, wh.config.DefaultHistoryMaxPageSize, nil)
if err != nil {
return nil, wh.error(err, scope)
}
Expand Down Expand Up @@ -667,7 +663,7 @@ func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
}

if !getRequest.IsSetMaximumPageSize() || getRequest.GetMaximumPageSize() == 0 {
getRequest.MaximumPageSize = common.Int32Ptr(defaultHistoryMaxPageSize)
getRequest.MaximumPageSize = common.Int32Ptr(wh.config.DefaultHistoryMaxPageSize)
}

domainName := getRequest.GetDomain()
Expand Down Expand Up @@ -911,7 +907,7 @@ func (wh *WorkflowHandler) ListOpenWorkflowExecutions(ctx thrift.Context,
}

if !listRequest.IsSetMaximumPageSize() || listRequest.GetMaximumPageSize() == 0 {
listRequest.MaximumPageSize = common.Int32Ptr(defaultVisibilityMaxPageSize)
listRequest.MaximumPageSize = common.Int32Ptr(wh.config.DefaultVisibilityMaxPageSize)
}

domainName := listRequest.GetDomain()
Expand Down Expand Up @@ -999,7 +995,7 @@ func (wh *WorkflowHandler) ListClosedWorkflowExecutions(ctx thrift.Context,
}

if !listRequest.IsSetMaximumPageSize() || listRequest.GetMaximumPageSize() == 0 {
listRequest.MaximumPageSize = common.Int32Ptr(defaultVisibilityMaxPageSize)
listRequest.MaximumPageSize = common.Int32Ptr(wh.config.DefaultVisibilityMaxPageSize)
}

domainName := listRequest.GetDomain()
Expand Down
22 changes: 20 additions & 2 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,34 @@ import (
"github.com/uber/cadence/common/service"
)

// Config represents configuration for cadence-frontend service
type Config struct {
DefaultVisibilityMaxPageSize int32
DefaultHistoryMaxPageSize int32
RPS int
}

// NewConfig returns new service config with default values
func NewConfig() *Config {
return &Config{
DefaultVisibilityMaxPageSize: 1000,
DefaultHistoryMaxPageSize: 1000,
RPS: 1200, // This limit is based on experimental runs.
}
}

// Service represents the cadence-frontend service
type Service struct {
stopC chan struct{}
config *Config
params *service.BootstrapParams
}

// NewService builds a new cadence-frontend service
func NewService(params *service.BootstrapParams) common.Daemon {
func NewService(params *service.BootstrapParams, config *Config) common.Daemon {
return &Service{
params: params,
config: config,
stopC: make(chan struct{}),
}
}
Expand Down Expand Up @@ -89,7 +107,7 @@ func (s *Service) Start() {

history = persistence.NewHistoryPersistenceClient(history, base.GetMetricsClient())

handler, tchanServers := NewWorkflowHandler(base, metadata, history, visibility)
handler, tchanServers := NewWorkflowHandler(base, s.config, metadata, history, visibility)
handler.Start(tchanServers)

log.Infof("%v started", common.FrontendServiceName)
Expand Down
6 changes: 4 additions & 2 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,18 @@ var _ m.TChanMatchingService = (*Handler)(nil)
type Handler struct {
taskPersistence persistence.TaskManager
engine Engine
config *Config
metricsClient metrics.Client
startWG sync.WaitGroup
service.Service
}

// NewHandler creates a thrift handler for the history service
func NewHandler(taskPersistence persistence.TaskManager, sVice service.Service) (*Handler, []thrift.TChanServer) {
func NewHandler(sVice service.Service, config *Config, taskPersistence persistence.TaskManager) (*Handler, []thrift.TChanServer) {
handler := &Handler{
Service: sVice,
taskPersistence: taskPersistence,
config: config,
}
// prevent us from trying to serve requests before matching engine is started and ready
handler.startWG.Add(1)
Expand All @@ -64,7 +66,7 @@ func (h *Handler) Start(thriftService []thrift.TChanServer) error {
return err
}
h.metricsClient = h.Service.GetMetricsClient()
h.engine = NewEngine(h.taskPersistence, history, h.Service.GetLogger(), h.Service.GetMetricsClient())
h.engine = NewEngine(h.taskPersistence, history, h.config, h.Service.GetLogger(), h.Service.GetMetricsClient())
h.startWG.Done()
return nil
}
Expand Down
48 changes: 16 additions & 32 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package matching
import (
"errors"
"sync"
"time"

"github.com/pborman/uuid"
"github.com/uber-common/bark"
Expand All @@ -35,7 +34,6 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client/history"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/logging"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand All @@ -46,15 +44,14 @@ import (
// TODO: Switch implementation from lock/channel based to a partitioned agent
// to simplify code and reduce possiblity of synchronization errors.
type matchingEngineImpl struct {
taskManager persistence.TaskManager
historyService history.Client
tokenSerializer common.TaskTokenSerializer
rangeSize int64
logger bark.Logger
metricsClient metrics.Client
longPollExpirationInterval time.Duration
taskListsLock sync.RWMutex // locks mutation of taskLists
taskLists map[taskListID]taskListManager // Convert to LRU cache
taskManager persistence.TaskManager
historyService history.Client
tokenSerializer common.TaskTokenSerializer
logger bark.Logger
metricsClient metrics.Client
taskListsLock sync.RWMutex // locks mutation of taskLists
taskLists map[taskListID]taskListManager // Convert to LRU cache
config *Config
}

type taskListID struct {
Expand All @@ -63,20 +60,14 @@ type taskListID struct {
taskType int
}

const (
defaultLongPollExpirationInterval = time.Minute
emptyGetRetryInitialInterval = 100 * time.Millisecond
emptyGetRetryMaxInterval = 1 * time.Second
)

var (
// EmptyPollForDecisionTaskResponse is the response when there are no decision tasks to hand out
emptyPollForDecisionTaskResponse = m.NewPollForDecisionTaskResponse()
// EmptyPollForActivityTaskResponse is the response when there are no activity tasks to hand out
emptyPollForActivityTaskResponse = workflow.NewPollForActivityTaskResponse()
persistenceOperationRetryPolicy = common.CreatePersistanceRetryPolicy()
historyServiceOperationRetryPolicy = common.CreateHistoryServiceRetryPolicy()
emptyGetTasksRetryPolicy = createEmptyGetTasksRetryPolicy()

// ErrNoTasks is exported temporarily for integration test
ErrNoTasks = errors.New("No tasks")
errPumpClosed = errors.New("Task list pump closed its channel")
Expand All @@ -100,20 +91,20 @@ var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed im
// NewEngine creates an instance of matching engine
func NewEngine(taskManager persistence.TaskManager,
historyService history.Client,
config *Config,
logger bark.Logger,
metricsClient metrics.Client) Engine {

return &matchingEngineImpl{
taskManager: taskManager,
historyService: historyService,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
taskLists: make(map[taskListID]taskListManager),
rangeSize: defaultRangeSize,
longPollExpirationInterval: defaultLongPollExpirationInterval,
taskManager: taskManager,
historyService: historyService,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
taskLists: make(map[taskListID]taskListManager),
logger: logger.WithFields(bark.Fields{
logging.TagWorkflowComponent: logging.TagValueMatchingEngineComponent,
}),
metricsClient: metricsClient,
config: config,
}
}

Expand Down Expand Up @@ -162,7 +153,7 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *taskListID) (taskListM
}
e.taskListsLock.RUnlock()
logging.LogTaskListLoadingEvent(e.logger, taskList.taskListName, taskList.taskType)
mgr := newTaskListManager(e, taskList)
mgr := newTaskListManager(e, taskList, e.config)
e.taskListsLock.Lock()
if result, ok := e.taskLists[*taskList]; ok {
e.taskListsLock.Unlock()
Expand Down Expand Up @@ -411,13 +402,6 @@ func newTaskListID(domainID, taskListName string, taskType int) *taskListID {
return &taskListID{domainID: domainID, taskListName: taskListName, taskType: taskType}
}

func createEmptyGetTasksRetryPolicy() backoff.RetryPolicy {
policy := backoff.NewExponentialRetryPolicy(emptyGetRetryInitialInterval)
policy.SetMaximumInterval(emptyGetRetryMaxInterval)

return policy
}

func workflowExecutionPtr(execution workflow.WorkflowExecution) *workflow.WorkflowExecution {
return &execution
}
Loading

0 comments on commit 719bb1c

Please sign in to comment.