diff --git a/client/clientfactory.go b/client/clientfactory.go new file mode 100644 index 00000000000..9c4f689716d --- /dev/null +++ b/client/clientfactory.go @@ -0,0 +1,41 @@ +package client + +import ( + "code.uber.internal/devexp/minions/client/history" + "code.uber.internal/devexp/minions/client/matching" + "code.uber.internal/devexp/minions/common/membership" + "code.uber.internal/devexp/minions/common/metrics" + tchannel "github.com/uber/tchannel-go" +) + +// Factory can be used to create RPC clients for cadence services +type Factory interface { + NewHistoryClient() (history.Client, error) + NewMatchingClient() (matching.Client, error) +} + +type tchannelClientFactory struct { + ch *tchannel.Channel + monitor membership.Monitor + metricsClient metrics.Client + numberOfHistoryShards int +} + +// NewTChannelClientFactory creates an instance of client factory using tchannel +func NewTChannelClientFactory(ch *tchannel.Channel, + monitor membership.Monitor, metricsClient metrics.Client, numberOfHistoryShards int) Factory { + return &tchannelClientFactory{ + ch: ch, + monitor: monitor, + metricsClient: metricsClient, + numberOfHistoryShards: numberOfHistoryShards, + } +} + +func (cf *tchannelClientFactory) NewHistoryClient() (history.Client, error) { + return history.NewClient(cf.ch, cf.monitor, cf.numberOfHistoryShards) +} + +func (cf *tchannelClientFactory) NewMatchingClient() (matching.Client, error) { + return matching.NewClient(cf.ch, cf.monitor) +} diff --git a/client/history/client.go b/client/history/client.go index 5fec9ee4816..3f72c197cf4 100644 --- a/client/history/client.go +++ b/client/history/client.go @@ -1,6 +1,7 @@ package history import ( + "sync" "time" "golang.org/x/net/context" @@ -8,53 +9,44 @@ import ( h "code.uber.internal/devexp/minions/.gen/go/history" workflow "code.uber.internal/devexp/minions/.gen/go/shared" "code.uber.internal/devexp/minions/common/membership" + "code.uber.internal/devexp/minions/common/util" tchannel "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/thrift" ) const historyServiceName = "cadence-history" -const shardID = "1" // TODO: actually derive shardID from request var _ Client = (*clientImpl)(nil) type clientImpl struct { - connection *tchannel.Channel - resolver membership.ServiceResolver + connection *tchannel.Channel + resolver membership.ServiceResolver + tokenSerializer util.TaskTokenSerializer + numberOfShards int + // TODO: consider refactor thriftCache into a separate struct + thriftCacheLock sync.RWMutex + thriftCache map[string]h.TChanHistoryService } // NewClient creates a new history service TChannel client -func NewClient(ch *tchannel.Channel, monitor membership.Monitor) (Client, error) { +func NewClient(ch *tchannel.Channel, monitor membership.Monitor, numberOfShards int) (Client, error) { sResolver, err := monitor.GetResolver(historyServiceName) if err != nil { return nil, err } client := &clientImpl{ - connection: ch, - resolver: sResolver, + connection: ch, + resolver: sResolver, + tokenSerializer: util.NewJSONTaskTokenSerializer(), + numberOfShards: numberOfShards, + thriftCache: make(map[string]h.TChanHistoryService), } return client, nil } -func (c *clientImpl) getHostForRequest(key string) (h.TChanHistoryService, error) { - host, err := c.resolver.Lookup(key) - if err != nil { - return nil, err - } - // TODO: build client cache - tClient := thrift.NewClient(c.connection, historyServiceName, &thrift.ClientOptions{ - HostPort: host.GetAddress(), - }) - return h.NewTChanHistoryServiceClient(tClient), nil -} - -func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) { - // TODO: make timeout configurable - return thrift.NewContext(time.Second * 30) -} - func (c *clientImpl) StartWorkflowExecution(request *workflow.StartWorkflowExecutionRequest) (*workflow.StartWorkflowExecutionResponse, error) { - client, err := c.getHostForRequest(shardID) + client, err := c.getHostForRequest(request.GetWorkflowId()) if err != nil { return nil, err } @@ -65,7 +57,7 @@ func (c *clientImpl) StartWorkflowExecution(request *workflow.StartWorkflowExecu func (c *clientImpl) GetWorkflowExecutionHistory( request *workflow.GetWorkflowExecutionHistoryRequest) (*workflow.GetWorkflowExecutionHistoryResponse, error) { - client, err := c.getHostForRequest(shardID) + client, err := c.getHostForRequest(request.Execution.GetWorkflowId()) if err != nil { return nil, err } @@ -75,7 +67,7 @@ func (c *clientImpl) GetWorkflowExecutionHistory( } func (c *clientImpl) RecordDecisionTaskStarted(request *h.RecordDecisionTaskStartedRequest) (*h.RecordDecisionTaskStartedResponse, error) { - client, err := c.getHostForRequest(shardID) + client, err := c.getHostForRequest(request.WorkflowExecution.GetWorkflowId()) if err != nil { return nil, err } @@ -85,7 +77,7 @@ func (c *clientImpl) RecordDecisionTaskStarted(request *h.RecordDecisionTaskStar } func (c *clientImpl) RecordActivityTaskStarted(request *h.RecordActivityTaskStartedRequest) (*h.RecordActivityTaskStartedResponse, error) { - client, err := c.getHostForRequest(shardID) + client, err := c.getHostForRequest(request.WorkflowExecution.GetWorkflowId()) if err != nil { return nil, err } @@ -95,7 +87,11 @@ func (c *clientImpl) RecordActivityTaskStarted(request *h.RecordActivityTaskStar } func (c *clientImpl) RespondDecisionTaskCompleted(request *workflow.RespondDecisionTaskCompletedRequest) error { - client, err := c.getHostForRequest(shardID) + taskToken, err := c.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return err + } + client, err := c.getHostForRequest(taskToken.WorkflowID) if err != nil { return err } @@ -105,7 +101,11 @@ func (c *clientImpl) RespondDecisionTaskCompleted(request *workflow.RespondDecis } func (c *clientImpl) RespondActivityTaskCompleted(request *workflow.RespondActivityTaskCompletedRequest) error { - client, err := c.getHostForRequest(shardID) + taskToken, err := c.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return err + } + client, err := c.getHostForRequest(taskToken.WorkflowID) if err != nil { return err } @@ -115,7 +115,11 @@ func (c *clientImpl) RespondActivityTaskCompleted(request *workflow.RespondActiv } func (c *clientImpl) RespondActivityTaskFailed(request *workflow.RespondActivityTaskFailedRequest) error { - client, err := c.getHostForRequest(shardID) + taskToken, err := c.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return err + } + client, err := c.getHostForRequest(taskToken.WorkflowID) if err != nil { return err } @@ -125,7 +129,11 @@ func (c *clientImpl) RespondActivityTaskFailed(request *workflow.RespondActivity } func (c *clientImpl) RecordActivityTaskHeartbeat(request *workflow.RecordActivityTaskHeartbeatRequest) (*workflow.RecordActivityTaskHeartbeatResponse, error) { - client, err := c.getHostForRequest(shardID) + taskToken, err := c.tokenSerializer.Deserialize(request.TaskToken) + if err != nil { + return nil, err + } + client, err := c.getHostForRequest(taskToken.WorkflowID) if err != nil { return nil, err } @@ -133,3 +141,43 @@ func (c *clientImpl) RecordActivityTaskHeartbeat(request *workflow.RecordActivit defer cancel() return client.RecordActivityTaskHeartbeat(ctx, request) } + +func (c *clientImpl) getHostForRequest(workflowID string) (h.TChanHistoryService, error) { + key := util.WorkflowIDToHistoryShard(workflowID, c.numberOfShards) + host, err := c.resolver.Lookup(string(key)) + if err != nil { + return nil, err + } + + return c.getThriftClient(host.GetAddress()), nil +} + +func (c *clientImpl) createContext() (thrift.Context, context.CancelFunc) { + // TODO: make timeout configurable + return thrift.NewContext(time.Second * 30) +} + +func (c *clientImpl) getThriftClient(hostPort string) h.TChanHistoryService { + c.thriftCacheLock.RLock() + client, ok := c.thriftCache[hostPort] + c.thriftCacheLock.RUnlock() + if ok { + return client + } + + c.thriftCacheLock.Lock() + defer c.thriftCacheLock.Unlock() + + // check again if in the cache cause it might have been added + // before we acquired the lock + client, ok = c.thriftCache[hostPort] + if !ok { + tClient := thrift.NewClient(c.connection, historyServiceName, &thrift.ClientOptions{ + HostPort: hostPort, + }) + + client = h.NewTChanHistoryServiceClient(tClient) + c.thriftCache[hostPort] = client + } + return client +} diff --git a/common/clientfactory.go b/common/clientfactory.go deleted file mode 100644 index 763a6e2e22f..00000000000 --- a/common/clientfactory.go +++ /dev/null @@ -1,34 +0,0 @@ -package common - -import ( - "code.uber.internal/devexp/minions/client/history" - "code.uber.internal/devexp/minions/client/matching" - "code.uber.internal/devexp/minions/common/membership" - tchannel "github.com/uber/tchannel-go" -) - -// ClientFactory can be used to create RPC clients for cadence services -type ClientFactory interface { - NewHistoryClient() (history.Client, error) - NewMatchingClient() (matching.Client, error) -} - -type tchannelClientFactory struct { - ch *tchannel.Channel - monitor membership.Monitor -} - -func newTChannelClientFactory(ch *tchannel.Channel, monitor membership.Monitor) ClientFactory { - return &tchannelClientFactory{ - ch: ch, - monitor: monitor, - } -} - -func (cf *tchannelClientFactory) NewHistoryClient() (history.Client, error) { - return history.NewClient(cf.ch, cf.monitor) -} - -func (cf *tchannelClientFactory) NewMatchingClient() (matching.Client, error) { - return matching.NewClient(cf.ch, cf.monitor) -} diff --git a/common/service.go b/common/service.go index c3594c672d9..848fd3d6cd4 100644 --- a/common/service.go +++ b/common/service.go @@ -6,6 +6,7 @@ import ( "os" "time" + "code.uber.internal/devexp/minions/client" "code.uber.internal/devexp/minions/common/logging" "code.uber.internal/devexp/minions/common/membership" "code.uber.internal/devexp/minions/common/metrics" @@ -40,7 +41,8 @@ type serviceImpl struct { rpSeedHosts []string membershipMonitor membership.Monitor tchannelFactory TChannelFactory - clientFactory ClientFactory + clientFactory client.Factory + numberOfHistoryShards int logger bark.Logger metricsScope tally.Scope runtimeMetricsReporter *metrics.RuntimeMetricsReporter @@ -48,15 +50,18 @@ type serviceImpl struct { // NewService instantiates a ServiceInstance // TODO: have a better name for Service. +// TODO: consider passing a config object if the parameter list gets too big // this is the object which holds all the common stuff // shared by all the services. -func NewService(serviceName string, logger bark.Logger, scope tally.Scope, tchanFactory TChannelFactory, rpHosts []string) Service { +func NewService(serviceName string, logger bark.Logger, + scope tally.Scope, tchanFactory TChannelFactory, rpHosts []string, numberOfHistoryShards int) Service { sVice := &serviceImpl{ - sName: serviceName, - logger: logger.WithField("Service", serviceName), - tchannelFactory: tchanFactory, - rpSeedHosts: rpHosts, - metricsScope: scope, + sName: serviceName, + logger: logger.WithField("Service", serviceName), + tchannelFactory: tchanFactory, + rpSeedHosts: rpHosts, + metricsScope: scope, + numberOfHistoryShards: numberOfHistoryShards, } sVice.runtimeMetricsReporter = metrics.NewRuntimeMetricsReporter(scope, time.Minute, sVice.logger) @@ -126,7 +131,8 @@ func (h *serviceImpl) Start(thriftServices []thrift.TChanServer) { } h.hostInfo = hostInfo - h.clientFactory = newTChannelClientFactory(h.ch, h.membershipMonitor) + metricsClient := metrics.NewClient(h.metricsScope, h.getMetricsServiceIdx(h.sName)) + h.clientFactory = client.NewTChannelClientFactory(h.ch, h.membershipMonitor, metricsClient, h.numberOfHistoryShards) // The service is now started up log.Info("service started") @@ -161,7 +167,7 @@ func (h *serviceImpl) GetMetricsScope() tally.Scope { return h.metricsScope } -func (h *serviceImpl) GetClientFactory() ClientFactory { +func (h *serviceImpl) GetClientFactory() client.Factory { return h.clientFactory } @@ -192,3 +198,9 @@ func (h *serviceImpl) bootstrapRingpop(rp *ringpop.Ringpop, rpHosts []string) er _, err := rp.Bootstrap(bOptions) return err } + +func (h *serviceImpl) getMetricsServiceIdx(serviceName string) metrics.ServiceIdx { + // for now we always use frontend for all metrics + // TODO: return proper index based on service name once per-service metrics are defined + return metrics.Frontend +} diff --git a/common/serviceinterfaces.go b/common/serviceinterfaces.go index 931057060e9..58b108193c9 100644 --- a/common/serviceinterfaces.go +++ b/common/serviceinterfaces.go @@ -5,6 +5,7 @@ import ( "github.com/uber-go/tally" "github.com/uber/tchannel-go/thrift" + "code.uber.internal/devexp/minions/client" "code.uber.internal/devexp/minions/common/membership" ) @@ -24,7 +25,7 @@ type ( GetMetricsScope() tally.Scope - GetClientFactory() ClientFactory + GetClientFactory() client.Factory GetMembershipMonitor() membership.Monitor diff --git a/common/jsonTaskTokenSerializer.go b/common/util/jsonTaskTokenSerializer.go similarity index 97% rename from common/jsonTaskTokenSerializer.go rename to common/util/jsonTaskTokenSerializer.go index c3e21811d99..6e4040dbb45 100644 --- a/common/jsonTaskTokenSerializer.go +++ b/common/util/jsonTaskTokenSerializer.go @@ -1,4 +1,4 @@ -package common +package util import "encoding/json" diff --git a/common/taskTokenSerializerInterfaces.go b/common/util/taskTokenSerializerInterfaces.go similarity index 95% rename from common/taskTokenSerializerInterfaces.go rename to common/util/taskTokenSerializerInterfaces.go index 8c9357fa73f..53252204262 100644 --- a/common/taskTokenSerializerInterfaces.go +++ b/common/util/taskTokenSerializerInterfaces.go @@ -1,4 +1,4 @@ -package common +package util type ( // TaskTokenSerializer serializes task tokens diff --git a/common/util/util.go b/common/util/util.go index c31aa85df9c..2bdf9d75744 100644 --- a/common/util/util.go +++ b/common/util/util.go @@ -4,6 +4,8 @@ import ( "sync" "time" + farm "github.com/dgryski/go-farm" + workflow "code.uber.internal/devexp/minions/.gen/go/shared" "code.uber.internal/devexp/minions/common/backoff" ) @@ -74,3 +76,9 @@ func IsPersistenceTransientError(err error) bool { return false } + +// WorkflowIDToHistoryShard is used to map workflowID to a shardID +func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int { + hash := farm.Fingerprint32([]byte(workflowID)) + return int(hash % uint32(numberOfShards)) +} diff --git a/host/integration_test.go b/host/integration_test.go index 3879a3c3109..99fb4a7245f 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -48,6 +48,10 @@ var ( integration = flag.Bool("integration", true, "run integration tests") ) +const ( + testNumberOfHistoryShards = 4 +) + type ( integrationSuite struct { host Cadence @@ -105,7 +109,9 @@ func (s *integrationSuite) SetupTest() { options.SchemaDir = ".." s.SetupWorkflowStoreWithOptions(options) - s.host = NewCadence(s.ShardMgr, s.WorkflowMgr, s.TaskMgr, s.logger) + s.setupShards() + + s.host = NewCadence(s.ShardMgr, s.WorkflowMgr, s.TaskMgr, testNumberOfHistoryShards, s.logger) s.host.Start() s.engine, _ = frontend.NewClient(s.ch, s.host.FrontendAddress()) } @@ -719,3 +725,13 @@ func (s *integrationSuite) TestSequential_UserTimers() { s.Nil(poller.pollAndProcessDecisionTask(true, false)) s.True(workflowComplete) } + +func (s *integrationSuite) setupShards() { + // shard 0 is always created, we create additional shards if needed + for shardID := 1; shardID < testNumberOfHistoryShards; shardID++ { + err := s.CreateShard(shardID, "", 0) + if err != nil { + s.logger.WithField("error", err).Fatal("Failed to create shard") + } + } +} diff --git a/host/onebox.go b/host/onebox.go index 6b5e38a1037..21ef6157865 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -25,26 +25,28 @@ type Cadence interface { } type cadenceImpl struct { - frontendHandler *frontend.WorkflowHandler - matchingHandler *matching.Handler - historyHandler *history.Handler - logger bark.Logger - shardMgr persistence.ShardManager - taskMgr persistence.TaskManager - executionMgr persistence.ExecutionManager - shutdownCh chan struct{} - shutdownWG sync.WaitGroup + frontendHandler *frontend.WorkflowHandler + matchingHandler *matching.Handler + historyHandler *history.Handler + numberOfHistoryShards int + logger bark.Logger + shardMgr persistence.ShardManager + taskMgr persistence.TaskManager + executionMgr persistence.ExecutionManager + shutdownCh chan struct{} + shutdownWG sync.WaitGroup } // NewCadence returns an instance that hosts full cadence in one process func NewCadence(shardMgr persistence.ShardManager, executionMgr persistence.ExecutionManager, - taskMgr persistence.TaskManager, logger bark.Logger) Cadence { + taskMgr persistence.TaskManager, numberOfHistoryShards int, logger bark.Logger) Cadence { return &cadenceImpl{ - logger: logger, - shardMgr: shardMgr, - taskMgr: taskMgr, - executionMgr: executionMgr, - shutdownCh: make(chan struct{}), + numberOfHistoryShards: numberOfHistoryShards, + logger: logger, + shardMgr: shardMgr, + taskMgr: taskMgr, + executionMgr: executionMgr, + shutdownCh: make(chan struct{}), } } @@ -95,7 +97,7 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW return c.createTChannel(sName, c.FrontendAddress(), thriftServices) } scope := tally.NewTestScope("cadence-frontend", make(map[string]string)) - service := common.NewService("cadence-frontend", logger, scope, tchanFactory, rpHosts) + service := common.NewService("cadence-frontend", logger, scope, tchanFactory, rpHosts, c.numberOfHistoryShards) var thriftServices []thrift.TChanServer c.frontendHandler, thriftServices = frontend.NewWorkflowHandler(service) err := c.frontendHandler.Start(thriftServices) @@ -113,9 +115,9 @@ func (c *cadenceImpl) startHistory(logger bark.Logger, shardMgr persistence.Shar return c.createTChannel(sName, c.HistoryServiceAddress(), thriftServices) } scope := tally.NewTestScope("cadence-history", make(map[string]string)) - service := common.NewService("cadence-history", logger, scope, tchanFactory, rpHosts) + service := common.NewService("cadence-history", logger, scope, tchanFactory, rpHosts, c.numberOfHistoryShards) var thriftServices []thrift.TChanServer - c.historyHandler, thriftServices = history.NewHandler(service, shardMgr, executionMgr) + c.historyHandler, thriftServices = history.NewHandler(service, shardMgr, executionMgr, c.numberOfHistoryShards) c.historyHandler.Start(thriftServices) startWG.Done() <-c.shutdownCh @@ -128,7 +130,7 @@ func (c *cadenceImpl) startMatching(logger bark.Logger, taskMgr persistence.Task return c.createTChannel(sName, c.MatchingServiceAddress(), thriftServices) } scope := tally.NewTestScope("cadence-matching", make(map[string]string)) - service := common.NewService("cadence-matching", logger, scope, tchanFactory, rpHosts) + service := common.NewService("cadence-matching", logger, scope, tchanFactory, rpHosts, c.numberOfHistoryShards) var thriftServices []thrift.TChanServer c.matchingHandler, thriftServices = matching.NewHandler(taskMgr, service) c.matchingHandler.Start(thriftServices) diff --git a/service/history/handler.go b/service/history/handler.go index efaee151cf7..d0e9b2c808d 100644 --- a/service/history/handler.go +++ b/service/history/handler.go @@ -8,14 +8,10 @@ import ( "code.uber.internal/devexp/minions/client/matching" "code.uber.internal/devexp/minions/common" "code.uber.internal/devexp/minions/common/persistence" + "code.uber.internal/devexp/minions/common/util" "github.com/uber/tchannel-go/thrift" ) -const ( - // TODO: Move this to config knob - numberOfShards = 1 -) - // Handler - Thrift handler inteface for history service type Handler struct { numberOfShards int @@ -23,7 +19,7 @@ type Handler struct { executionManager persistence.ExecutionManager matchingServiceClient matching.Client controller *shardController - tokenSerializer common.TaskTokenSerializer + tokenSerializer util.TaskTokenSerializer common.Service } @@ -32,13 +28,13 @@ var _ EngineFactory = (*Handler)(nil) // NewHandler creates a thrift handler for the history service func NewHandler(sVice common.Service, shardManager persistence.ShardManager, - executionPersistence persistence.ExecutionManager) (*Handler, []thrift.TChanServer) { + executionPersistence persistence.ExecutionManager, numberOfShards int) (*Handler, []thrift.TChanServer) { handler := &Handler{ Service: sVice, shardManager: shardManager, executionManager: executionPersistence, numberOfShards: numberOfShards, - tokenSerializer: common.NewJSONTaskTokenSerializer(), + tokenSerializer: util.NewJSONTaskTokenSerializer(), } return handler, []thrift.TChanServer{h.NewTChanHistoryServiceServer(handler)} } diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 13a68e29bab..9a431971544 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -22,7 +22,7 @@ type ( executionManager persistence.ExecutionManager txProcessor transferQueueProcessor timerProcessor timerQueueProcessor - tokenSerializer common.TaskTokenSerializer + tokenSerializer util.TaskTokenSerializer tracker *pendingTaskTracker logger bark.Logger } @@ -89,7 +89,7 @@ func NewEngineWithShardContext(shard ShardContext, executionManager persistence. shard: shard, executionManager: executionManager, txProcessor: txProcessor, - tokenSerializer: common.NewJSONTaskTokenSerializer(), + tokenSerializer: util.NewJSONTaskTokenSerializer(), tracker: tracker, logger: logger.WithFields(bark.Fields{ tagWorkflowComponent: tagValueWorkflowEngineComponent, diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index 8308fb135d9..dc3435e533d 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -16,6 +16,7 @@ import ( "code.uber.internal/devexp/minions/common" "code.uber.internal/devexp/minions/common/mocks" "code.uber.internal/devexp/minions/common/persistence" + "code.uber.internal/devexp/minions/common/util" ) type ( @@ -66,7 +67,7 @@ func (s *engineSuite) SetupTest() { txProcessor: txProcessor, tracker: tracker, logger: s.logger, - tokenSerializer: common.NewJSONTaskTokenSerializer(), + tokenSerializer: util.NewJSONTaskTokenSerializer(), } h.timerProcessor = newTimerQueueProcessor(h, s.mockExecutionMgr, s.logger) s.mockHistoryEngine = h @@ -93,7 +94,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedInvalidToken() { } func (s *engineSuite) TestRespondDecisionTaskCompletedIfNoExecution() { - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -111,7 +112,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedIfNoExecution() { } func (s *engineSuite) TestRespondDecisionTaskCompletedIfGetExecutionFailed() { - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -129,7 +130,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedIfGetExecutionFailed() { func (s *engineSuite) TestRespondDecisionTaskCompletedUpdateExecutionFailed() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -170,7 +171,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedUpdateExecutionFailed() { func (s *engineSuite) TestRespondDecisionTaskCompletedIfTaskCompleted() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -211,7 +212,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedIfTaskCompleted() { func (s *engineSuite) TestRespondDecisionTaskCompletedIfTaskNotStarted() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -273,7 +274,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedConflictOnUpdate() { decisionScheduledEvent2 := addDecisionTaskScheduledEvent(builder, tl, 30) decisionStartedEvent2 := addDecisionTaskStartedEvent(builder, decisionScheduledEvent2.GetEventId(), tl, identity) - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: decisionScheduledEvent2.GetEventId(), @@ -353,7 +354,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedConflictOnUpdate() { func (s *engineSuite) TestRespondDecisionTaskCompletedMaxAttemptsExceeded() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -433,7 +434,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedCompleteWorkflowFailed() { decisionScheduledEvent2 := addDecisionTaskScheduledEvent(builder, tl, 30) decisionStartedEvent2 := addDecisionTaskStartedEvent(builder, decisionScheduledEvent2.GetEventId(), tl, identity) - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: decisionScheduledEvent2.GetEventId(), @@ -513,7 +514,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedFailWorkflowFailed() { decisionScheduledEvent2 := addDecisionTaskScheduledEvent(builder, tl, 30) decisionStartedEvent2 := addDecisionTaskStartedEvent(builder, decisionScheduledEvent2.GetEventId(), tl, identity) - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: decisionScheduledEvent2.GetEventId(), @@ -565,7 +566,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedFailWorkflowFailed() { func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledDecision() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -637,7 +638,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledDec func (s *engineSuite) TestRespondDecisionTaskCompletedCompleteWorkflowSuccess() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -696,7 +697,7 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedCompleteWorkflowSuccess() func (s *engineSuite) TestRespondDecisionTaskCompletedFailWorkflowSuccess() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -771,7 +772,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedInvalidToken() { } func (s *engineSuite) TestRespondActivityTaskCompletedIfNoExecution() { - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -789,7 +790,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfNoExecution() { } func (s *engineSuite) TestRespondActivityTaskCompletedIfGetExecutionFailed() { - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -807,7 +808,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfGetExecutionFailed() { func (s *engineSuite) TestRespondActivityTaskCompletedUpdateExecutionFailed() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -849,7 +850,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedUpdateExecutionFailed() { func (s *engineSuite) TestRespondActivityTaskCompletedIfTaskCompleted() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -894,7 +895,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfTaskCompleted() { func (s *engineSuite) TestRespondActivityTaskCompletedIfTaskNotStarted() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -934,7 +935,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedIfTaskNotStarted() { func (s *engineSuite) TestRespondActivityTaskCompletedConflictOnUpdate() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1008,7 +1009,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedConflictOnUpdate() { func (s *engineSuite) TestRespondActivityTaskCompletedMaxAttemptsExceeded() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1051,7 +1052,7 @@ func (s *engineSuite) TestRespondActivityTaskCompletedMaxAttemptsExceeded() { func (s *engineSuite) TestRespondActivityTaskCompletedSuccess() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1123,7 +1124,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedInvalidToken() { } func (s *engineSuite) TestRespondActivityTaskFailedIfNoExecution() { - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -1141,7 +1142,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedIfNoExecution() { } func (s *engineSuite) TestRespondActivityTaskFailedIfGetExecutionFailed() { - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 2, @@ -1159,7 +1160,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedIfGetExecutionFailed() { func (s *engineSuite) TestRespondActivityTaskFailedUpdateExecutionFailed() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1200,7 +1201,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedUpdateExecutionFailed() { func (s *engineSuite) TestRespondActivityTaskFailedIfTaskCompleted() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1247,7 +1248,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedIfTaskCompleted() { func (s *engineSuite) TestRespondActivityTaskFailedIfTaskNotStarted() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1285,7 +1286,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedIfTaskNotStarted() { func (s *engineSuite) TestRespondActivityTaskFailedConflictOnUpdate() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1362,7 +1363,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedConflictOnUpdate() { func (s *engineSuite) TestRespondActivityTaskFailedMaxAttemptsExceeded() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1403,7 +1404,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedMaxAttemptsExceeded() { func (s *engineSuite) TestRespondActivityTaskFailedSuccess() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1466,7 +1467,7 @@ func (s *engineSuite) TestRespondActivityTaskFailedSuccess() { func (s *engineSuite) TestRecordActivityTaskHeartBeatSuccess_NoTimer() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, @@ -1501,7 +1502,7 @@ func (s *engineSuite) TestRecordActivityTaskHeartBeatSuccess_NoTimer() { func (s *engineSuite) TestRecordActivityTaskHeartBeatSuccess_TimerRunning() { tl := "testTaskList" - taskToken, _ := json.Marshal(&common.TaskToken{ + taskToken, _ := json.Marshal(&util.TaskToken{ WorkflowID: "wId", RunID: "rId", ScheduleID: 5, diff --git a/service/history/shardController.go b/service/history/shardController.go index cd934a28330..70f7c23c5ad 100644 --- a/service/history/shardController.go +++ b/service/history/shardController.go @@ -6,7 +6,6 @@ import ( "sync/atomic" "time" - "github.com/dgryski/go-farm" "github.com/uber-common/bark" "code.uber.internal/devexp/minions/common/membership" @@ -111,7 +110,7 @@ func (c *shardController) Stop() { } func (c *shardController) GetEngine(workflowID string) (Engine, error) { - shardID := WorkflowIDToShard(workflowID) + shardID := util.WorkflowIDToHistoryShard(workflowID, c.numberOfShards) return c.getEngineForShard(shardID) } @@ -268,9 +267,3 @@ func (i *historyShardsItem) stopEngine() { i.engine.Stop() } } - -// WorkflowIDToShard is used to hash workflowID to a shardID -func WorkflowIDToShard(workflowID string) int { - hash := farm.Fingerprint32([]byte(workflowID)) - return int(hash % numberOfShards) -} diff --git a/service/history/timerQueueProcessor_test.go b/service/history/timerQueueProcessor_test.go index 5543a885b06..a3a6999ea60 100644 --- a/service/history/timerQueueProcessor_test.go +++ b/service/history/timerQueueProcessor_test.go @@ -9,6 +9,7 @@ import ( "code.uber.internal/devexp/minions/common" "code.uber.internal/devexp/minions/common/mocks" "code.uber.internal/devexp/minions/common/persistence" + "code.uber.internal/devexp/minions/common/util" workflow "code.uber.internal/devexp/minions/.gen/go/shared" log "github.com/Sirupsen/logrus" @@ -57,7 +58,7 @@ func (s *timerQueueProcessorSuite) SetupSuite() { txProcessor: txProcessor, logger: s.logger, tracker: tracker, - tokenSerializer: common.NewJSONTaskTokenSerializer(), + tokenSerializer: util.NewJSONTaskTokenSerializer(), } } diff --git a/service/matching/matchingEngine.go b/service/matching/matchingEngine.go index 3576a814318..4e98aaa31c5 100644 --- a/service/matching/matchingEngine.go +++ b/service/matching/matchingEngine.go @@ -24,7 +24,7 @@ const defaultRangeSize = 100000 type matchingEngineImpl struct { taskManager persistence.TaskManager historyService history.Client - tokenSerializer common.TaskTokenSerializer + tokenSerializer util.TaskTokenSerializer taskLists map[taskListID]*taskListContext rangeSize int64 logger bark.Logger @@ -81,7 +81,7 @@ func NewEngine(taskManager persistence.TaskManager, historyService history.Clien return &matchingEngineImpl{ taskManager: taskManager, historyService: historyService, - tokenSerializer: common.NewJSONTaskTokenSerializer(), + tokenSerializer: util.NewJSONTaskTokenSerializer(), taskLists: make(map[taskListID]*taskListContext), rangeSize: defaultRangeSize, logger: logger.WithFields(bark.Fields{ @@ -327,7 +327,7 @@ func (e *matchingEngineImpl) createPollForDecisionTaskResponse(context *taskCont response := workflow.NewPollForDecisionTaskResponse() response.WorkflowExecution = workflowExecutionPtr(context.workflowExecution) - token := &common.TaskToken{ + token := &util.TaskToken{ WorkflowID: task.WorkflowID, RunID: task.RunID, ScheduleID: task.ScheduleID, @@ -359,7 +359,7 @@ func (e *matchingEngineImpl) createPollForActivityTaskResponse(context *taskCont response.StartedEventId = common.Int64Ptr(startedEvent.GetEventId()) response.WorkflowExecution = workflowExecutionPtr(context.workflowExecution) - token := &common.TaskToken{ + token := &util.TaskToken{ WorkflowID: task.WorkflowID, RunID: task.RunID, ScheduleID: task.ScheduleID, diff --git a/service/matching/matchingEngine_test.go b/service/matching/matchingEngine_test.go index eef25c944c8..3ef60b3e565 100644 --- a/service/matching/matchingEngine_test.go +++ b/service/matching/matchingEngine_test.go @@ -19,6 +19,7 @@ import ( "code.uber.internal/devexp/minions/common" "code.uber.internal/devexp/minions/common/mocks" "code.uber.internal/devexp/minions/common/persistence" + "code.uber.internal/devexp/minions/common/util" ) type ( @@ -56,7 +57,7 @@ func (s *matchingEngineSuite) SetupTest() { historyService: s.historyClient, taskLists: make(map[taskListID]*taskListContext), logger: s.logger, - tokenSerializer: common.NewJSONTaskTokenSerializer(), + tokenSerializer: util.NewJSONTaskTokenSerializer(), } } @@ -167,7 +168,7 @@ func (s *matchingEngineSuite) TestPollForActivityTasks() { s.EqualValues(activityInput, result.Input) s.EqualValues(startedID, *result.StartedEventId) s.EqualValues(workflowExecution, result.WorkflowExecution) - token := &common.TaskToken{ + token := &util.TaskToken{ WorkflowID: workflowID, RunID: runID, ScheduleID: scheduleID,