Commit 2fef3c3
Replication task processor shutdown improvements and start/stop unit …
taylanisikdemir authored May 9, 2024
1 parent 82e3b9c commit 2fef3c3
Showing 3 changed files with 51 additions and 31 deletions.
@@ -24,10 +24,14 @@ package engineimpl
 import (
 	"testing"
 
+	"go.uber.org/goleak"
+
 	"github.com/uber/cadence/service/history/engine/testdata"
 )
 
 func TestHistoryEngineStartStop(t *testing.T) {
+	t.Cleanup(func() { goleak.VerifyNone(t) })
+
 	eft := testdata.NewEngineForTest(t, NewEngineWithShardContext)
 
 	eft.Engine.Start()
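For context on the new dependency: goleak.VerifyNone fails the calling test if unexpected goroutines are still running when it executes, and registering it through t.Cleanup defers that check until after the Start/Stop cycle above. A minimal standalone illustration, separate from this commit:

package example

import (
	"testing"

	"go.uber.org/goleak"
)

// Minimal illustration of goleak, independent of Cadence: the test fails
// if the goroutine below is still running when the cleanup executes.
// goleak retries its check briefly, so a just-closed goroutine passes.
func TestNoGoroutineLeak(t *testing.T) {
	t.Cleanup(func() { goleak.VerifyNone(t) })

	done := make(chan struct{})
	go func() {
		<-done // exits once done is closed
	}()
	close(done) // removing this line would make goleak report a leak
}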
59 changes: 30 additions & 29 deletions service/history/replication/task_processor.go
@@ -27,6 +27,7 @@ import (
 	"fmt"
 	"math"
 	"strconv"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -93,6 +94,7 @@ type (
 		requestChan   chan<- *request
 		syncShardChan chan *types.SyncShardStatus
 		done          chan struct{}
+		wg            sync.WaitGroup
 	}
 
 	request struct {
@@ -158,6 +160,7 @@ func (p *taskProcessorImpl) Start() {
 		return
 	}
 
+	p.wg.Add(3)
 	go p.processorLoop()
 	go p.syncShardStatusLoop()
 	go p.cleanupReplicationTaskLoop()
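The wg.Add(3) matches the three loops spawned just below, and it runs before any of them starts so that a concurrent Stop cannot observe a zero counter and return early. A compact sketch of the overall pattern, with hypothetical names rather than Cadence's actual types:

package example

import (
	"sync"
	"time"
)

// Illustrative sketch of the start/stop pattern in this commit; the names
// are hypothetical, only the shape mirrors taskProcessorImpl.
type processor struct {
	done chan struct{}
	wg   sync.WaitGroup
}

func newProcessor() *processor {
	return &processor{done: make(chan struct{})}
}

func (p *processor) Start() {
	// Add must run before the goroutines are spawned; otherwise Stop's
	// Wait could observe a zero counter and return too early.
	p.wg.Add(3)
	go p.loop()
	go p.loop()
	go p.loop()
}

func (p *processor) Stop() {
	close(p.done) // broadcast shutdown to every loop
	p.wg.Wait()   // block until all three loops have called Done
}

func (p *processor) loop() {
	defer p.wg.Done()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-p.done:
			return
		case <-ticker.C:
			// periodic work would go here
		}
	}
}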
@@ -172,25 +175,39 @@ func (p *taskProcessorImpl) Stop() {
 
 	p.logger.Debug("ReplicationTaskProcessor shutting down.")
 	close(p.done)
+
+	if !common.AwaitWaitGroup(&p.wg, 10*time.Second) {
+		p.logger.Warn("ReplicationTaskProcessor timed out on shutdown.")
+	} else {
+		p.logger.Info("ReplicationTaskProcessor shutdown completed")
+	}
 }
 
 func (p *taskProcessorImpl) processorLoop() {
 	defer func() {
 		p.logger.Debug("Closing replication task processor.", tag.ReadLevel(p.lastRetrievedMessageID))
+		p.wg.Done()
 	}()
 
 Loop:
 	for {
 		// for each iteration, do close check first
+		respChan := make(chan *types.ReplicationMessages, 1)
+		// TODO: when we support prefetching, LastRetrievedMessageID can be different than LastProcessedMessageID
+
 		select {
 		case <-p.done:
 			p.logger.Debug("ReplicationTaskProcessor shutting down.")
 			return
-		default:
+		case p.requestChan <- &request{
+			token: &types.ReplicationToken{
+				ShardID:                int32(p.shard.GetShardID()),
+				LastRetrievedMessageID: p.lastRetrievedMessageID,
+				LastProcessedMessageID: p.lastProcessedMessageID,
+			},
+			respChan: respChan,
+		}:
+			// signal sent, continue to process replication messages
 		}
 
-		respChan := p.sendFetchMessageRequest()
-
 		select {
 		case response, ok := <-respChan:
 			if !ok {
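Two things changed above: Stop now bounds its wait on the WaitGroup via common.AwaitWaitGroup, and the request send moved into the same select as <-p.done, so a shutdown is no longer blocked behind a full requestChan. Judging from the call site, common.AwaitWaitGroup returns false when the timeout fires first; a typical shape for such a helper, sketched here rather than copied from Cadence:

package example

import (
	"sync"
	"time"
)

// Sketch of a bounded WaitGroup wait in the spirit of common.AwaitWaitGroup;
// Cadence's actual implementation may differ. Returns true if the group
// finished before the timeout elapsed.
func awaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool {
	doneC := make(chan struct{})
	go func() {
		wg.Wait()
		close(doneC)
	}()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-doneC:
		return true
	case <-timer.C:
		// the waiting goroutine itself lingers until wg finishes,
		// which is acceptable on a shutdown path
		return false
	}
}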
@@ -213,16 +230,18 @@
 }
 
 func (p *taskProcessorImpl) cleanupReplicationTaskLoop() {
+	defer p.wg.Done()
+
 	shardID := p.shard.GetShardID()
 	timer := time.NewTimer(backoff.JitDuration(
 		p.config.ReplicationTaskProcessorCleanupInterval(shardID),
 		p.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
 	))
+	defer timer.Stop()
 
 	for {
 		select {
 		case <-p.done:
-			timer.Stop()
 			return
 		case <-timer.C:
 			if err := p.cleanupAckedReplicationTasks(); err != nil {
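This hunk swaps the per-branch timer.Stop() for a single deferred one, which covers every exit path. The loop shape, with a hypothetical jitDuration standing in for Cadence's backoff.JitDuration (the real jitter formula may differ):

package example

import (
	"math/rand"
	"time"
)

// jitDuration is a hypothetical stand-in for backoff.JitDuration: it widens
// the interval by a random amount up to coefficient*d. Cadence's exact
// formula may differ; coefficient is assumed to be > 0 here.
func jitDuration(d time.Duration, coefficient float64) time.Duration {
	return d + time.Duration(rand.Int63n(int64(float64(d)*coefficient)))
}

// cleanupLoop mirrors the shape of cleanupReplicationTaskLoop above:
// a single deferred timer.Stop replaces per-branch cleanup.
func cleanupLoop(done <-chan struct{}, interval time.Duration, coefficient float64) {
	timer := time.NewTimer(jitDuration(interval, coefficient))
	defer timer.Stop()

	for {
		select {
		case <-done:
			return
		case <-timer.C:
			// periodic cleanup work would run here
			timer.Reset(jitDuration(interval, coefficient))
		}
	}
}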
@@ -272,22 +291,7 @@ func (p *taskProcessorImpl) cleanupAckedReplicationTasks() error {
 	return nil
 }
 
-func (p *taskProcessorImpl) sendFetchMessageRequest() <-chan *types.ReplicationMessages {
-	respChan := make(chan *types.ReplicationMessages, 1)
-	// TODO: when we support prefetching, LastRetrievedMessageID can be different than LastProcessedMessageID
-	p.requestChan <- &request{
-		token: &types.ReplicationToken{
-			ShardID:                int32(p.shard.GetShardID()),
-			LastRetrievedMessageID: p.lastRetrievedMessageID,
-			LastProcessedMessageID: p.lastProcessedMessageID,
-		},
-		respChan: respChan,
-	}
-	return respChan
-}
-
 func (p *taskProcessorImpl) processResponse(response *types.ReplicationMessages) {
-
 	select {
 	case p.syncShardChan <- response.GetSyncShardStatus():
 	default:
@@ -324,11 +328,14 @@ func (p *taskProcessorImpl) processResponse(response *types.ReplicationMessages)
 }
 
 func (p *taskProcessorImpl) syncShardStatusLoop() {
+	defer p.wg.Done()
+
 	timer := time.NewTimer(backoff.JitDuration(
 		p.config.ShardSyncMinInterval(),
 		p.config.ShardSyncTimerJitterCoefficient(),
 	))
+	defer timer.Stop()
 
 	var syncShardTask *types.SyncShardStatus
 	for {
 		select {
@@ -346,19 +353,14 @@ func (p *taskProcessorImpl) syncShardStatusLoop() {
 			p.config.ShardSyncTimerJitterCoefficient(),
 		))
 		case <-p.done:
-			timer.Stop()
 			return
 		}
 	}
 }
 
-func (p *taskProcessorImpl) handleSyncShardStatus(
-	status *types.SyncShardStatus,
-) error {
-
+func (p *taskProcessorImpl) handleSyncShardStatus(status *types.SyncShardStatus) error {
 	if status == nil ||
-		p.shard.GetTimeSource().Now().Sub(
-			time.Unix(0, status.GetTimestamp())) > dropSyncShardTaskTimeThreshold {
+		p.shard.GetTimeSource().Now().Sub(time.Unix(0, status.GetTimestamp())) > dropSyncShardTaskTimeThreshold {
 		return nil
 	}
 	p.metricsClient.Scope(metrics.HistorySyncShardStatusScope).IncCounter(metrics.SyncShardFromRemoteCounter)
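The reformatted guard reads more clearly now: sync-shard statuses carry nanosecond Unix timestamps, and anything older than dropSyncShardTaskTimeThreshold is dropped. The same check in isolation, with an illustrative threshold value rather than the package's real constant:

package example

import "time"

// isStale reproduces the guard's logic on its own: status timestamps are
// nanoseconds since the Unix epoch, and anything older than the threshold
// is dropped. The 10-minute value is illustrative, not Cadence's constant.
const dropThreshold = 10 * time.Minute

func isStale(now time.Time, timestampNanos int64) bool {
	return now.Sub(time.Unix(0, timestampNanos)) > dropThreshold
}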
@@ -556,7 +558,6 @@ func (p *taskProcessorImpl) generateDLQRequest(
 }
 
 func (p *taskProcessorImpl) triggerDataInconsistencyScan(replicationTask *types.ReplicationTask) error {
-
 	var failoverVersion int64
 	var domainID string
 	var workflowID string
19 changes: 17 additions & 2 deletions service/history/replication/task_processor_test.go
@@ -32,6 +32,7 @@ import (
 	"github.com/stretchr/testify/require"
 	"github.com/stretchr/testify/suite"
 	"github.com/uber-go/tally"
+	"go.uber.org/goleak"
 	"go.uber.org/yarpc"
 
 	"github.com/uber/cadence/client"
@@ -129,10 +130,15 @@ func (s *taskProcessorSuite) SetupTest() {
 }
 
 func (s *taskProcessorSuite) TearDownTest() {
-	s.controller.Finish()
 	s.mockShard.Finish(s.T())
 }
 
+func (s *taskProcessorSuite) TestStartStop() {
+	s.taskProcessor.Start()
+	s.taskProcessor.Stop()
+	goleak.VerifyNone(s.T())
+}
+
 func (s *taskProcessorSuite) TestProcessResponse_NoTask() {
 	response := &types.ReplicationMessages{
 		LastRetrievedMessageID: 100,
@@ -144,12 +150,21 @@ func (s *taskProcessorSuite) TestProcessResponse_NoTask() {
 }
 
 func (s *taskProcessorSuite) TestSendFetchMessageRequest() {
-	s.taskProcessor.sendFetchMessageRequest()
+	// start the processor loop so it populates requestChan
+	s.taskProcessor.wg.Add(1)
+	go s.taskProcessor.processorLoop()
+
+	// wait a bit and terminate the loop
+	time.Sleep(50 * time.Millisecond)
+	close(s.taskProcessor.done)
+
+	// check the request
 	requestMessage := <-s.requestChan
 
 	s.Equal(int32(0), requestMessage.token.GetShardID())
 	s.Equal(int64(-1), requestMessage.token.GetLastProcessedMessageID())
 	s.Equal(int64(-1), requestMessage.token.GetLastRetrievedMessageID())
 	s.NotNil(requestMessage.respChan)
}
 
 func (s *taskProcessorSuite) TestHandleSyncShardStatus() {
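The 50 ms sleep gives processorLoop time to enqueue its first request before the test closes done. A hypothetical variant that receives the request first and so avoids the fixed sleep is sketched below; it assumes the suite fields exactly as shown in the diff and is not part of the commit:

// Hypothetical sleep-free variant, not part of this commit: receiving from
// requestChan first guarantees the loop has sent its request, after which
// the done channel can be closed.
func (s *taskProcessorSuite) TestSendFetchMessageRequest_NoSleep() {
	s.taskProcessor.wg.Add(1)
	go s.taskProcessor.processorLoop()

	requestMessage := <-s.requestChan // blocks until the loop sends
	close(s.taskProcessor.done)       // then shut the loop down

	s.Equal(int32(0), requestMessage.token.GetShardID())
	s.Equal(int64(-1), requestMessage.token.GetLastProcessedMessageID())
	s.Equal(int64(-1), requestMessage.token.GetLastRetrievedMessageID())
	s.NotNil(requestMessage.respChan)
}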
