diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 2d23b922448..d08f5f730a8 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -273,6 +273,8 @@ const (
 	HistoryRecordChildExecutionCompletedScope
 	// HistoryRequestCancelWorkflowExecutionScope tracks RequestCancelWorkflowExecution API calls received by service
 	HistoryRequestCancelWorkflowExecutionScope
+	// HistoryShardControllerScope is the scope used by shard controller
+	HistoryShardControllerScope
 	// TransferQueueProcessorScope is the scope used by all metric emitted by transfer queue processor
 	TransferQueueProcessorScope
 	// TransferTaskActivityScope is the scope used for activity task processing by transfer queue processor
@@ -403,6 +405,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 		HistoryScheduleDecisionTaskScope:           {operation: "ScheduleDecisionTask"},
 		HistoryRecordChildExecutionCompletedScope:  {operation: "RecordChildExecutionCompleted"},
 		HistoryRequestCancelWorkflowExecutionScope: {operation: "RequestCancelWorkflowExecution"},
+		HistoryShardControllerScope:                {operation: "ShardController"},
 		TransferQueueProcessorScope:                {operation: "TransferQueueProcessor"},
 		TransferTaskActivityScope:                  {operation: "TransferTaskActivity"},
 		TransferTaskDecisionScope:                  {operation: "TransferTaskDecision"},
@@ -478,6 +481,13 @@ const (
 	ScheduleToCloseTimeoutCounter
 	NewTimerCounter
 	NewTimerNotifyCounter
+	AcquireShardsCounter
+	AcquireShardsLatency
+	ShardClosedCounter
+	ShardItemCreatedCounter
+	ShardItemRemovedCounter
+	MembershipChangedCounter
+	NumShardsGauge
 )
 
 // Matching metrics enum
@@ -541,6 +551,13 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		ScheduleToCloseTimeoutCounter: {metricName: "schedule-to-close-timeout", metricType: Counter},
 		NewTimerCounter:               {metricName: "new-timer", metricType: Counter},
 		NewTimerNotifyCounter:         {metricName: "new-timer-notifications", metricType: Counter},
+		AcquireShardsCounter:          {metricName: "acquire-shards-count", metricType: Counter},
+		AcquireShardsLatency:          {metricName: "acquire-shards-latency", metricType: Timer},
+		ShardClosedCounter:            {metricName: "shard-closed-count", metricType: Counter},
+		ShardItemCreatedCounter:       {metricName: "sharditem-created-count", metricType: Counter},
+		ShardItemRemovedCounter:       {metricName: "sharditem-removed-count", metricType: Counter},
+		MembershipChangedCounter:      {metricName: "membership-changed-count", metricType: Counter},
+		NumShardsGauge:                {metricName: "numshards-gauge", metricType: Gauge},
 	},
 	Matching: {
 		PollSuccessCounter: {metricName: "poll.success"},
diff --git a/service/history/shardController.go b/service/history/shardController.go
index 7617f67d123..d7a9571f6b1 100644
--- a/service/history/shardController.go
+++ b/service/history/shardController.go
@@ -63,19 +63,17 @@ type (
 	}
 
 	historyShardsItem struct {
+		sync.RWMutex
 		shardID       int
 		shardMgr      persistence.ShardManager
 		historyMgr    persistence.HistoryManager
 		executionMgr  persistence.ExecutionManager
 		engineFactory EngineFactory
 		host          *membership.HostInfo
+		engine        Engine
 		config        *Config
 		logger        bark.Logger
 		metricsClient metrics.Client
-
-		sync.RWMutex
-		engine  Engine
-		context ShardContext
 	}
 )
@@ -132,7 +130,6 @@ func (c *shardController) Start() {
 	}
 
 	c.acquireShards()
-
 	c.shutdownWG.Add(1)
 	go c.shardManagementPump()
 
@@ -215,6 +212,7 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShar
 			return nil, err
 		}
 		c.historyShards[shardID] = shardItem
+		c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.ShardItemCreatedCounter)
 		logging.LogShardItemCreatedEvent(shardItem.logger, info.Identity(), shardID)
 		return shardItem, nil
 	}
@@ -223,51 +221,71 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShar
 }
 
 func (c *shardController) removeHistoryShardItem(shardID int) (*historyShardsItem, error) {
+	nShards := 0
 	c.Lock()
-	defer c.Unlock()
-
 	item, ok := c.historyShards[shardID]
 	if !ok {
+		c.Unlock()
 		return nil, fmt.Errorf("No item found to remove for shard: %v", shardID)
 	}
-
 	delete(c.historyShards, shardID)
-	logging.LogShardItemRemovedEvent(item.logger, c.host.Identity(), shardID, len(c.historyShards))
+	nShards = len(c.historyShards)
+	c.Unlock()
+	c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.ShardItemRemovedCounter)
 
+	logging.LogShardItemRemovedEvent(item.logger, c.host.Identity(), shardID, nShards)
 	return item, nil
 }
 
+// shardManagementPump is the main event loop for
+// shardController. It is responsible for acquiring /
+// releasing shards in response to any event that can
+// change the shard ownership. These events are:
+// a. Ring membership change
+// b. Periodic ticker
+// c. ShardOwnershipLostError and subsequent ShardClosedEvents from engine
 func (c *shardController) shardManagementPump() {
+	defer c.shutdownWG.Done()
 	acquireTicker := time.NewTicker(c.config.AcquireShardInterval)
 	defer acquireTicker.Stop()
+
 	for {
+
 		select {
 		case <-c.shutdownCh:
-			logging.LogShardControllerShuttingDownEvent(c.logger, c.host.Identity())
-			c.Lock()
-			defer c.Unlock()
-
-			for _, item := range c.historyShards {
-				item.stopEngine()
-			}
-			c.historyShards = nil
+			c.doShutdown()
 			return
 		case <-acquireTicker.C:
 			c.acquireShards()
 		case changedEvent := <-c.membershipUpdateCh:
+			c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.MembershipChangedCounter)
 			logging.LogRingMembershipChangedEvent(c.logger, c.host.Identity(), len(changedEvent.HostsAdded),
 				len(changedEvent.HostsRemoved), len(changedEvent.HostsUpdated))
 			c.acquireShards()
 		case shardID := <-c.shardClosedCh:
+			c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.ShardClosedCounter)
 			logging.LogShardClosedEvent(c.logger, c.host.Identity(), shardID)
 			c.removeEngineForShard(shardID)
+			// The async close notifications can cause a race
+			// between acquire/release when nodes are flapping.
+			// The impact of this race is unnecessary shard load/unloads,
+			// even though things will settle eventually.
+			// To reduce the chance of the race happening, let's
+			// process all closed events at once before we attempt
+			// to acquire new shards again.
+			c.processShardClosedEvents()
 		}
 	}
 }
 
 func (c *shardController) acquireShards() {
+
+	c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.AcquireShardsCounter)
+	sw := c.metricsClient.StartTimer(metrics.HistoryShardControllerScope, metrics.AcquireShardsLatency)
+	defer sw.Stop()
+
 AcquireLoop:
 	for shardID := 0; shardID < c.config.NumberOfShards; shardID++ {
 		info, err := c.hServiceResolver.Lookup(string(shardID))
@@ -287,6 +305,39 @@ AcquireLoop:
 			c.removeEngineForShard(shardID)
 		}
 	}
+
+	c.metricsClient.UpdateGauge(metrics.HistoryShardControllerScope, metrics.NumShardsGauge, float64(c.numShards()))
+}
+
+func (c *shardController) doShutdown() {
+	logging.LogShardControllerShuttingDownEvent(c.logger, c.host.Identity())
+	c.Lock()
+	defer c.Unlock()
+	for _, item := range c.historyShards {
+		item.stopEngine()
+	}
+	c.historyShards = nil
+}
+
+func (c *shardController) processShardClosedEvents() {
+	for {
+		select {
+		case shardID := <-c.shardClosedCh:
+			c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.ShardClosedCounter)
+			logging.LogShardClosedEvent(c.logger, c.host.Identity(), shardID)
+			c.removeEngineForShard(shardID)
+		default:
+			return
+		}
+	}
+}
+
+func (c *shardController) numShards() int {
+	nShards := 0
+	c.RLock()
+	nShards = len(c.historyShards)
+	c.RUnlock()
+	return nShards
 }
 
 func (i *historyShardsItem) getEngine() Engine {
@@ -330,11 +381,14 @@ func (i *historyShardsItem) stopEngine() {
 	i.Lock()
 	defer i.Unlock()
 
+	if i.executionMgr != nil {
+		i.executionMgr.Close()
+	}
+
 	if i.engine != nil {
 		logging.LogShardEngineStoppingEvent(i.logger, i.host.Identity(), i.shardID)
 		i.engine.Stop()
 		i.engine = nil
-		i.executionMgr.Close()
 		logging.LogShardEngineStoppedEvent(i.logger, i.host.Identity(), i.shardID)
 	}
 }
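
Note on removeHistoryShardItem: the rewrite narrows the critical section so the controller mutex is released before the metrics and logging calls run, capturing len(c.historyShards) into a local while the lock is still held. A minimal standalone sketch of that lock-narrowing pattern, with hypothetical names (registry, remove) standing in for the real types:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	sync.Mutex
	items map[int]string
}

// remove deletes id under the lock but performs all side effects
// (logging, metrics) only after the lock is released, so slow
// observers never extend the critical section.
func (r *registry) remove(id int) (string, error) {
	r.Lock()
	item, ok := r.items[id]
	if !ok {
		r.Unlock()
		return "", fmt.Errorf("no item found for id: %v", id)
	}
	delete(r.items, id)
	remaining := len(r.items) // capture while still holding the lock
	r.Unlock()

	fmt.Printf("removed %v, %v remaining\n", id, remaining) // outside the lock
	return item, nil
}

func main() {
	r := &registry{items: map[int]string{1: "a", 2: "b"}}
	if item, err := r.remove(1); err == nil {
		fmt.Println("got:", item)
	}
}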
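The comment above processShardClosedEvents describes draining pending close notifications before the next acquire pass; the helper does this with Go's non-blocking receive, where a select falls through to the default case as soon as the channel buffer is empty. A self-contained sketch of the idiom, with hypothetical names (drainPending, handle) rather than the actual controller types:

package main

import "fmt"

// drainPending handles every event already buffered on ch without ever
// blocking: the default case is chosen the moment ch is empty, so the
// caller resumes immediately instead of waiting for future events.
func drainPending(ch <-chan int, handle func(int)) {
	for {
		select {
		case shardID := <-ch:
			handle(shardID)
		default:
			return
		}
	}
}

func main() {
	ch := make(chan int, 4)
	ch <- 3
	ch <- 11
	drainPending(ch, func(id int) { fmt.Println("shard closed:", id) })
}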