Skip to content

Commit

Permalink
shardController: improvements for graceful shutdown (temporalio#3136)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Mar 26, 2020
1 parent 4d80f0b commit a5111a6
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 143 deletions.
12 changes: 10 additions & 2 deletions service/history/MockQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ func (_m *MockQueueAckMgr) getQueueReadLevel() int64 {
}

// updateQueueAckLevel is mock implementation for updateQueueAckLevel of QueueAckMgr
func (_m *MockQueueAckMgr) updateQueueAckLevel() {
_m.Called()
func (_m *MockQueueAckMgr) updateQueueAckLevel() error {
ret := _m.Called()

var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
12 changes: 10 additions & 2 deletions service/history/MockTimerQueueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,14 @@ func (_m *MockTimerQueueAckMgr) getReadLevel() timerKey {
return r0
}

func (_m *MockTimerQueueAckMgr) updateAckLevel() {
_m.Called()
func (_m *MockTimerQueueAckMgr) updateAckLevel() error {
ret := _m.Called()

var r0 error
if rf, ok := ret.Get(0).(func() error); ok {
r0 = rf()
} else {
r0 = ret.Error(0)
}
return r0
}
4 changes: 2 additions & 2 deletions service/history/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,12 +671,12 @@ func (h *Handler) RemoveTask(
return err
}

// CloseShard closes a shard hosted by this instance. The shard is unloaded via
// the controller; a nil shardsItem is passed because the caller does not hold a
// reference to the shard item — the controller resolves it by ID.
// Always returns nil; the request is best-effort.
func (h *Handler) CloseShard(
	ctx context.Context,
	request *gen.CloseShardRequest,
) (retError error) {
	h.controller.removeEngineForShard(int(request.GetShardID()), nil)
	return nil
}

Expand Down
4 changes: 2 additions & 2 deletions service/history/historyEngineInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type (
completeQueueTask(taskID int64)
getQueueAckLevel() int64
getQueueReadLevel() int64
updateQueueAckLevel()
updateQueueAckLevel() error
}

queueTaskInfo interface {
Expand Down Expand Up @@ -119,7 +119,7 @@ type (
completeTimerTask(timerTask *persistence.TimerTaskInfo)
getAckLevel() timerKey
getReadLevel() timerKey
updateAckLevel()
updateAckLevel() error
}

historyEventNotifier interface {
Expand Down
6 changes: 4 additions & 2 deletions service/history/queueAckMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (a *queueAckMgrImpl) getFinishedChan() <-chan struct{} {
return a.finishedChan
}

func (a *queueAckMgrImpl) updateQueueAckLevel() {
func (a *queueAckMgrImpl) updateQueueAckLevel() error {
a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateCounter)

a.Lock()
Expand Down Expand Up @@ -213,12 +213,14 @@ MoveAckLevelLoop:
if err != nil {
a.logger.Error("Error shutdown queue", tag.Error(err))
}
return
return nil
}

a.Unlock()
if err := a.processor.updateAckLevel(ackLevel); err != nil {
a.metricsClient.IncCounter(a.options.MetricScope, metrics.AckLevelUpdateFailedCounter)
a.logger.Error("Error updating ack level for shard", tag.Error(err), tag.OperationFailed)
return err
}
return nil
}
6 changes: 5 additions & 1 deletion service/history/queueProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,11 @@ processorPumpLoop:
p.options.UpdateAckInterval(),
p.options.UpdateAckIntervalJitterCoefficient(),
))
p.ackMgr.updateQueueAckLevel()
if err := p.ackMgr.updateQueueAckLevel(); err == ErrShardClosed {
// shard is no longer owned by this instance, bail out
go p.Stop()
break processorPumpLoop
}
case <-redispatchTimer.C:
redispatchTimer.Reset(backoff.JitDuration(
p.options.RedispatchInterval(),
Expand Down
1 change: 0 additions & 1 deletion service/history/replicationDLQHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ func (s *replicationDLQHandlerSuite) SetupTest() {
ReplicationDLQAckLevel: map[string]int64{"test": -1}},
transferSequenceNumber: 1,
maxTransferSequenceNumber: 100000,
closeCh: make(chan int, 100),
config: NewDynamicConfigForTest(),
logger: logger,
remoteClusterCurrentTime: make(map[string]time.Time),
Expand Down
1 change: 0 additions & 1 deletion service/history/replicationTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ func (s *replicationTaskExecutorSuite) SetupTest() {
ReplicationDLQAckLevel: map[string]int64{"test": -1}},
transferSequenceNumber: 1,
maxTransferSequenceNumber: 100000,
closeCh: make(chan int, 100),
config: NewDynamicConfigForTest(),
logger: logger,
remoteClusterCurrentTime: make(map[string]time.Time),
Expand Down
1 change: 0 additions & 1 deletion service/history/replicationTaskProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ func (s *replicationTaskProcessorSuite) SetupTest() {
shardInfo: &persistence.ShardInfo{ShardID: 0, RangeID: 1, TransferAckLevel: 0},
transferSequenceNumber: 1,
maxTransferSequenceNumber: 100000,
closeCh: make(chan int, 100),
config: NewDynamicConfigForTest(),
logger: logger,
remoteClusterCurrentTime: make(map[string]time.Time),
Expand Down
46 changes: 31 additions & 15 deletions service/history/shardContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package history

import (
"errors"
"fmt"
"strconv"
"sync"
Expand Down Expand Up @@ -115,8 +116,8 @@ type (
rangeID int64
executionManager persistence.ExecutionManager
eventsCache eventsCache
closeCh chan<- int
isClosed bool
closeCallback func(int, *historyShardsItem)
closed int32
config *Config
logger log.Logger
throttledLogger log.Logger
Expand All @@ -140,6 +141,9 @@ type (

var _ ShardContext = (*shardContextImpl)(nil)

// ErrShardClosed is returned when the shard has been closed and a request
// against it can no longer be processed.
var ErrShardClosed = errors.New("shard closed")

const (
logWarnTransferLevelDiff = 3000000 // 3 million
logWarnTimerLevelDiff = time.Duration(30 * time.Minute)
Expand Down Expand Up @@ -489,6 +493,7 @@ Create_Loop:
} else {
// Shard is stolen, trigger shutdown of history engine
s.closeShard()
break Create_Loop
}
}
default:
Expand All @@ -503,6 +508,7 @@ Create_Loop:
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.closeShard()
break Create_Loop
}
}
}
Expand Down Expand Up @@ -580,6 +586,7 @@ Update_Loop:
} else {
// Shard is stolen, trigger shutdown of history engine
s.closeShard()
break Update_Loop
}
}
default:
Expand All @@ -594,6 +601,7 @@ Update_Loop:
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.closeShard()
break Update_Loop
}
}
}
Expand Down Expand Up @@ -666,6 +674,7 @@ Reset_Loop:
} else {
// Shard is stolen, trigger shutdown of history engine
s.closeShard()
break Reset_Loop
}
}
default:
Expand All @@ -680,6 +689,7 @@ Reset_Loop:
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.closeShard()
break Reset_Loop
}
}
}
Expand Down Expand Up @@ -765,6 +775,7 @@ Reset_Loop:
} else {
// Shard is stolen, trigger shutdown of history engine
s.closeShard()
break Reset_Loop
}
}
default:
Expand All @@ -779,6 +790,7 @@ Reset_Loop:
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.closeShard()
break Reset_Loop
}
}
}
Expand Down Expand Up @@ -856,24 +868,22 @@ func (s *shardContextImpl) getRangeID() int64 {
return s.shardInfo.RangeID
}

// isClosed reports whether closeShard has already been invoked on this shard.
func (s *shardContextImpl) isClosed() bool {
	closed := atomic.LoadInt32(&s.closed)
	return closed != 0
}

func (s *shardContextImpl) closeShard() {
if s.isClosed {
if !atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
return
}

s.isClosed = true

go s.shardItem.stopEngine()
go func() {
s.closeCallback(s.shardID, s.shardItem)
}()

// fails any writes that may start after this point.
s.shardInfo.RangeID = -1
atomic.StoreInt64(&s.rangeID, s.shardInfo.RangeID)

if s.closeCh != nil {
// This is the channel passed in by shard controller to monitor if a shard needs to be unloaded
// It will trigger the HistoryEngine unload and removal of engine from shard controller
s.closeCh <- s.shardID
}
}

func (s *shardContextImpl) generateTransferTaskIDLocked() (int64, error) {
Expand Down Expand Up @@ -943,6 +953,10 @@ func (s *shardContextImpl) updateMaxReadLevelLocked(rl int64) {
}

func (s *shardContextImpl) updateShardInfoLocked() error {
if s.isClosed() {
return ErrShardClosed
}

var err error
now := clock.NewRealTimeSource().Now()
if s.lastUpdated.Add(s.config.ShardUpdateMinInterval()).After(now) {
Expand Down Expand Up @@ -1142,8 +1156,10 @@ func (s *shardContextImpl) GetLastUpdatedTime() time.Time {
return s.lastUpdated
}

func acquireShard(shardItem *historyShardsItem, closeCh chan<- int) (ShardContext,
error) {
func acquireShard(
shardItem *historyShardsItem,
closeCallback func(int, *historyShardsItem),
) (ShardContext, error) {

var shardInfo *persistence.ShardInfo

Expand Down Expand Up @@ -1223,7 +1239,7 @@ func acquireShard(shardItem *historyShardsItem, closeCh chan<- int) (ShardContex
shardID: shardItem.shardID,
executionManager: executionMgr,
shardInfo: updatedShardInfo,
closeCh: closeCh,
closeCallback: closeCallback,
config: shardItem.config,
remoteClusterCurrentTime: remoteClusterCurrentTime,
timerMaxReadLevelMap: timerMaxReadLevelMap, // use ack to init read level
Expand Down
2 changes: 0 additions & 2 deletions service/history/shardContextTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ func newTestShardContext(
rangeID: shardInfo.RangeID,
shardInfo: shardInfo,
executionManager: resource.ExecutionMgr,
isClosed: false,
closeCh: make(chan int, 100),
config: config,
logger: resource.GetLogger(),
throttledLogger: resource.GetThrottledLogger(),
Expand Down
Loading

0 comments on commit a5111a6

Please sign in to comment.