Commit 85ba1df
shardController: fix fd leak, add metrics and process closeShard in b…
venkat1109 authored Aug 11, 2017
1 parent 886a2dc commit 85ba1df
Showing 2 changed files with 89 additions and 18 deletions.
common/metrics/defs.go: 17 additions & 0 deletions

@@ -273,6 +273,8 @@ const (
 	HistoryRecordChildExecutionCompletedScope
 	// HistoryRequestCancelWorkflowExecutionScope tracks RequestCancelWorkflowExecution API calls received by service
 	HistoryRequestCancelWorkflowExecutionScope
+	// HistoryShardControllerScope is the scope used by shard controller
+	HistoryShardControllerScope
 	// TransferQueueProcessorScope is the scope used by all metric emitted by transfer queue processor
 	TransferQueueProcessorScope
 	// TransferTaskActivityScope is the scope used for activity task processing by transfer queue processor
@@ -403,6 +405,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 		HistoryScheduleDecisionTaskScope:           {operation: "ScheduleDecisionTask"},
 		HistoryRecordChildExecutionCompletedScope:  {operation: "RecordChildExecutionCompleted"},
 		HistoryRequestCancelWorkflowExecutionScope: {operation: "RequestCancelWorkflowExecution"},
+		HistoryShardControllerScope:                {operation: "ShardController"},
 		TransferQueueProcessorScope:                {operation: "TransferQueueProcessor"},
 		TransferTaskActivityScope:                  {operation: "TransferTaskActivity"},
 		TransferTaskDecisionScope:                  {operation: "TransferTaskDecision"},
@@ -478,6 +481,13 @@ const (
 	ScheduleToCloseTimeoutCounter
 	NewTimerCounter
 	NewTimerNotifyCounter
+	AcquireShardsCounter
+	AcquireShardsLatency
+	ShardClosedCounter
+	ShardItemCreatedCounter
+	ShardItemRemovedCounter
+	MembershipChangedCounter
+	NumShardsGauge
 )
 
 // Matching metrics enum
@@ -541,6 +551,13 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		ScheduleToCloseTimeoutCounter: {metricName: "schedule-to-close-timeout", metricType: Counter},
 		NewTimerCounter:               {metricName: "new-timer", metricType: Counter},
 		NewTimerNotifyCounter:         {metricName: "new-timer-notifications", metricType: Counter},
+		AcquireShardsCounter:          {metricName: "acquire-shards-count", metricType: Counter},
+		AcquireShardsLatency:          {metricName: "acquire-shards-latency", metricType: Timer},
+		ShardClosedCounter:            {metricName: "shard-closed-count", metricType: Counter},
+		ShardItemCreatedCounter:       {metricName: "sharditem-created-count", metricType: Counter},
+		ShardItemRemovedCounter:       {metricName: "sharditem-removed-count", metricType: Counter},
+		MembershipChangedCounter:      {metricName: "membership-changed-count", metricType: Counter},
+		NumShardsGauge:                {metricName: "numshards-gauge", metricType: Gauge},
 	},
 	Matching: {
 		PollSuccessCounter: {metricName: "poll.success"},
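For orientation, the new metric IDs above are emitted through the metrics.Client methods used in the shardController.go diff below: IncCounter for the counters, StartTimer for the latency timer, and UpdateGauge for the shard-count gauge. A minimal sketch of the call pattern; the emitAcquireMetrics helper is hypothetical and only mirrors calls that appear in this commit:

// emitAcquireMetrics illustrates the shape of the metrics.Client calls added
// in this commit; the helper itself is invented for illustration.
func emitAcquireMetrics(client metrics.Client, numShards int) {
	client.IncCounter(metrics.HistoryShardControllerScope, metrics.AcquireShardsCounter)
	sw := client.StartTimer(metrics.HistoryShardControllerScope, metrics.AcquireShardsLatency)
	defer sw.Stop() // records the elapsed time when the surrounding work returns
	// ... shard acquisition work would run here ...
	client.UpdateGauge(metrics.HistoryShardControllerScope, metrics.NumShardsGauge, float64(numShards))
}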
service/history/shardController.go: 72 additions & 18 deletions
@@ -63,19 +63,17 @@ type (
 	}
 
 	historyShardsItem struct {
+		sync.RWMutex
 		shardID       int
 		shardMgr      persistence.ShardManager
 		historyMgr    persistence.HistoryManager
 		executionMgr  persistence.ExecutionManager
 		engineFactory EngineFactory
 		host          *membership.HostInfo
+		engine        Engine
 		config        *Config
 		logger        bark.Logger
 		metricsClient metrics.Client
-
-		sync.RWMutex
-		engine  Engine
-		context ShardContext
 	}
 )
 
@@ -132,7 +130,6 @@ func (c *shardController) Start() {
 	}
 
 	c.acquireShards()
-
 	c.shutdownWG.Add(1)
 	go c.shardManagementPump()
 
@@ -215,6 +212,7 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShardsItem, error) {
 		return nil, err
 	}
 	c.historyShards[shardID] = shardItem
+	c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.ShardItemCreatedCounter)
 	logging.LogShardItemCreatedEvent(shardItem.logger, info.Identity(), shardID)
 	return shardItem, nil
 }
@@ -223,51 +221,71 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShardsItem, error) {
 }
 
 func (c *shardController) removeHistoryShardItem(shardID int) (*historyShardsItem, error) {
+	nShards := 0
 	c.Lock()
-	defer c.Unlock()
-
 	item, ok := c.historyShards[shardID]
 	if !ok {
+		c.Unlock()
 		return nil, fmt.Errorf("No item found to remove for shard: %v", shardID)
 	}
-
 	delete(c.historyShards, shardID)
-	logging.LogShardItemRemovedEvent(item.logger, c.host.Identity(), shardID, len(c.historyShards))
+	nShards = len(c.historyShards)
+	c.Unlock()
+
+	c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.ShardItemRemovedCounter)
+	logging.LogShardItemRemovedEvent(item.logger, c.host.Identity(), shardID, nShards)
 	return item, nil
 }
 
+// shardManagementPump is the main event loop for
+// shardController. It is responsible for acquiring /
+// releasing shards in response to any event that can
+// change the shard ownership. These events are
+// a. Ring membership change
+// b. Periodic ticker
+// c. ShardOwnershipLostError and subsequent ShardClosedEvents from engine
 func (c *shardController) shardManagementPump() {
+
 	defer c.shutdownWG.Done()
 
 	acquireTicker := time.NewTicker(c.config.AcquireShardInterval)
 	defer acquireTicker.Stop()
 
 	for {
+
 		select {
 		case <-c.shutdownCh:
-			logging.LogShardControllerShuttingDownEvent(c.logger, c.host.Identity())
-			c.Lock()
-			defer c.Unlock()
-
-			for _, item := range c.historyShards {
-				item.stopEngine()
-			}
-			c.historyShards = nil
+			c.doShutdown()
			return
 		case <-acquireTicker.C:
 			c.acquireShards()
 		case changedEvent := <-c.membershipUpdateCh:
+			c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.MembershipChangedCounter)
 			logging.LogRingMembershipChangedEvent(c.logger, c.host.Identity(), len(changedEvent.HostsAdded),
 				len(changedEvent.HostsRemoved), len(changedEvent.HostsUpdated))
 			c.acquireShards()
 		case shardID := <-c.shardClosedCh:
+			c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.ShardClosedCounter)
 			logging.LogShardClosedEvent(c.logger, c.host.Identity(), shardID)
 			c.removeEngineForShard(shardID)
+			// The async close notifications can cause a race
+			// between acquire/release when nodes are flapping.
+			// The impact of this race is unnecessary shard load/unloads
+			// even though things will settle eventually.
+			// To reduce the chance of the race happening, let's
+			// process all closed events at once before we attempt
+			// to acquire new shards again.
+			c.processShardClosedEvents()
 		}
 	}
 }
 
 func (c *shardController) acquireShards() {
+
+	c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.AcquireShardsCounter)
+	sw := c.metricsClient.StartTimer(metrics.HistoryShardControllerScope, metrics.AcquireShardsLatency)
+	defer sw.Stop()
+
 AcquireLoop:
 	for shardID := 0; shardID < c.config.NumberOfShards; shardID++ {
 		info, err := c.hServiceResolver.Lookup(string(shardID))
@@ -287,6 +305,39 @@
 			c.removeEngineForShard(shardID)
 		}
 	}
+
+	c.metricsClient.UpdateGauge(metrics.HistoryShardControllerScope, metrics.NumShardsGauge, float64(c.numShards()))
 }
+
+func (c *shardController) doShutdown() {
+	logging.LogShardControllerShuttingDownEvent(c.logger, c.host.Identity())
+	c.Lock()
+	defer c.Unlock()
+	for _, item := range c.historyShards {
+		item.stopEngine()
+	}
+	c.historyShards = nil
+}
+
+func (c *shardController) processShardClosedEvents() {
+	for {
+		select {
+		case shardID := <-c.shardClosedCh:
+			c.metricsClient.IncCounter(metrics.HistoryShardControllerScope, metrics.ShardClosedCounter)
+			logging.LogShardClosedEvent(c.logger, c.host.Identity(), shardID)
+			c.removeEngineForShard(shardID)
+		default:
+			return
+		}
+	}
+}
+
+func (c *shardController) numShards() int {
+	nShards := 0
+	c.RLock()
+	nShards = len(c.historyShards)
+	c.RUnlock()
+	return nShards
+}
 
 func (i *historyShardsItem) getEngine() Engine {
@@ -330,11 +381,14 @@ func (i *historyShardsItem) stopEngine() {
 	i.Lock()
 	defer i.Unlock()
 
+	if i.executionMgr != nil {
+		i.executionMgr.Close()
+	}
+
 	if i.engine != nil {
 		logging.LogShardEngineStoppingEvent(i.logger, i.host.Identity(), i.shardID)
 		i.engine.Stop()
 		i.engine = nil
-		i.executionMgr.Close()
 		logging.LogShardEngineStoppedEvent(i.logger, i.host.Identity(), i.shardID)
 	}
 }
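The stopEngine hunk above is the fd leak fix from the commit title: previously i.executionMgr.Close() ran only inside the i.engine != nil branch, so a shard item whose engine was never started, or had already been stopped, never closed its execution manager, and the file descriptors behind its persistence connections leaked. Closing executionMgr unconditionally releases them whenever the shard item is torn down.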

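The batched close-shard processing comes from the new processShardClosedEvents: after the pump handles one notification, it drains every other pending notification from shardClosedCh before acquiring shards again, so a burst of closes triggers a single reacquire pass instead of one per event. The non-blocking drain is the standard select-with-default idiom; a self-contained sketch, with the channel and names invented for illustration:

package main

import "fmt"

// drain empties every value currently buffered on ch without blocking:
// the default case fires as soon as nothing is pending.
func drain(ch <-chan int) []int {
	var drained []int
	for {
		select {
		case v := <-ch:
			drained = append(drained, v)
		default:
			return drained
		}
	}
}

func main() {
	ch := make(chan int, 8)
	for _, shardID := range []int{3, 7, 11} {
		ch <- shardID // simulate queued shard-closed notifications
	}
	fmt.Println(drain(ch)) // [3 7 11]
}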

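One more detail in removeHistoryShardItem is worth calling out: the rewrite captures len(c.historyShards) into nShards while holding the lock, unlocks explicitly instead of deferring, and only then emits the metric and the log line, so metrics and logging I/O never run under the controller mutex. The same narrow-critical-section shape as a generic sketch; the registry type and its fields are invented for illustration:

package main

import (
	"fmt"
	"sync"
)

type registry struct {
	mu    sync.Mutex
	items map[int]struct{}
}

// remove deletes id and returns the remaining count; the mutex guards only
// the map operations, never the I/O that follows.
func (r *registry) remove(id int) int {
	r.mu.Lock()
	delete(r.items, id)
	n := len(r.items)
	r.mu.Unlock() // release before any logging or metrics calls
	fmt.Printf("removed item %d, %d remain\n", id, n)
	return n
}

func main() {
	r := &registry{items: map[int]struct{}{1: {}, 2: {}}}
	r.remove(1)
}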