Handle edge case when doing history re-replication from standby side. (cadence-workflow#1358)

* Handle the edge case where the standby side triggering history re-replication can have a longer (stale) history than the active side, causing the source cluster to return empty event blobs; see the sketch after the change summary below.

* Add a dynamic config flag to enable the re-replication feature
wxing1292 authored Jan 3, 2019
1 parent 0ff94d9 commit 3148978
Showing 15 changed files with 214 additions and 12 deletions.
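
In short: when the standby's stale history runs ahead of the active side, the active (source) cluster now answers the raw-history fetch with zero event blobs instead of failing, and the standby falls back to re-sending history from the last event position the source has recorded for the target cluster. Below is a minimal standalone sketch of that fallback decision; the type and constant names are simplified stand-ins for illustration, not the actual Cadence types.

package main

import "fmt"

// ReplicationInfo mirrors the per-cluster progress the source cluster
// returns alongside raw history (a simplified stand-in type).
type ReplicationInfo struct {
	Version     int64
	LastEventID int64
}

// firstEventID stands in for common.FirstEventID.
const firstEventID int64 = 1

// resendStartEventID distills handleEmptyHistory below: when the source
// returns zero history batches, restart re-replication from the event
// after the source's last acknowledged event for the target cluster,
// or from the very first event if that cluster has no entry.
func resendStartEventID(info map[string]*ReplicationInfo, targetClusterName string) int64 {
	ri, ok := info[targetClusterName]
	if !ok {
		return firstEventID
	}
	return ri.LastEventID + 1
}

func main() {
	info := map[string]*ReplicationInfo{
		"target-cluster": {Version: 777, LastEventID: 999},
	}
	fmt.Println(resendStartEventID(info, "target-cluster")) // 1000
	fmt.Println(resendStartEventID(info, "other-cluster"))  // 1
}
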
15 changes: 11 additions & 4 deletions common/service/dynamicconfig/constants.go
@@ -88,6 +88,7 @@ var keys = map[Key]string{

// history settings
EnableSyncActivityHeartbeat: "history.enableSyncActivityHeartbeat",
EnableHistoryRereplication: "history.enableHistoryRereplication",
HistoryRPS: "history.rps",
HistoryPersistenceMaxQPS: "history.persistenceMaxQPS",
HistoryVisibilityOpenMaxQPS: "history.historyVisibilityOpenMaxQPS",
@@ -150,9 +151,10 @@ var keys = map[Key]string{
EnableEventsV2: "history.enableEventsV2",
NumSystemWorkflows: "history.numSystemWorkflows",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerReplicatorConcurrency: "worker.replicatorConcurrency",
WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry",
WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerReplicatorConcurrency: "worker.replicatorConcurrency",
WorkerReplicatorBufferRetryCount: "worker.replicatorBufferRetryCount",
WorkerReplicationTaskMaxRetry: "worker.replicationTaskMaxRetry",
}

const (
@@ -246,6 +248,8 @@ const (

// EnableSyncActivityHeartbeat whether to enable sending out sync activity heartbeat replication task
EnableSyncActivityHeartbeat
// EnableHistoryRereplication whether to enable history re-replication
EnableHistoryRereplication
// HistoryRPS is request rate per second for each history host
HistoryRPS
// HistoryPersistenceMaxQPS is the max qps history host can query DB
@@ -370,12 +374,15 @@ const (

// EnableEventsV2 is whether to use eventsV2
EnableEventsV2
// key for histoworkerry

// key for history worker

// WorkerPersistenceMaxQPS is the max qps worker host can query DB
WorkerPersistenceMaxQPS
// WorkerReplicatorConcurrency is the max concurrent tasks to be processed at any given time
WorkerReplicatorConcurrency
// WorkerReplicatorBufferRetryCount is the number of retry attempts when encountering a retry error
WorkerReplicatorBufferRetryCount
// WorkerReplicationTaskMaxRetry is the max retry for any task
WorkerReplicationTaskMaxRetry

39 changes: 38 additions & 1 deletion common/xdc/historyRereplicator.go
@@ -67,6 +67,7 @@ type (

// HistoryRereplicatorImpl is the implementation of HistoryRereplicator
HistoryRereplicatorImpl struct {
targetClusterName string
domainCache cache.DomainCache
adminClient a.Client
historyReplicationFn historyReplicationFn
@@ -104,10 +105,11 @@ func newHistoryRereplicationContext(domainID string, workflowID string,
}

// NewHistoryRereplicator create a new HistoryRereplicatorImpl
func NewHistoryRereplicator(domainCache cache.DomainCache, adminClient a.Client, historyReplicationFn historyReplicationFn,
func NewHistoryRereplicator(targetClusterName string, domainCache cache.DomainCache, adminClient a.Client, historyReplicationFn historyReplicationFn,
serializer persistence.HistorySerializer, replicationTimeout time.Duration, logger bark.Logger) *HistoryRereplicatorImpl {

return &HistoryRereplicatorImpl{
targetClusterName: targetClusterName,
domainCache: domainCache,
adminClient: adminClient,
historyReplicationFn: historyReplicationFn,
@@ -199,6 +201,13 @@ func (c *historyRereplicationContext) sendSingleWorkflowHistory(domainID string,
replicationInfo = response.ReplicationInfo
token = response.NextPageToken

if len(response.HistoryBatches) == 0 {
// this case can happen if the standby side tries to fetch history events
// from the active side while the active's history length < the standby's history length
// due to the standby containing stale history
return "", c.handleEmptyHistory(domainID, workflowID, runID, replicationInfo)
}

for _, batch := range response.HistoryBatches {
// it is intentional that the first request is nil
// the reason is, we need to check the last request, if that request contains
@@ -339,6 +348,34 @@ func (c *historyRereplicationContext) sendReplicationRawRequest(request *history
return c.rereplicator.historyReplicationFn(ctxAgain, request)
}

func (c *historyRereplicationContext) handleEmptyHistory(domainID string, workflowID string, runID string,
replicationInfo map[string]*shared.ReplicationInfo) error {

ri, ok := replicationInfo[c.rereplicator.targetClusterName]
var firstEventID int64
if !ok {
firstEventID = common.FirstEventID
} else {
firstEventID = ri.GetLastEventId() + 1
}
_, err := c.sendSingleWorkflowHistory(
domainID,
workflowID,
runID,
firstEventID,
common.EndEventID,
)
if err != nil {
c.rereplicator.logger.WithFields(bark.Fields{
logging.TagDomainID: domainID,
logging.TagWorkflowExecutionID: workflowID,
logging.TagWorkflowRunID: runID,
logging.TagErr: err,
}).Error("error sending history")
}
return err
}

func (c *historyRereplicationContext) getHistory(domainID string, workflowID string, runID string,
firstEventID int64, nextEventID int64, token []byte, pageSize int32) (*admin.GetWorkflowExecutionRawHistoryResponse, error) {

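
Note that targetClusterName is threaded through NewHistoryRereplicator precisely so that handleEmptyHistory can look up the source cluster's recorded progress for this cluster in the ReplicationInfo map; when no entry exists for the target cluster, re-replication conservatively restarts from the first event.
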
121 changes: 119 additions & 2 deletions common/xdc/historyRereplicator_test.go
@@ -47,8 +47,9 @@ type (
historyRereplicatorSuite struct {
suite.Suite

domainID string
domainName string
domainID string
domainName string
targetClusterName string

mockClusterMetadata *mocks.ClusterMetadata
mockMetadataMgr *mocks.MetadataManager
@@ -86,6 +87,7 @@ func (s *historyRereplicatorSuite) SetupTest() {

s.domainID = uuid.New()
s.domainName = "some random domain name"
s.targetClusterName = "some random target cluster name"
s.mockMetadataMgr.On("GetDomain", mock.Anything).Return(
&persistence.GetDomainResponse{
Info: &persistence.DomainInfo{ID: s.domainID, Name: s.domainName},
@@ -105,6 +107,7 @@
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
domainCache := cache.NewDomainCache(s.mockMetadataMgr, s.mockClusterMetadata, metricsClient, s.logger)
s.rereplicator = NewHistoryRereplicator(
s.targetClusterName,
domainCache,
s.mockAdminClient,
func(ctx context.Context, request *history.ReplicateRawEventsRequest) error {
@@ -962,6 +965,120 @@ func (s *historyRereplicatorSuite) TestSendReplicationRawRequest_Err() {
s.Equal(retryErr, err)
}

func (s *historyRereplicatorSuite) TestHandleEmptyHistory_FoundReplicationInfoEntry() {
workflowID := "some random workflow ID"
runID := uuid.New()
lastVersion := int64(777)
lastEventID := int64(999)
replicationInfo := map[string]*shared.ReplicationInfo{
s.targetClusterName: &shared.ReplicationInfo{
Version: common.Int64Ptr(lastVersion),
LastEventId: common.Int64Ptr(lastEventID),
},
}
eventStoreVersion := int32(9)
eventBatch := []*shared.HistoryEvent{
&shared.HistoryEvent{
EventId: common.Int64Ptr(lastEventID + 1),
Version: common.Int64Ptr(lastVersion + 1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
EventType: shared.EventTypeTimerFired.Ptr(),
},
}
blob := s.serializeEvents(eventBatch)

s.mockAdminClient.On("GetWorkflowExecutionRawHistory", mock.Anything, &admin.GetWorkflowExecutionRawHistoryRequest{
Domain: common.StringPtr(s.domainName),
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
},
FirstEventId: common.Int64Ptr(lastEventID + 1),
NextEventId: common.Int64Ptr(common.EndEventID),
MaximumPageSize: common.Int32Ptr(defaultPageSize),
NextPageToken: nil,
}).Return(&admin.GetWorkflowExecutionRawHistoryResponse{
HistoryBatches: []*shared.DataBlob{blob},
NextPageToken: nil,
ReplicationInfo: replicationInfo,
EventStoreVersion: common.Int32Ptr(eventStoreVersion),
}, nil).Once()

s.mockHistoryClient.On("ReplicateRawEvents", mock.Anything, &history.ReplicateRawEventsRequest{
DomainUUID: common.StringPtr(s.domainID),
WorkflowExecution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
},
ReplicationInfo: replicationInfo,
History: blob,
NewRunHistory: nil,
EventStoreVersion: common.Int32Ptr(eventStoreVersion),
NewRunEventStoreVersion: nil,
}).Return(nil).Once()

rereplicationContext := newHistoryRereplicationContext(s.domainID, workflowID, runID, int64(123), uuid.New(), int64(111), s.rereplicator)
err := rereplicationContext.handleEmptyHistory(s.domainID, workflowID, runID, replicationInfo)
s.Nil(err)
}

func (s *historyRereplicatorSuite) TestHandleEmptyHistory_NoReplicationInfoEntry() {
workflowID := "some random workflow ID"
runID := uuid.New()
lastVersion := int64(777)
lastEventID := int64(999)
replicationInfo := map[string]*shared.ReplicationInfo{
"some randon cluster": &shared.ReplicationInfo{
Version: common.Int64Ptr(lastVersion),
LastEventId: common.Int64Ptr(lastEventID),
},
}
eventStoreVersion := int32(9)
eventBatch := []*shared.HistoryEvent{
&shared.HistoryEvent{
EventId: common.Int64Ptr(common.FirstEventID),
Version: common.Int64Ptr(1),
Timestamp: common.Int64Ptr(time.Now().UnixNano()),
EventType: shared.EventTypeWorkflowExecutionStarted.Ptr(),
},
}
blob := s.serializeEvents(eventBatch)

s.mockAdminClient.On("GetWorkflowExecutionRawHistory", mock.Anything, &admin.GetWorkflowExecutionRawHistoryRequest{
Domain: common.StringPtr(s.domainName),
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
},
FirstEventId: common.Int64Ptr(common.FirstEventID),
NextEventId: common.Int64Ptr(common.EndEventID),
MaximumPageSize: common.Int32Ptr(defaultPageSize),
NextPageToken: nil,
}).Return(&admin.GetWorkflowExecutionRawHistoryResponse{
HistoryBatches: []*shared.DataBlob{blob},
NextPageToken: nil,
ReplicationInfo: replicationInfo,
EventStoreVersion: common.Int32Ptr(eventStoreVersion),
}, nil).Once()

s.mockHistoryClient.On("ReplicateRawEvents", mock.Anything, &history.ReplicateRawEventsRequest{
DomainUUID: common.StringPtr(s.domainID),
WorkflowExecution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(workflowID),
RunId: common.StringPtr(runID),
},
ReplicationInfo: replicationInfo,
History: blob,
NewRunHistory: nil,
EventStoreVersion: common.Int32Ptr(eventStoreVersion),
NewRunEventStoreVersion: nil,
}).Return(nil).Once()

rereplicationContext := newHistoryRereplicationContext(s.domainID, workflowID, runID, int64(123), uuid.New(), int64(111), s.rereplicator)
err := rereplicationContext.handleEmptyHistory(s.domainID, workflowID, runID, replicationInfo)
s.Nil(err)
}

func (s *historyRereplicatorSuite) TestGetHistory() {
workflowID := "some random workflow ID"
runID := uuid.New()
10 changes: 10 additions & 0 deletions service/frontend/adminHandler.go
@@ -277,6 +277,16 @@ func (adh *AdminHandler) GetWorkflowExecutionRawHistory(
pageSize,
)
if err != nil {
if _, ok := err.(*gen.EntityNotExistsError); ok {
// when no events can be returned from the DB, the DB layer returns
// EntityNotExistsError; this API shall return an empty response
return &admin.GetWorkflowExecutionRawHistoryResponse{
HistoryBatches: []*gen.DataBlob{},
ReplicationInfo: token.ReplicationInfo,
EventStoreVersion: common.Int32Ptr(token.EventStoreVersion),
NextPageToken: nil, // no further pagination
}, nil
}
return nil, err
}

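
Returning a well-formed empty response here, rather than propagating EntityNotExistsError, is what lets the standby side detect the stale-history case: the caller in sendSingleWorkflowHistory (above) sees zero history batches together with the source's replication info and hands off to handleEmptyHistory to pick a restart position.
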
2 changes: 2 additions & 0 deletions service/history/historyReplicator.go
@@ -75,6 +75,8 @@ var (
ErrRetrySyncActivity = &shared.RetryTaskError{Message: "retry on applying sync activity"}
// ErrRetryBufferEventsMsg is returned when events are arriving out of order, should retry, or specify force apply
ErrRetryBufferEventsMsg = "retry on applying buffer events"
// ErrRetryEmptyEventsMsg is returned when the events size is 0
ErrRetryEmptyEventsMsg = "retry on applying empty events"
// ErrWorkflowNotFoundMsg is returned when workflow not found
ErrWorkflowNotFoundMsg = "retry on workflow not found"
// ErrRetryExistingWorkflowMsg is returned when events are arriving out of order, and there is another workflow with same version running
2 changes: 2 additions & 0 deletions service/history/service.go
@@ -35,6 +35,7 @@ type Config struct {
NumberOfShards int

EnableSyncActivityHeartbeat dynamicconfig.BoolPropertyFn
EnableHistoryRereplication dynamicconfig.BoolPropertyFn
RPS dynamicconfig.IntPropertyFn
MaxIDLengthLimit dynamicconfig.IntPropertyFn
PersistenceMaxQPS dynamicconfig.IntPropertyFn
@@ -136,6 +137,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int) *Config {
return &Config{
NumberOfShards: numberOfShards,
EnableSyncActivityHeartbeat: dc.GetBoolProperty(dynamicconfig.EnableSyncActivityHeartbeat, false),
EnableHistoryRereplication: dc.GetBoolProperty(dynamicconfig.EnableHistoryRereplication, false),
RPS: dc.GetIntProperty(dynamicconfig.HistoryRPS, 3000),
MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.HistoryPersistenceMaxQPS, 9000),
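
The flag defaults to false, so re-replication stays off until the dynamic config key history.enableHistoryRereplication is turned on. In a test or local setup it can be forced on the same way the updated processor tests below do:

config := NewDynamicConfigForTest() // history-service test helper used in this repo's tests
config.EnableHistoryRereplication = dynamicconfig.GetBoolPropertyFn(true)
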
1 change: 1 addition & 0 deletions service/history/timerQueueProcessor.go
@@ -74,6 +74,7 @@ func newTimerQueueProcessor(shard ShardContext, historyService *historyEngineImp
for clusterName := range shard.GetService().GetClusterMetadata().GetAllClusterFailoverVersions() {
if clusterName != shard.GetService().GetClusterMetadata().GetCurrentClusterName() {
historyRereplicator := xdc.NewHistoryRereplicator(
currentClusterName,
shard.GetDomainCache(),
shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
func(ctx context.Context, request *h.ReplicateRawEventsRequest) error {
4 changes: 4 additions & 0 deletions service/history/timerQueueStandbyProcessor.go
@@ -506,6 +506,10 @@ func (t *timerQueueStandbyProcessorImpl) fetchHistoryAndVerifyOnce(timerTask *pe
}

func (t *timerQueueStandbyProcessorImpl) fetchHistoryFromRemote(timerTask *persistence.TimerTaskInfo, nextEventID int64) error {
if !t.shard.GetConfig().EnableHistoryRereplication() {
return nil
}

err := t.historyRereplicator.SendMultiWorkflowHistory(
timerTask.DomainID, timerTask.WorkflowID,
timerTask.RunID, nextEventID,
5 changes: 4 additions & 1 deletion service/history/timerQueueStandbyProcessor_test.go
@@ -42,6 +42,7 @@ import (
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/common/xdc"
)

@@ -119,6 +120,8 @@ func (s *timerQueueStandbyProcessorSuite) SetupTest() {
s.mockClientBean = &client.MockClientBean{}
s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.mockClientBean, s.logger)

config := NewDynamicConfigForTest()
config.EnableHistoryRereplication = dynamicconfig.GetBoolPropertyFn(true)
s.mockShard = &shardContextImpl{
service: s.mockService,
shardInfo: &persistence.ShardInfo{ShardID: shardID, RangeID: 1, TransferAckLevel: 0},
@@ -128,7 +131,7 @@
historyMgr: s.mockHistoryMgr,
maxTransferSequenceNumber: 100000,
closeCh: make(chan int, 100),
config: NewDynamicConfigForTest(),
config: config,
logger: s.logger,
domainCache: cache.NewDomainCache(s.mockMetadataMgr, s.mockClusterMetadata, metricsClient, s.logger),
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),
1 change: 1 addition & 0 deletions service/history/transferQueueProcessor.go
@@ -79,6 +79,7 @@ func newTransferQueueProcessor(shard ShardContext, historyService *historyEngine
for clusterName := range shard.GetService().GetClusterMetadata().GetAllClusterFailoverVersions() {
if clusterName != currentClusterName {
historyRereplicator := xdc.NewHistoryRereplicator(
currentClusterName,
shard.GetDomainCache(),
shard.GetService().GetClientBean().GetRemoteAdminClient(clusterName),
func(ctx context.Context, request *h.ReplicateRawEventsRequest) error {
3 changes: 3 additions & 0 deletions service/history/transferQueueStandbyProcessor.go
@@ -471,6 +471,9 @@ func (t *transferQueueStandbyProcessorImpl) fetchHistoryAndVerifyOnce(transferTa
}

func (t *transferQueueStandbyProcessorImpl) fetchHistoryFromRemote(transferTask *persistence.TransferTaskInfo, nextEventID int64) error {
if !t.shard.GetConfig().EnableHistoryRereplication() {
return nil
}
err := t.historyRereplicator.SendMultiWorkflowHistory(
transferTask.DomainID, transferTask.WorkflowID,
transferTask.RunID, nextEventID,
5 changes: 4 additions & 1 deletion service/history/transferQueueStandbyProcessor_test.go
@@ -42,6 +42,7 @@ import (
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/common/xdc"
)

@@ -121,6 +122,8 @@ func (s *transferQueueStandbyProcessorSuite) SetupTest() {
s.mockClientBean = &client.MockClientBean{}
s.mockService = service.NewTestService(s.mockClusterMetadata, s.mockMessagingClient, metricsClient, s.mockClientBean, s.logger)

config := NewDynamicConfigForTest()
config.EnableHistoryRereplication = dynamicconfig.GetBoolPropertyFn(true)
s.mockShard = &shardContextImpl{
service: s.mockService,
shardInfo: &persistence.ShardInfo{ShardID: shardID, RangeID: 1, TransferAckLevel: 0},
@@ -130,7 +133,7 @@
historyMgr: s.mockHistoryMgr,
maxTransferSequenceNumber: 100000,
closeCh: make(chan int, 100),
config: NewDynamicConfigForTest(),
config: config,
logger: s.logger,
domainCache: cache.NewDomainCache(s.mockMetadataMgr, s.mockClusterMetadata, metricsClient, s.logger),
metricsClient: metrics.NewClient(tally.NoopScope, metrics.History),