diff --git a/common/cluster/metadataTestBase.go b/common/cluster/metadataTestBase.go index 5812968763b..c3dc457f7fa 100644 --- a/common/cluster/metadataTestBase.go +++ b/common/cluster/metadataTestBase.go @@ -93,12 +93,6 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool) Metad TestAllClusterInfo, &config.ReplicationConsumerConfig{ Type: config.ReplicationConsumerTypeRPC, - FetcherConfig: &config.FetcherConfig{ - RPCParallelism: 1, - AggregationIntervalSecs: 2, - ErrorRetryWaitSecs: 1, - TimerJitterCoefficient: 0.15, - }, }, ) } diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 8abfd33e64d..dc0808d9de4 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -153,14 +153,18 @@ const ( PersistenceGetCurrentExecutionScope // PersistenceGetTransferTasksScope tracks GetTransferTasks calls made by service to persistence layer PersistenceGetTransferTasksScope - // PersistenceGetReplicationTasksScope tracks GetReplicationTasks calls made by service to persistence layer - PersistenceGetReplicationTasksScope // PersistenceCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer PersistenceCompleteTransferTaskScope // PersistenceRangeCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer PersistenceRangeCompleteTransferTaskScope + // PersistenceGetReplicationTasksScope tracks GetReplicationTasks calls made by service to persistence layer + PersistenceGetReplicationTasksScope // PersistenceCompleteReplicationTaskScope tracks CompleteReplicationTasks calls made by service to persistence layer PersistenceCompleteReplicationTaskScope + // PersistencePutReplicationTaskToDLQScope tracks PutReplicationTaskToDLQ calls made by service to persistence layer + PersistencePutReplicationTaskToDLQScope + // PersistenceGetReplicationTasksFromDLQScope tracks GetReplicationTasksFromDLQ calls made by service to persistence layer + PersistenceGetReplicationTasksFromDLQScope // PersistenceGetTimerIndexTasksScope tracks GetTimerIndexTasks calls made by service to persistence layer PersistenceGetTimerIndexTasksScope // PersistenceCompleteTimerTaskScope tracks CompleteTimerTasks calls made by service to persistence layer @@ -949,10 +953,12 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{ PersistenceDeleteTaskScope: {operation: "PersistenceDelete"}, PersistenceGetCurrentExecutionScope: {operation: "GetCurrentExecution"}, PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"}, - PersistenceGetReplicationTasksScope: {operation: "GetReplicationTasks"}, PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"}, PersistenceRangeCompleteTransferTaskScope: {operation: "RangeCompleteTransferTask"}, + PersistenceGetReplicationTasksScope: {operation: "GetReplicationTasks"}, PersistenceCompleteReplicationTaskScope: {operation: "CompleteReplicationTask"}, + PersistencePutReplicationTaskToDLQScope: {operation: "PersistencePutReplicationTaskToDLQ"}, + PersistenceGetReplicationTasksFromDLQScope: {operation: "PersistenceGetReplicationTasksFromDLQ"}, PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"}, PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"}, PersistenceRangeCompleteTimerTaskScope: {operation: "RangeCompleteTimerTask"}, diff --git a/common/mocks/ExecutionManager.go b/common/mocks/ExecutionManager.go index 7ac80843faf..70755b2bf21 100644 --- a/common/mocks/ExecutionManager.go +++ 
b/common/mocks/ExecutionManager.go @@ -307,6 +307,43 @@ func (_m *ExecutionManager) CompleteReplicationTask(request *persistence.Complet return r0 } +// PutReplicationTaskToDLQ provides a mock function with given fields: request +func (_m *ExecutionManager) PutReplicationTaskToDLQ(request *persistence.PutReplicationTaskToDLQRequest) error { + ret := _m.Called(request) + + var r0 error + if rf, ok := ret.Get(0).(func(*persistence.PutReplicationTaskToDLQRequest) error); ok { + r0 = rf(request) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetReplicationTasksFromDLQ provides a mock function with given fields: request +func (_m *ExecutionManager) GetReplicationTasksFromDLQ(request *persistence.GetReplicationTasksFromDLQRequest) (*persistence.GetReplicationTasksFromDLQResponse, error) { + ret := _m.Called(request) + + var r0 *persistence.GetReplicationTasksFromDLQResponse + if rf, ok := ret.Get(0).(func(*persistence.GetReplicationTasksFromDLQRequest) *persistence.GetReplicationTasksFromDLQResponse); ok { + r0 = rf(request) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*persistence.GetReplicationTasksFromDLQResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(*persistence.GetReplicationTasksFromDLQRequest) error); ok { + r1 = rf(request) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // GetTimerIndexTasks provides a mock function with given fields: request func (_m *ExecutionManager) GetTimerIndexTasks(request *persistence.GetTimerIndexTasksRequest) (*persistence.GetTimerIndexTasksResponse, error) { ret := _m.Called(request) diff --git a/common/persistence/cassandra/cassandraPersistence.go b/common/persistence/cassandra/cassandraPersistence.go index 275e49bc855..b1359a750f1 100644 --- a/common/persistence/cassandra/cassandraPersistence.go +++ b/common/persistence/cassandra/cassandraPersistence.go @@ -63,6 +63,9 @@ const ( rowTypeReplicationDomainID = "10000000-5000-f000-f000-000000000000" rowTypeReplicationWorkflowID = "20000000-5000-f000-f000-000000000000" rowTypeReplicationRunID = "30000000-5000-f000-f000-000000000000" + // Row Constants for Replication Task DLQ Row. Source cluster name will be used as WorkflowID. 
+ rowTypeDLQDomainID = "10000000-6000-f000-f000-000000000000" + rowTypeDLQRunID = "30000000-6000-f000-f000-000000000000" // Special TaskId constants rowTypeExecutionTaskID = int64(-10) rowTypeShardTaskID = int64(-11) @@ -82,6 +85,7 @@ const ( rowTypeTransferTask rowTypeTimerTask rowTypeReplicationTask + rowTypeDLQ ) const ( @@ -878,8 +882,11 @@ func newShardPersistence(cfg config.Cassandra, clusterName string, logger log.Lo } // NewWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation -func NewWorkflowExecutionPersistence(shardID int, session *gocql.Session, - logger log.Logger) (p.ExecutionStore, error) { +func NewWorkflowExecutionPersistence( + shardID int, + session *gocql.Session, + logger log.Logger, +) (p.ExecutionStore, error) { return &cassandraPersistence{cassandraStore: cassandraStore{session: session, logger: logger}, shardID: shardID}, nil } @@ -2089,6 +2096,12 @@ func (d *cassandraPersistence) GetReplicationTasks( request.MaxReadLevel, ).PageSize(request.BatchSize).PageState(request.NextPageToken) + return d.populateGetReplicationTasksResponse(query) +} + +func (d *cassandraPersistence) populateGetReplicationTasksResponse( + query *gocql.Query, +) (*p.GetReplicationTasksResponse, error) { iter := query.Iter() if iter == nil { return nil, &workflow.InternalServiceError{ @@ -2691,3 +2704,62 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *p.GetTimerIndexTasksR return response, nil } + +func (d *cassandraPersistence) PutReplicationTaskToDLQ(request *p.PutReplicationTaskToDLQRequest) error { + task := request.TaskInfo + query := d.session.Query(templateCreateReplicationTaskQuery, + d.shardID, + rowTypeDLQ, + rowTypeDLQDomainID, + request.SourceClusterName, + rowTypeDLQRunID, + task.DomainID, + task.WorkflowID, + task.RunID, + task.TaskID, + task.TaskType, + task.FirstEventID, + task.NextEventID, + task.Version, + task.LastReplicationInfo, + task.ScheduledID, + defaultEventStoreVersionValue, + task.BranchToken, + task.ResetWorkflow, + defaultEventStoreVersionValue, + task.NewRunBranchToken, + defaultVisibilityTimestamp, + task.GetTaskID()) + + err := query.Exec() + if err != nil { + if isThrottlingError(err) { + return &workflow.ServiceBusyError{ + Message: fmt.Sprintf("PutReplicationTaskToDLQ operation failed. Error: %v", err), + } + } + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("PutReplicationTaskToDLQ operation failed. 
Error: %v", err), + } + } + + return nil +} + +func (d *cassandraPersistence) GetReplicationTasksFromDLQ( + request *p.GetReplicationTasksFromDLQRequest, +) (*p.GetReplicationTasksFromDLQResponse, error) { + // Reading replication tasks need to be quorum level consistent, otherwise we could loose task + query := d.session.Query(templateGetReplicationTasksQuery, + d.shardID, + rowTypeDLQ, + rowTypeDLQDomainID, + request.SourceClusterName, + rowTypeDLQRunID, + defaultVisibilityTimestamp, + request.ReadLevel, + request.ReadLevel+int64(request.BatchSize), + ).PageSize(request.BatchSize).PageState(request.NextPageToken) + + return d.populateGetReplicationTasksResponse(query) +} diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index a398d78a869..ab7b00d7ffa 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -963,6 +963,21 @@ type ( TaskID int64 } + // PutReplicationTaskToDLQRequest is used to put a replication task to dlq + PutReplicationTaskToDLQRequest struct { + SourceClusterName string + TaskInfo *ReplicationTaskInfo + } + + // GetReplicationTasksFromDLQRequest is used to get replication tasks from dlq + GetReplicationTasksFromDLQRequest struct { + SourceClusterName string + GetReplicationTasksRequest + } + + // GetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ + GetReplicationTasksFromDLQResponse = GetReplicationTasksResponse + // RangeCompleteTimerTaskRequest is used to complete a range of tasks in the timer task queue RangeCompleteTimerTaskRequest struct { InclusiveBeginTimestamp time.Time @@ -1434,6 +1449,8 @@ type ( // Replication task related methods GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error) CompleteReplicationTask(request *CompleteReplicationTaskRequest) error + PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error + GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error) // Timer related methods. 
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) @@ -2425,3 +2442,22 @@ func SplitHistoryGarbageCleanupInfo(info string) (domainID, workflowID, runID st workflowID = info[len(domainID)+1 : workflowEnd] return } + +// NewGetReplicationTasksFromDLQRequest creates a new GetReplicationTasksFromDLQRequest +func NewGetReplicationTasksFromDLQRequest( + sourceClusterName string, + readLevel int64, + maxReadLevel int64, + batchSize int, + nextPageToken []byte, +) *GetReplicationTasksFromDLQRequest { + return &GetReplicationTasksFromDLQRequest{ + SourceClusterName: sourceClusterName, + GetReplicationTasksRequest: GetReplicationTasksRequest{ + ReadLevel: readLevel, + MaxReadLevel: maxReadLevel, + BatchSize: batchSize, + NextPageToken: nextPageToken, + }, + } +} diff --git a/common/persistence/executionStore.go b/common/persistence/executionStore.go index 0796649cb10..2d88c164e0e 100644 --- a/common/persistence/executionStore.go +++ b/common/persistence/executionStore.go @@ -813,6 +813,18 @@ func (m *executionManagerImpl) CompleteReplicationTask( return m.persistence.CompleteReplicationTask(request) } +func (m *executionManagerImpl) PutReplicationTaskToDLQ( + request *PutReplicationTaskToDLQRequest, +) error { + return m.persistence.PutReplicationTaskToDLQ(request) +} + +func (m *executionManagerImpl) GetReplicationTasksFromDLQ( + request *GetReplicationTasksFromDLQRequest, +) (*GetReplicationTasksFromDLQResponse, error) { + return m.persistence.GetReplicationTasksFromDLQ(request) +} + // Timer related methods. func (m *executionManagerImpl) GetTimerIndexTasks( request *GetTimerIndexTasksRequest, diff --git a/common/persistence/persistenceInterface.go b/common/persistence/persistenceInterface.go index c6ca188adc1..16b62b1bd40 100644 --- a/common/persistence/persistenceInterface.go +++ b/common/persistence/persistenceInterface.go @@ -77,6 +77,8 @@ type ( // Replication task related methods GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error) CompleteReplicationTask(request *CompleteReplicationTaskRequest) error + PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error + GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error) // Timer related methods. 
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) diff --git a/common/persistence/persistenceMetricClients.go b/common/persistence/persistenceMetricClients.go index bf3acc7ae43..a81c3a5f705 100644 --- a/common/persistence/persistenceMetricClients.go +++ b/common/persistence/persistenceMetricClients.go @@ -400,18 +400,50 @@ func (p *workflowExecutionPersistenceClient) CompleteReplicationTask(request *Co return err } +func (p *workflowExecutionPersistenceClient) PutReplicationTaskToDLQ( + request *PutReplicationTaskToDLQRequest, +) error { + p.metricClient.IncCounter(metrics.PersistencePutReplicationTaskToDLQScope, metrics.PersistenceRequests) + + sw := p.metricClient.StartTimer(metrics.PersistencePutReplicationTaskToDLQScope, metrics.PersistenceLatency) + err := p.persistence.PutReplicationTaskToDLQ(request) + sw.Stop() + + if err != nil { + p.updateErrorMetric(metrics.PersistencePutReplicationTaskToDLQScope, err) + } + + return err +} + +func (p *workflowExecutionPersistenceClient) GetReplicationTasksFromDLQ( + request *GetReplicationTasksFromDLQRequest, +) (*GetReplicationTasksFromDLQResponse, error) { + p.metricClient.IncCounter(metrics.PersistenceGetReplicationTasksFromDLQScope, metrics.PersistenceRequests) + + sw := p.metricClient.StartTimer(metrics.PersistenceGetReplicationTasksFromDLQScope, metrics.PersistenceLatency) + response, err := p.persistence.GetReplicationTasksFromDLQ(request) + sw.Stop() + + if err != nil { + p.updateErrorMetric(metrics.PersistenceGetReplicationTasksFromDLQScope, err) + } + + return response, err +} + func (p *workflowExecutionPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) { p.metricClient.IncCounter(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceRequests) sw := p.metricClient.StartTimer(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceLatency) - resonse, err := p.persistence.GetTimerIndexTasks(request) + response, err := p.persistence.GetTimerIndexTasks(request) sw.Stop() if err != nil { p.updateErrorMetric(metrics.PersistenceGetTimerIndexTasksScope, err) } - return resonse, err + return response, err } func (p *workflowExecutionPersistenceClient) CompleteTimerTask(request *CompleteTimerTaskRequest) error { diff --git a/common/persistence/persistenceRateLimitedClients.go b/common/persistence/persistenceRateLimitedClients.go index d8c50253dae..7f85eefd4c5 100644 --- a/common/persistence/persistenceRateLimitedClients.go +++ b/common/persistence/persistenceRateLimitedClients.go @@ -308,6 +308,26 @@ func (p *workflowExecutionRateLimitedPersistenceClient) CompleteReplicationTask( return err } +func (p *workflowExecutionRateLimitedPersistenceClient) PutReplicationTaskToDLQ( + request *PutReplicationTaskToDLQRequest, +) error { + if ok := p.rateLimiter.Allow(); !ok { + return ErrPersistenceLimitExceeded + } + + return p.persistence.PutReplicationTaskToDLQ(request) +} + +func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTasksFromDLQ( + request *GetReplicationTasksFromDLQRequest, +) (*GetReplicationTasksFromDLQResponse, error) { + if ok := p.rateLimiter.Allow(); !ok { + return nil, ErrPersistenceLimitExceeded + } + + return p.persistence.GetReplicationTasksFromDLQ(request) +} + func (p *workflowExecutionRateLimitedPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) { if ok := p.rateLimiter.Allow(); !ok { return nil, ErrPersistenceLimitExceeded diff --git 
a/common/persistence/sql/sqlExecutionManager.go b/common/persistence/sql/sqlExecutionManager.go index 72769d2c2f5..7f139c6d23d 100644 --- a/common/persistence/sql/sqlExecutionManager.go +++ b/common/persistence/sql/sqlExecutionManager.go @@ -29,6 +29,7 @@ import ( "time" workflow "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/.gen/go/sqlblobs" "github.com/uber/cadence/common" "github.com/uber/cadence/common/collection" "github.com/uber/cadence/common/log" @@ -907,33 +908,48 @@ func (m *sqlExecutionManager) GetReplicationTasks( request *p.GetReplicationTasksRequest, ) (*p.GetReplicationTasksResponse, error) { - var readLevel int64 - var maxReadLevelInclusive int64 - var err error + readLevel, maxReadLevelInclusive, err := getReadLevels(request) + if err != nil { + return nil, err + } + + rows, err := m.db.SelectFromReplicationTasks( + &sqldb.ReplicationTasksFilter{ + ShardID: m.shardID, + MinTaskID: readLevel, + MaxTaskID: maxReadLevelInclusive, + PageSize: request.BatchSize, + }) + + switch err { + case nil: + return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel) + case sql.ErrNoRows: + return &p.GetReplicationTasksResponse{}, nil + default: + return nil, &workflow.InternalServiceError{ + Message: fmt.Sprintf("GetReplicationTasks operation failed. Select failed: %v", err), + } + } +} + +func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxReadLevelInclusive int64, err error) { + readLevel = request.ReadLevel if len(request.NextPageToken) > 0 { readLevel, err = deserializePageToken(request.NextPageToken) if err != nil { - return nil, err + return 0, 0, err } - } else { - readLevel = request.ReadLevel } - maxReadLevelInclusive = collection.MaxInt64( - readLevel+int64(request.BatchSize), request.MaxReadLevel) - rows, err := m.db.SelectFromReplicationTasks(&sqldb.ReplicationTasksFilter{ - ShardID: m.shardID, - MinTaskID: &readLevel, - MaxTaskID: &maxReadLevelInclusive, - PageSize: &request.BatchSize, - }) - if err != nil { - if err != sql.ErrNoRows { - return nil, &workflow.InternalServiceError{ - Message: fmt.Sprintf("GetReplicationTasks operation failed. Select failed: %v", err), - } - } - } + maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel) + return readLevel, maxReadLevelInclusive, nil +} + +func (m *sqlExecutionManager) populateGetReplicationTasksResponse( + rows []sqldb.ReplicationTasksRow, + requestMaxReadLevel int64, +) (*p.GetReplicationTasksResponse, error) { if len(rows) == 0 { return &p.GetReplicationTasksResponse{}, nil } @@ -971,7 +987,7 @@ func (m *sqlExecutionManager) GetReplicationTasks( } var nextPageToken []byte lastTaskID := rows[len(rows)-1].TaskID - if lastTaskID < request.MaxReadLevel { + if lastTaskID < requestMaxReadLevel { nextPageToken = serializePageToken(lastTaskID) } return &p.GetReplicationTasksResponse{ @@ -984,10 +1000,7 @@ func (m *sqlExecutionManager) CompleteReplicationTask( request *p.CompleteReplicationTaskRequest, ) error { - if _, err := m.db.DeleteFromReplicationTasks(&sqldb.ReplicationTasksFilter{ - ShardID: m.shardID, - TaskID: &request.TaskID, - }); err != nil { + if _, err := m.db.DeleteFromReplicationTasks(m.shardID, int(request.TaskID)); err != nil { return &workflow.InternalServiceError{ Message: fmt.Sprintf("CompleteReplicationTask operation failed. 
Error: %v", err), } @@ -995,6 +1008,38 @@ func (m *sqlExecutionManager) CompleteReplicationTask( return nil } +func (m *sqlExecutionManager) GetReplicationTasksFromDLQ( + request *p.GetReplicationTasksFromDLQRequest, +) (*p.GetReplicationTasksFromDLQResponse, error) { + + readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest) + if err != nil { + return nil, err + } + + filter := sqldb.ReplicationTasksFilter{ + ShardID: m.shardID, + MinTaskID: readLevel, + MaxTaskID: maxReadLevelInclusive, + PageSize: request.BatchSize, + } + rows, err := m.db.SelectFromReplicationTasksDLQ(&sqldb.ReplicationTasksDLQFilter{ + ReplicationTasksFilter: filter, + SourceClusterName: request.SourceClusterName, + }) + + switch err { + case nil: + return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel) + case sql.ErrNoRows: + return &p.GetReplicationTasksResponse{}, nil + default: + return nil, &workflow.InternalServiceError{ + Message: fmt.Sprintf("GetReplicationTasks operation failed. Select failed: %v", err), + } + } +} + type timerTaskPageToken struct { TaskID int64 Timestamp time.Time @@ -1106,3 +1151,56 @@ func (m *sqlExecutionManager) RangeCompleteTimerTask( } return nil } + +func (m *sqlExecutionManager) PutReplicationTaskToDLQ(request *p.PutReplicationTaskToDLQRequest) error { + replicationTask := request.TaskInfo + blob, err := replicationTaskInfoToBlob(&sqlblobs.ReplicationTaskInfo{ + DomainID: sqldb.MustParseUUID(replicationTask.DomainID), + WorkflowID: &replicationTask.WorkflowID, + RunID: sqldb.MustParseUUID(replicationTask.RunID), + TaskType: common.Int16Ptr(int16(replicationTask.TaskType)), + FirstEventID: &replicationTask.FirstEventID, + NextEventID: &replicationTask.NextEventID, + Version: &replicationTask.Version, + LastReplicationInfo: toSqldbReplicationInfo(replicationTask.LastReplicationInfo), + ScheduledID: &replicationTask.ScheduledID, + BranchToken: replicationTask.BranchToken, + NewRunBranchToken: replicationTask.NewRunBranchToken, + ResetWorkflow: &replicationTask.ResetWorkflow, + }) + if err != nil { + return err + } + + row := &sqldb.ReplicationTaskDLQRow{ + SourceClusterName: request.SourceClusterName, + ShardID: m.shardID, + TaskID: replicationTask.TaskID, + Data: blob.Data, + DataEncoding: string(blob.Encoding), + } + + _, err = m.db.InsertIntoReplicationTasksDLQ(row) + + // Tasks are immutable. So it's fine if we already persisted it before. + // This can happen when tasks are retried (ack and cleanup can have lag on source side). + if err != nil && !isDupEntry(err) { + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("Failed to create replication tasks. Error: %v", err), + } + } + + return nil +} + +func toSqldbReplicationInfo(info map[string]*p.ReplicationInfo) map[string]*sqlblobs.ReplicationInfo { + replicationInfoMap := make(map[string]*sqlblobs.ReplicationInfo) + for k, v := range info { + replicationInfoMap[k] = &sqlblobs.ReplicationInfo{ + Version: common.Int64Ptr(v.Version), + LastEventID: common.Int64Ptr(v.LastEventID), + } + } + + return replicationInfoMap +} diff --git a/common/persistence/sql/storage/mysql/execution.go b/common/persistence/sql/storage/mysql/execution.go index 9f19ff3ba2b..d4546d7a819 100644 --- a/common/persistence/sql/storage/mysql/execution.go +++ b/common/persistence/sql/storage/mysql/execution.go @@ -111,6 +111,13 @@ ORDER BY task_id LIMIT ?` deleteReplicationTaskQry = `DELETE FROM replication_tasks WHERE shard_id = ? 
AND task_id = ?` + getReplicationTasksDLQQry = `SELECT task_id, data, data_encoding FROM replication_tasks_dlq WHERE +source_cluster_name = ? AND +shard_id = ? AND +task_id > ? AND +task_id <= ? +ORDER BY task_id LIMIT ?` + bufferedEventsColumns = `shard_id, domain_id, workflow_id, run_id, data, data_encoding` createBufferedEventsQury = `INSERT INTO buffered_events(` + bufferedEventsColumns + `) VALUES (:shard_id, :domain_id, :workflow_id, :run_id, :data, :data_encoding)` @@ -118,6 +125,20 @@ VALUES (:shard_id, :domain_id, :workflow_id, :run_id, :data, :data_encoding)` deleteBufferedEventsQury = `DELETE FROM buffered_events WHERE shard_id=? AND domain_id=? AND workflow_id=? AND run_id=?` getBufferedEventsQury = `SELECT data, data_encoding FROM buffered_events WHERE shard_id=? AND domain_id=? AND workflow_id=? AND run_id=?` + + insertReplicationTaskDLQQry = ` +INSERT INTO replication_tasks_dlq + (source_cluster_name, + shard_id, + task_id, + data, + data_encoding) +VALUES (:source_cluster_name, + :shard_id, + :task_id, + :data, + :data_encoding) +` ) // InsertIntoExecutions inserts a row into executions table @@ -285,11 +306,29 @@ func (mdb *DB) InsertIntoReplicationTasks(rows []sqldb.ReplicationTasksRow) (sql // SelectFromReplicationTasks reads one or more rows from replication_tasks table func (mdb *DB) SelectFromReplicationTasks(filter *sqldb.ReplicationTasksFilter) ([]sqldb.ReplicationTasksRow, error) { var rows []sqldb.ReplicationTasksRow - err := mdb.conn.Select(&rows, getReplicationTasksQry, filter.ShardID, *filter.MinTaskID, *filter.MaxTaskID, *filter.PageSize) + err := mdb.conn.Select(&rows, getReplicationTasksQry, filter.ShardID, filter.MinTaskID, filter.MaxTaskID, filter.PageSize) return rows, err } // DeleteFromReplicationTasks deletes one or more rows from replication_tasks table -func (mdb *DB) DeleteFromReplicationTasks(filter *sqldb.ReplicationTasksFilter) (sql.Result, error) { - return mdb.conn.Exec(deleteReplicationTaskQry, filter.ShardID, *filter.TaskID) +func (mdb *DB) DeleteFromReplicationTasks(shardID, taskID int) (sql.Result, error) { + return mdb.conn.Exec(deleteReplicationTaskQry, shardID, taskID) +} + +// InsertIntoReplicationTasksDLQ inserts one or more rows into replication_tasks_dlq table +func (mdb *DB) InsertIntoReplicationTasksDLQ(row *sqldb.ReplicationTaskDLQRow) (sql.Result, error) { + return mdb.conn.NamedExec(insertReplicationTaskDLQQry, row) +} + +// SelectFromReplicationTasksDLQ reads one or more rows from replication_tasks_dlq table +func (mdb *DB) SelectFromReplicationTasksDLQ(filter *sqldb.ReplicationTasksDLQFilter) ([]sqldb.ReplicationTasksRow, error) { + var rows []sqldb.ReplicationTasksRow + err := mdb.conn.Select( + &rows, getReplicationTasksDLQQry, + filter.SourceClusterName, + filter.ShardID, + filter.MinTaskID, + filter.MaxTaskID, + filter.PageSize) + return rows, err } diff --git a/common/persistence/sql/storage/sqldb/interfaces.go b/common/persistence/sql/storage/sqldb/interfaces.go index d6515f9fb64..6cfba21abd1 100644 --- a/common/persistence/sql/storage/sqldb/interfaces.go +++ b/common/persistence/sql/storage/sqldb/interfaces.go @@ -99,7 +99,7 @@ type ( VersionHistoriesEncoding string } - // ExecutionsFilter contains the column names within domain table that + // ExecutionsFilter contains the column names within executions table that // can be used to filter results through a WHERE clause ExecutionsFilter struct { ShardID int @@ -121,7 +121,7 @@ type ( StartVersion int64 } - // CurrentExecutionsFilter contains the column names within 
domain table that + // CurrentExecutionsFilter contains the column names within current_executions table that // can be used to filter results through a WHERE clause CurrentExecutionsFilter struct { ShardID int64 @@ -140,7 +140,7 @@ type ( DataEncoding string } - // BufferedEventsFilter contains the column names within domain table that + // BufferedEventsFilter contains the column names within buffered_events table that // can be used to filter results through a WHERE clause BufferedEventsFilter struct { ShardID int @@ -159,7 +159,7 @@ type ( DataEncoding string } - // TasksFilter contains the column names within domain table that + // TasksFilter contains the column names within tasks table that // can be used to filter results through a WHERE clause TasksFilter struct { DomainID UUID @@ -184,7 +184,7 @@ type ( DataEncoding string } - // TaskListsFilter contains the column names within domain table that + // TaskListsFilter contains the column names within task_lists table that // can be used to filter results through a WHERE clause TaskListsFilter struct { ShardID int @@ -206,14 +206,29 @@ type ( DataEncoding string } - // ReplicationTasksFilter contains the column names within domain table that + // ReplicationTaskDLQRow represents a row in replication_tasks_dlq table + ReplicationTaskDLQRow struct { + SourceClusterName string + ShardID int + TaskID int64 + Data []byte + DataEncoding string + } + + // ReplicationTasksFilter contains the column names within replication_tasks table that // can be used to filter results through a WHERE clause ReplicationTasksFilter struct { ShardID int - TaskID *int64 - MinTaskID *int64 - MaxTaskID *int64 - PageSize *int + MinTaskID int64 + MaxTaskID int64 + PageSize int + } + + // ReplicationTasksDLQFilter contains the column names within replication_tasks_dlq table that + // can be used to filter results through a WHERE clause + ReplicationTasksDLQFilter struct { + ReplicationTasksFilter + SourceClusterName string } // TimerTasksRow represents a row in timer_tasks table @@ -225,7 +240,7 @@ type ( DataEncoding string } - // TimerTasksFilter contains the column names within domain table that + // TimerTasksFilter contains the column names within timer_tasks table that // can be used to filter results through a WHERE clause TimerTasksFilter struct { ShardID int @@ -249,7 +264,7 @@ type ( DataEncoding string } - // EventsFilter contains the column names within domain table that + // EventsFilter contains the column names within events table that // can be used to filter results through a WHERE clause EventsFilter struct { DomainID UUID @@ -315,7 +330,7 @@ type ( LastHeartbeatUpdatedTime time.Time } - // ActivityInfoMapsFilter contains the column names within domain table that + // ActivityInfoMapsFilter contains the column names within activity_info_maps table that // can be used to filter results through a WHERE clause ActivityInfoMapsFilter struct { ShardID int64 @@ -336,7 +351,7 @@ type ( DataEncoding string } - // TimerInfoMapsFilter contains the column names within domain table that + // TimerInfoMapsFilter contains the column names within timer_info_maps table that // can be used to filter results through a WHERE clause TimerInfoMapsFilter struct { ShardID int64 @@ -357,7 +372,7 @@ type ( DataEncoding string } - // ChildExecutionInfoMapsFilter contains the column names within domain table that + // ChildExecutionInfoMapsFilter contains the column names within child_execution_info_maps table that // can be used to filter results through a WHERE clause 
ChildExecutionInfoMapsFilter struct { ShardID int64 @@ -378,7 +393,7 @@ type ( DataEncoding string } - // RequestCancelInfoMapsFilter contains the column names within domain table that + // RequestCancelInfoMapsFilter contains the column names within request_cancel_info_maps table that // can be used to filter results through a WHERE clause RequestCancelInfoMapsFilter struct { ShardID int64 @@ -399,7 +414,7 @@ type ( DataEncoding string } - // SignalInfoMapsFilter contains the column names within domain table that + // SignalInfoMapsFilter contains the column names within signal_info_maps table that // can be used to filter results through a WHERE clause SignalInfoMapsFilter struct { ShardID int64 @@ -418,7 +433,7 @@ type ( SignalID string } - // SignalsRequestedSetsFilter contains the column names within domain table that + // SignalsRequestedSetsFilter contains the column names within signals_requested_sets table that // can be used to filter results through a WHERE clause SignalsRequestedSetsFilter struct { ShardID int64 @@ -443,7 +458,7 @@ type ( Encoding string } - // VisibilityFilter contains the column names within domain table that + // VisibilityFilter contains the column names within executions_visibility table that // can be used to filter results through a WHERE clause VisibilityFilter struct { DomainID string @@ -574,7 +589,12 @@ type ( SelectFromReplicationTasks(filter *ReplicationTasksFilter) ([]ReplicationTasksRow, error) // DeleteFromReplicationTasks deletes a row from replication_tasks table // Required filter params - {shardID, taskID} - DeleteFromReplicationTasks(filter *ReplicationTasksFilter) (sql.Result, error) + DeleteFromReplicationTasks(shardID, taskID int) (sql.Result, error) + // InsertIntoReplicationTasksDLQ puts the replication task into DLQ + InsertIntoReplicationTasksDLQ(row *ReplicationTaskDLQRow) (sql.Result, error) + // SelectFromReplicationTasksDLQ returns one or more rows from replication_tasks_dlq table + // Required filter params - {sourceClusterName, shardID, minTaskID, pageSize} + SelectFromReplicationTasksDLQ(filter *ReplicationTasksDLQFilter) ([]ReplicationTasksRow, error) ReplaceIntoActivityInfoMaps(rows []ActivityInfoMapsRow) (sql.Result, error) // SelectFromActivityInfoMaps returns one or more rows from activity_info_maps diff --git a/common/service/config/config.go b/common/service/config/config.go index 9537badc775..94b1a9ad79d 100644 --- a/common/service/config/config.go +++ b/common/service/config/config.go @@ -261,18 +261,6 @@ type ( ReplicationConsumerConfig struct { // Type determines how we consume replication tasks. It can be either kafka(default) or rpc. Type string `yaml:"type"` - // FetcherConfig is the config for replication task fetcher. - FetcherConfig *FetcherConfig `yaml:"fetcher"` - // ProcessorConfig is the config for replication task processor. - ProcessorConfig *ReplicationTaskProcessorConfig `yaml:"processor"` - } - - // FetcherConfig is the config for replication task fetcher. - FetcherConfig struct { - RPCParallelism int `yaml:"rpcParallelism"` - AggregationIntervalSecs int `yaml:"aggregationIntervalSecs"` - ErrorRetryWaitSecs int `yaml:"errorRetryWaitSecs"` - TimerJitterCoefficient float64 `yaml:"timerJitterCoefficient"` } // ReplicationTaskProcessorConfig is the config for replication task processor. 
diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 73171120f7b..9550f8501f5 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -190,6 +190,13 @@ var keys = map[Key]string{ DecisionHeartbeatTimeout: "history.decisionHeartbeatTimeout", ParentClosePolicyThreshold: "history.parentClosePolicyThreshold", NumParentClosePolicySystemWorkflows: "history.numParentClosePolicySystemWorkflows", + ReplicationTaskFetcherParallelism: "history.ReplicationTaskFetcherParallelism", + ReplicationTaskFetcherAggregationInterval: "history.ReplicationTaskFetcherAggregationInterval", + ReplicationTaskFetcherTimerJitterCoefficient: "history.ReplicationTaskFetcherTimerJitterCoefficient", + ReplicationTaskFetcherErrorRetryWait: "history.ReplicationTaskFetcherErrorRetryWait", + ReplicationTaskProcessorErrorRetryWait: "history.ReplicationTaskProcessorErrorRetryWait", + ReplicationTaskProcessorErrorRetryMaxAttempts: "history.ReplicationTaskProcessorErrorRetryMaxAttempts", + ReplicationTaskProcessorNoTaskInitialWait: "history.ReplicationTaskProcessorNoTaskInitialWait", WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS", WorkerReplicatorMetaTaskConcurrency: "worker.replicatorMetaTaskConcurrency", @@ -566,6 +573,21 @@ const ( // EnableParentClosePolicyWorker decides whether or not enable system workers for processing parent close policy task EnableParentClosePolicyWorker + // ReplicationTaskFetcherParallelism determines how many goroutines we spin up for fetching tasks + ReplicationTaskFetcherParallelism + // ReplicationTaskFetcherAggregationInterval determines how frequently the fetch requests are sent + ReplicationTaskFetcherAggregationInterval + // ReplicationTaskFetcherTimerJitterCoefficient is the jitter for fetcher timer + ReplicationTaskFetcherTimerJitterCoefficient + // ReplicationTaskFetcherErrorRetryWait is the wait time when fetcher encounters error + ReplicationTaskFetcherErrorRetryWait + // ReplicationTaskProcessorErrorRetryWait is the initial retry wait when we see errors in applying replication tasks + ReplicationTaskProcessorErrorRetryWait + // ReplicationTaskProcessorErrorRetryMaxAttempts is the max retry attempts for applying replication tasks + ReplicationTaskProcessorErrorRetryMaxAttempts + // ReplicationTaskProcessorNoTaskInitialWait is the wait time when no task is returned + ReplicationTaskProcessorNoTaskInitialWait + // lastKeyForTest must be the last one in this const group for testing purpose lastKeyForTest ) diff --git a/host/ndc/nDC_integration_test.go b/host/ndc/nDC_integration_test.go index 8b7f9e69b62..7dc65a68ac1 100644 --- a/host/ndc/nDC_integration_test.go +++ b/host/ndc/nDC_integration_test.go @@ -26,16 +26,16 @@ import ( "fmt" "io/ioutil" "os" + "sync/atomic" "testing" "time" + "go.uber.org/yarpc" + "github.com/golang/mock/gomock" "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - "go.uber.org/zap" - "gopkg.in/yaml.v2" - "github.com/uber/cadence/.gen/go/admin" "github.com/uber/cadence/.gen/go/cadence/workflowservicetest" "github.com/uber/cadence/.gen/go/history" @@ -52,6 +52,8 @@ import ( test "github.com/uber/cadence/common/testing" "github.com/uber/cadence/environment" "github.com/uber/cadence/host" + "go.uber.org/zap" + "gopkg.in/yaml.v2" ) type ( @@ -65,11 +67,13 @@ type ( serializer persistence.PayloadSerializer logger log.Logger - domainName string - domainID string - version int64 - versionIncrement int64 - 
mockFrontendClient map[string]frontend.Client + domainName string + domainID string + version int64 + versionIncrement int64 + mockFrontendClient map[string]frontend.Client + standByReplicationTasksChan chan *replicator.ReplicationTask + standByTaskID int64 } ) @@ -110,16 +114,18 @@ func (s *nDCIntegrationTestSuite) SetupSuite() { clusterConfigs[0].WorkerConfig = &host.WorkerConfig{} clusterConfigs[1].WorkerConfig = &host.WorkerConfig{} + s.standByReplicationTasksChan = make(chan *replicator.ReplicationTask, 100) + + s.standByTaskID = 0 s.mockFrontendClient = make(map[string]frontend.Client) controller := gomock.NewController(s.T()) mockStandbyClient := workflowservicetest.NewMockClient(controller) - mockStandbyClient.EXPECT().GetReplicationMessages(gomock.Any(), gomock.Any()).Return(&replicator.GetReplicationMessagesResponse{ - MessagesByShard: make(map[int32]*replicator.ReplicationMessages), - }, nil).AnyTimes() + mockStandbyClient.EXPECT().GetReplicationMessages(gomock.Any(), gomock.Any()).DoAndReturn(s.GetReplicationMessagesMock).AnyTimes() mockOtherClient := workflowservicetest.NewMockClient(controller) - mockOtherClient.EXPECT().GetReplicationMessages(gomock.Any(), gomock.Any()).Return(&replicator.GetReplicationMessagesResponse{ - MessagesByShard: make(map[int32]*replicator.ReplicationMessages), - }, nil).AnyTimes() + mockOtherClient.EXPECT().GetReplicationMessages(gomock.Any(), gomock.Any()).Return( + &replicator.GetReplicationMessagesResponse{ + MessagesByShard: make(map[int32]*replicator.ReplicationMessages), + }, nil).AnyTimes() s.mockFrontendClient["standby"] = mockStandbyClient s.mockFrontendClient["other"] = mockOtherClient clusterConfigs[0].MockFrontendClient = s.mockFrontendClient @@ -135,6 +141,39 @@ func (s *nDCIntegrationTestSuite) SetupSuite() { s.generator = test.InitializeHistoryEventGenerator(s.domainName, s.version) } +func (s *nDCIntegrationTestSuite) GetReplicationMessagesMock( + ctx context.Context, + request *replicator.GetReplicationMessagesRequest, + opts ...yarpc.CallOption, +) (*replicator.GetReplicationMessagesResponse, error) { + select { + case task := <-s.standByReplicationTasksChan: + taskID := atomic.AddInt64(&s.standByTaskID, 1) + task.SourceTaskId = common.Int64Ptr(taskID) + tasks := []*replicator.ReplicationTask{task} + for len(s.standByReplicationTasksChan) > 0 { + task = <-s.standByReplicationTasksChan + taskID := atomic.AddInt64(&s.standByTaskID, 1) + task.SourceTaskId = common.Int64Ptr(taskID) + tasks = append(tasks, task) + } + + replicationMessage := &replicator.ReplicationMessages{ + ReplicationTasks: tasks, + LastRetrivedMessageId: tasks[len(tasks)-1].SourceTaskId, + HasMore: common.BoolPtr(true), + } + + return &replicator.GetReplicationMessagesResponse{ + MessagesByShard: map[int32]*replicator.ReplicationMessages{0: replicationMessage}, + }, nil + default: + return &replicator.GetReplicationMessagesResponse{ + MessagesByShard: make(map[int32]*replicator.ReplicationMessages), + }, nil + } +} + func (s *nDCIntegrationTestSuite) SetupTest() { // Have to define our overridden assertions in the test setup. 
If we did it earlier, s.T() will return nil s.Assertions = require.New(s.T()) @@ -185,39 +224,56 @@ func (s *nDCIntegrationTestSuite) TestSingleBranch() { historyClient, ) - // get replicated history events from passive side - passiveClient := s.active.GetFrontendClient() - replicatedHistory, err := passiveClient.GetWorkflowExecutionHistory( - s.createContext(), - &shared.GetWorkflowExecutionHistoryRequest{ - Domain: common.StringPtr(s.domainName), - Execution: &shared.WorkflowExecution{ - WorkflowId: common.StringPtr(workflowID), - RunId: common.StringPtr(runID), - }, - MaximumPageSize: common.Int32Ptr(1000), - NextPageToken: nil, - WaitForNewEvent: common.BoolPtr(false), - HistoryEventFilterType: shared.HistoryEventFilterTypeAllEvent.Ptr(), + err := s.verifyEventHistory(workflowID, runID, historyBatch) + s.Require().NoError(err) + } +} + +func (s *nDCIntegrationTestSuite) verifyEventHistory( + workflowID string, + runID string, + historyBatch []*workflow.History, +) error { + // get replicated history events from passive side + passiveClient := s.active.GetFrontendClient() + replicatedHistory, err := passiveClient.GetWorkflowExecutionHistory( + s.createContext(), + &shared.GetWorkflowExecutionHistoryRequest{ + Domain: common.StringPtr(s.domainName), + Execution: &shared.WorkflowExecution{ + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(runID), }, - ) - s.Nil(err, "Failed to get history event from passive side") - - // compare origin events with replicated events - batchIndex := 0 - batch := historyBatch[batchIndex].Events - eventIndex := 0 - for _, event := range replicatedHistory.GetHistory().GetEvents() { - if eventIndex >= len(batch) { - batchIndex++ - batch = historyBatch[batchIndex].Events - eventIndex = 0 - } - originEvent := batch[eventIndex] - eventIndex++ - s.Equal(originEvent.GetEventType().String(), event.GetEventType().String(), "The replicated event and the origin event are not the same") + MaximumPageSize: common.Int32Ptr(1000), + NextPageToken: nil, + WaitForNewEvent: common.BoolPtr(false), + HistoryEventFilterType: shared.HistoryEventFilterTypeAllEvent.Ptr(), + }, + ) + + if err != nil { + return fmt.Errorf("failed to get history event from passive side: %v", err) + } + + // compare origin events with replicated events + batchIndex := 0 + batch := historyBatch[batchIndex].Events + eventIndex := 0 + for _, event := range replicatedHistory.GetHistory().GetEvents() { + if eventIndex >= len(batch) { + batchIndex++ + batch = historyBatch[batchIndex].Events + eventIndex = 0 + } + originEvent := batch[eventIndex] + eventIndex++ + if originEvent.GetEventType() != event.GetEventType() { + return fmt.Errorf("the replicated event (%v) and the origin event (%v) are not the same", + originEvent.GetEventType().String(), event.GetEventType().String()) } } + + return nil } func (s *nDCIntegrationTestSuite) TestMultipleBranches() { @@ -1356,6 +1412,26 @@ func (s *nDCIntegrationTestSuite) toThriftDataBlob( } } +func (s *nDCIntegrationTestSuite) generateEventBlobs( + workflowID string, + runID string, + workflowType string, + tasklist string, + batch *shared.History, +) (*persistence.DataBlob, *persistence.DataBlob) { + // TODO temporary code to generate next run first event + // we should generate these as part of modeled based testing + lastEvent := batch.Events[len(batch.Events)-1] + newRunEventBlob := s.generateNewRunHistory( + lastEvent, s.domainName, workflowID, runID, lastEvent.GetVersion(), workflowType, tasklist, + ) + // must serialize events batch after 
attempt on continue as new as generateNewRunHistory will + // modify the NewExecutionRunId attr + eventBlob, err := s.serializer.SerializeBatchEvents(batch.Events, common.EncodingTypeThriftRW) + s.NoError(err) + return eventBlob, newRunEventBlob +} + func (s *nDCIntegrationTestSuite) applyEvents( workflowID string, runID string, @@ -1365,22 +1441,10 @@ func (s *nDCIntegrationTestSuite) applyEvents( eventBatches []*shared.History, historyClient host.HistoryClient, ) { - for _, batch := range eventBatches { + eventBlob, newRunEventBlob := s.generateEventBlobs(workflowID, runID, workflowType, tasklist, batch) - // TODO temporary code to generate next run first event - // we should generate these as part of modeled based testing - lastEvent := batch.Events[len(batch.Events)-1] - newRunEventBlob := s.generateNewRunHistory( - lastEvent, s.domainName, workflowID, runID, lastEvent.GetVersion(), workflowType, tasklist, - ) - - // must serialize events batch after attempt on continue as new as generateNewRunHistory will - // modify the NewExecutionRunId attr - eventBlob, err := s.serializer.SerializeBatchEvents(batch.Events, common.EncodingTypeThriftRW) - s.NoError(err) - - err = historyClient.ReplicateEventsV2(s.createContext(), &history.ReplicateEventsV2Request{ + err := historyClient.ReplicateEventsV2(s.createContext(), &history.ReplicateEventsV2Request{ DomainUUID: common.StringPtr(s.domainID), WorkflowExecution: &shared.WorkflowExecution{ WorkflowId: common.StringPtr(workflowID), @@ -1394,6 +1458,38 @@ func (s *nDCIntegrationTestSuite) applyEvents( } } +func (s *nDCIntegrationTestSuite) applyEventsThroughFetcher( + workflowID string, + runID string, + workflowType string, + tasklist string, + versionHistory *persistence.VersionHistory, + eventBatches []*shared.History, + historyClient host.HistoryClient, + frontend *workflowservicetest.MockClient, +) { + for _, batch := range eventBatches { + eventBlob, newRunEventBlob := s.generateEventBlobs(workflowID, runID, workflowType, tasklist, batch) + + taskType := replicator.ReplicationTaskTypeHistoryV2 + replicationTask := &replicator.ReplicationTask{ + TaskType: &taskType, + SourceTaskId: common.Int64Ptr(1), + HistoryTaskV2Attributes: &replicator.HistoryTaskV2Attributes{ + TaskId: common.Int64Ptr(1), + DomainId: common.StringPtr(s.domainID), + WorkflowId: common.StringPtr(workflowID), + RunId: common.StringPtr(runID), + VersionHistoryItems: s.toThriftVersionHistoryItems(versionHistory), + Events: s.toThriftDataBlob(eventBlob), + NewRunEvents: s.toThriftDataBlob(newRunEventBlob), + }, + } + + s.standByReplicationTasksChan <- replicationTask + } +} + func (s *nDCIntegrationTestSuite) eventBatchesToVersionHistory( versionHistory *persistence.VersionHistory, eventBatches []*shared.History, diff --git a/host/ndc/replication_integration_test.go b/host/ndc/replication_integration_test.go new file mode 100644 index 00000000000..76530c3c648 --- /dev/null +++ b/host/ndc/replication_integration_test.go @@ -0,0 +1,141 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package ndc + +import ( + "math" + "time" + + "github.com/uber/cadence/common/persistence" + + "github.com/pborman/uuid" + "github.com/uber/cadence/.gen/go/cadence/workflowservicetest" + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" + test "github.com/uber/cadence/common/testing" +) + +func (s *nDCIntegrationTestSuite) TestReplicationMessageApplication() { + + workflowID := "replication-message-test" + uuid.New() + runID := uuid.New() + workflowType := "event-generator-workflow-type" + tasklist := "event-generator-taskList" + + // active has initial version 0 + historyClient := s.active.GetHistoryClient() + + var historyBatch []*shared.History + s.generator = test.InitializeHistoryEventGenerator(s.domainName, 1) + + for s.generator.HasNextVertex() { + events := s.generator.GetNextVertices() + historyEvents := &shared.History{} + for _, event := range events { + historyEvents.Events = append(historyEvents.Events, event.GetData().(*shared.HistoryEvent)) + } + historyBatch = append(historyBatch, historyEvents) + } + + versionHistory := s.eventBatchesToVersionHistory(nil, historyBatch) + standbyClient := s.mockFrontendClient["standby"].(*workflowservicetest.MockClient) + + s.applyEventsThroughFetcher( + workflowID, + runID, + workflowType, + tasklist, + versionHistory, + historyBatch, + historyClient, + standbyClient, + ) + + // Applying replication messages through fetcher is Async. + // So we need to retry a couple of times. 
+ for i := 0; i < 10; i++ { + time.Sleep(time.Second) + err := s.verifyEventHistory(workflowID, runID, historyBatch) + if err == nil { + return + } + } + + s.Fail("Verification of replicated messages failed") +} + +func (s *nDCIntegrationTestSuite) TestReplicationMessageDLQ() { + + workflowID := "replication-message-dlq-test" + uuid.New() + runID := uuid.New() + workflowType := "event-generator-workflow-type" + tasklist := "event-generator-taskList" + + // active has initial version 0 + historyClient := s.active.GetHistoryClient() + + var historyBatch []*shared.History + s.generator = test.InitializeHistoryEventGenerator(s.domainName, 1) + + for s.generator.HasNextVertex() { + events := s.generator.GetNextVertices() + historyEvents := &shared.History{} + for _, event := range events { + historyEvents.Events = append(historyEvents.Events, event.GetData().(*shared.HistoryEvent)) + } + historyBatch = append(historyBatch, historyEvents) + } + + versionHistory := s.eventBatchesToVersionHistory(nil, historyBatch) + + s.NotNil(historyBatch) + historyBatch[0].Events[1].Version = common.Int64Ptr(2) + standbyClient := s.mockFrontendClient["standby"].(*workflowservicetest.MockClient) + + s.applyEventsThroughFetcher( + workflowID, + runID, + workflowType, + tasklist, + versionHistory, + historyBatch, + historyClient, + standbyClient, + ) + + execMgrFactory := s.active.GetExecutionManagerFactory() + executionManager, err := execMgrFactory.NewExecutionManager(0) + s.NoError(err) + + // Applying replication messages through fetcher is Async. + // So we need to retry a couple of times. + for i := 0; i < 10; i++ { + time.Sleep(time.Second) + request := persistence.NewGetReplicationTasksFromDLQRequest( + "standby", -1, math.MaxInt64, math.MaxInt64, nil) + response, err := executionManager.GetReplicationTasksFromDLQ(request) + if err == nil && len(response.Tasks) == len(historyBatch) { + return + } + } + + s.Fail("Failed to get messages from DLQ.") +} diff --git a/host/onebox.go b/host/onebox.go index 884ae45e9e5..bc051f00f1e 100644 --- a/host/onebox.go +++ b/host/onebox.go @@ -74,6 +74,7 @@ type Cadence interface { FrontendAddress() string GetFrontendService() service.Service GetHistoryClient() historyserviceclient.Interface + GetExecutionManagerFactory() persistence.ExecutionManagerFactory } type ( @@ -521,13 +522,21 @@ func (c *cadenceImpl) startHistory( service := service.New(params) c.historyService = service hConfig := c.historyConfig - historyConfig := history.NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, c.logger), - hConfig.NumHistoryShards, config.StoreTypeCassandra, params.PersistenceConfig.IsAdvancedVisibilityConfigExist()) + historyConfig := history.NewConfig( + dynamicconfig.NewCollection(params.DynamicConfig, c.logger), + hConfig.NumHistoryShards, + config.StoreTypeCassandra, + params.PersistenceConfig.IsAdvancedVisibilityConfigExist(), + ) historyConfig.HistoryMgrNumConns = dynamicconfig.GetIntPropertyFn(hConfig.NumHistoryShards) historyConfig.ExecutionMgrNumConns = dynamicconfig.GetIntPropertyFn(hConfig.NumHistoryShards) historyConfig.DecisionHeartbeatTimeout = dynamicconfig.GetDurationPropertyFnFilteredByDomain(time.Second * 5) historyConfig.TimerProcessorHistoryArchivalSizeLimit = dynamicconfig.GetIntPropertyFn(5 * 1024) historyConfig.EnableNDC = dynamicconfig.GetBoolPropertyFnFilteredByDomain(enableNDC) + historyConfig.ReplicationTaskFetcherAggregationInterval = dynamicconfig.GetDurationPropertyFn(200 * time.Millisecond) + historyConfig.ReplicationTaskFetcherErrorRetryWait = 
dynamicconfig.GetDurationPropertyFn(50 * time.Millisecond) + historyConfig.ReplicationTaskProcessorErrorRetryWait = dynamicconfig.GetDurationPropertyFn(time.Millisecond) + historyConfig.ReplicationTaskProcessorErrorRetryMaxAttempts = dynamicconfig.GetIntPropertyFn(1) if c.workerConfig.EnableIndexer { historyConfig.AdvancedVisibilityWritingMode = dynamicconfig.GetStringPropertyFn(common.AdvancedVisibilityWritingModeDual) @@ -781,6 +790,10 @@ func (c *cadenceImpl) createSystemDomain() error { return nil } +func (c *cadenceImpl) GetExecutionManagerFactory() persistence.ExecutionManagerFactory { + return c.executionMgrFactory +} + func newMembershipFactory(serviceName string, hosts map[string][]string) service.MembershipMonitorFactory { return &membershipFactoryImpl{ serviceName: serviceName, diff --git a/host/testcluster.go b/host/testcluster.go index 158c2e0d0f3..9bc0e29d8e3 100644 --- a/host/testcluster.go +++ b/host/testcluster.go @@ -272,3 +272,7 @@ func (tc *TestCluster) GetAdminClient() AdminClient { func (tc *TestCluster) GetHistoryClient() HistoryClient { return tc.host.GetHistoryClient() } + +func (tc *TestCluster) GetExecutionManagerFactory() persistence.ExecutionManagerFactory { + return tc.host.GetExecutionManagerFactory() +} diff --git a/schema/mysql/v57/cadence/schema.sql b/schema/mysql/v57/cadence/schema.sql index c478c063d4d..431e21e6f3c 100644 --- a/schema/mysql/v57/cadence/schema.sql +++ b/schema/mysql/v57/cadence/schema.sql @@ -106,6 +106,16 @@ CREATE TABLE replication_tasks ( PRIMARY KEY (shard_id, task_id) ); +CREATE TABLE replication_tasks_dlq ( + source_cluster_name VARCHAR(255) NOT NULL, + shard_id INT NOT NULL, + task_id BIGINT NOT NULL, + -- + data BLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (source_cluster_name, shard_id, task_id) +); + CREATE TABLE timer_tasks ( shard_id INT NOT NULL, visibility_timestamp DATETIME(6) NOT NULL, diff --git a/schema/mysql/v57/cadence/versioned/v0.3/manifest.json b/schema/mysql/v57/cadence/versioned/v0.3/manifest.json new file mode 100644 index 00000000000..ff6fc2d483c --- /dev/null +++ b/schema/mysql/v57/cadence/versioned/v0.3/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.3", + "MinCompatibleVersion": "0.3", + "Description": "add replication_tasks_dlq table", + "SchemaUpdateCqlFiles": [ + "replication_tasks_dlq.sql" + ] +} diff --git a/schema/mysql/v57/cadence/versioned/v0.3/replication_tasks_dlq.sql b/schema/mysql/v57/cadence/versioned/v0.3/replication_tasks_dlq.sql new file mode 100644 index 00000000000..3947a7cc8fb --- /dev/null +++ b/schema/mysql/v57/cadence/versioned/v0.3/replication_tasks_dlq.sql @@ -0,0 +1,9 @@ +CREATE TABLE replication_tasks_dlq ( + source_cluster_name VARCHAR(255) NOT NULL, + shard_id INT NOT NULL, + task_id BIGINT NOT NULL, + -- + data BLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (source_cluster_name, shard_id, task_id) +); diff --git a/schema/mysql/version.go b/schema/mysql/version.go index 50cac4e5e22..bbe7dfce3c9 100644 --- a/schema/mysql/version.go +++ b/schema/mysql/version.go @@ -23,7 +23,7 @@ package mysql // NOTE: whenever there is a new data base schema update, plz update the following versions // Version is the MySQL database release version -const Version = "0.2" +const Version = "0.3" // VisibilityVersion is the MySQL visibility database release version const VisibilityVersion = "0.1" diff --git a/service/history/handler.go b/service/history/handler.go index 0d65dcbe7f3..739341eb71e 100644 --- a/service/history/handler.go +++ 
b/service/history/handler.go @@ -44,7 +44,6 @@ import ( "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/quotas" "github.com/uber/cadence/common/service" - "github.com/uber/cadence/service/worker/replicator" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/yarpc/yarpcerrors" ) @@ -72,7 +71,6 @@ type ( publisher messaging.Producer rateLimiter quotas.Limiter replicationTaskFetchers *ReplicationTaskFetchers - domainReplicator replicator.DomainReplicator service.Service } ) @@ -104,8 +102,6 @@ func NewHandler( domainCache cache.DomainCache, publicClient workflowserviceclient.Interface, ) *Handler { - domainReplicator := replicator.NewDomainReplicator(metadataMgr, sVice.GetLogger()) - handler := &Handler{ Service: sVice, config: config, @@ -121,8 +117,7 @@ func NewHandler( return float64(config.RPS()) }, ), - publicClient: publicClient, - domainReplicator: domainReplicator, + publicClient: publicClient, } // prevent us from trying to serve requests before shard controller is started and ready @@ -176,6 +171,7 @@ func (h *Handler) Start() error { h.replicationTaskFetchers = NewReplicationTaskFetchers( h.GetLogger(), + h.config, h.GetClusterMetadata().GetReplicationConsumerConfig(), h.Service.GetClusterMetadata(), h.Service.GetClientBean()) @@ -213,7 +209,7 @@ func (h *Handler) Stop() { // CreateEngine is implementation for HistoryEngineFactory used for creating the engine instance for shard func (h *Handler) CreateEngine(context ShardContext) Engine { return NewEngineWithShardContext(context, h.visibilityMgr, h.matchingServiceClient, h.historyServiceClient, - h.publicClient, h.historyEventNotifier, h.publisher, h.config, h.replicationTaskFetchers, h.domainReplicator, h.rawMatchingClient) + h.publicClient, h.historyEventNotifier, h.publisher, h.config, h.replicationTaskFetchers, h.rawMatchingClient) } // Health is for health check diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 9534476f43c..e1cb91bc469 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -51,7 +51,6 @@ import ( "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/service/config" warchiver "github.com/uber/cadence/service/worker/archiver" - "github.com/uber/cadence/service/worker/replicator" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" "go.uber.org/yarpc/yarpcerrors" "golang.org/x/net/context" @@ -195,7 +194,6 @@ func NewEngineWithShardContext( publisher messaging.Producer, config *Config, replicationTaskFetchers *ReplicationTaskFetchers, - domainReplicator replicator.DomainReplicator, rawMatchingClient matching.Client, ) Engine { currentClusterName := shard.GetService().GetClusterMetadata().GetCurrentClusterName() @@ -282,7 +280,12 @@ func NewEngineWithShardContext( var replicationTaskProcessors []*ReplicationTaskProcessor for _, replicationTaskFetcher := range replicationTaskFetchers.GetFetchers() { - replicationTaskProcessor := NewReplicationTaskProcessor(shard, historyEngImpl, domainReplicator, shard.GetMetricsClient(), replicationTaskFetcher) + replicationTaskProcessor := NewReplicationTaskProcessor( + shard, + historyEngImpl, + config, + shard.GetMetricsClient(), + replicationTaskFetcher) replicationTaskProcessors = append(replicationTaskProcessors, replicationTaskProcessor) } historyEngImpl.replicationTaskProcessors = replicationTaskProcessors diff --git a/service/history/replicationTaskFetcher.go b/service/history/replicationTaskFetcher.go index 
b28f469b3e8..d4797bc73f7 100644 --- a/service/history/replicationTaskFetcher.go +++ b/service/history/replicationTaskFetcher.go @@ -33,7 +33,7 @@ import ( "github.com/uber/cadence/common/cluster" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" - "github.com/uber/cadence/common/service/config" + serviceConfig "github.com/uber/cadence/common/service/config" ) const ( @@ -46,7 +46,7 @@ type ( ReplicationTaskFetcher struct { status int32 sourceCluster string - config *config.FetcherConfig + config *Config logger log.Logger remotePeer workflowserviceclient.Interface requestChan chan *request @@ -64,14 +64,14 @@ type ( // NewReplicationTaskFetchers creates an instance of ReplicationTaskFetchers with given configs. func NewReplicationTaskFetchers( logger log.Logger, - consumerConfig *config.ReplicationConsumerConfig, + config *Config, + consumerConfig *serviceConfig.ReplicationConsumerConfig, clusterMetadata cluster.Metadata, clientBean client.Bean, ) *ReplicationTaskFetchers { var fetchers []*ReplicationTaskFetcher - if consumerConfig.Type == config.ReplicationConsumerTypeRPC { - fetcherConfig := consumerConfig.FetcherConfig + if consumerConfig.Type == serviceConfig.ReplicationConsumerTypeRPC { for clusterName, info := range clusterMetadata.GetAllClusterInfo() { if !info.Enabled { continue @@ -79,11 +79,10 @@ func NewReplicationTaskFetchers( if clusterName != clusterMetadata.GetCurrentClusterName() { remoteFrontendClient := clientBean.GetRemoteFrontendClient(clusterName) - fetcher := newReplicationTaskFetcher(logger, clusterName, fetcherConfig, remoteFrontendClient) + fetcher := newReplicationTaskFetcher(logger, clusterName, config, remoteFrontendClient) fetchers = append(fetchers, fetcher) } } - } return &ReplicationTaskFetchers{ @@ -126,14 +125,14 @@ func (f *ReplicationTaskFetchers) GetFetchers() []*ReplicationTaskFetcher { func newReplicationTaskFetcher( logger log.Logger, sourceCluster string, - config *config.FetcherConfig, + config *Config, sourceFrontend workflowserviceclient.Interface, ) *ReplicationTaskFetcher { return &ReplicationTaskFetcher{ status: common.DaemonStatusInitialized, config: config, - logger: logger, + logger: logger.WithTags(tag.ClusterName(sourceCluster)), remotePeer: sourceFrontend, sourceCluster: sourceCluster, requestChan: make(chan *request, requestChanBufferSize), @@ -147,10 +146,10 @@ func (f *ReplicationTaskFetcher) Start() { return } - for i := 0; i < f.config.RPCParallelism; i++ { + for i := 0; i < f.config.ReplicationTaskFetcherParallelism(); i++ { go f.fetchTasks() } - f.logger.Info("Replication task fetcher started.", tag.ClusterName(f.sourceCluster), tag.Counter(f.config.RPCParallelism)) + f.logger.Info("Replication task fetcher started.", tag.Counter(f.config.ReplicationTaskFetcherParallelism())) } // Stop stops the fetcher @@ -160,19 +159,18 @@ func (f *ReplicationTaskFetcher) Stop() { } close(f.done) - f.logger.Info("Replication task fetcher stopped.", tag.ClusterName(f.sourceCluster)) + f.logger.Info("Replication task fetcher stopped.") } // fetchTasks collects getReplicationTasks request from shards and send out aggregated request to source frontend. 
func (f *ReplicationTaskFetcher) fetchTasks() { timer := time.NewTimer(backoff.JitDuration( - time.Duration(f.config.AggregationIntervalSecs)*time.Second, - f.config.TimerJitterCoefficient, + f.config.ReplicationTaskFetcherAggregationInterval(), + f.config.ReplicationTaskFetcherTimerJitterCoefficient(), )) requestByShard := make(map[int32]*request) -Loop: for { select { case request := <-f.requestChan: @@ -188,54 +186,69 @@ Loop: requestByShard[request.token.GetShardID()] = request case <-timer.C: - if len(requestByShard) == 0 { - // We don't receive tasks from previous fetch so processors are all sleeping. - f.logger.Debug("Skip fetching as no processor is asking for tasks.") + // When timer fires, we collect all the requests we have so far and attempt to send them to remote. + err := f.fetchAndDistributeTasks(requestByShard) + if err != nil { + timer.Reset(backoff.JitDuration( + f.config.ReplicationTaskFetcherErrorRetryWait(), + f.config.ReplicationTaskFetcherTimerJitterCoefficient(), + )) + } else { timer.Reset(backoff.JitDuration( - time.Duration(f.config.AggregationIntervalSecs)*time.Second, - f.config.TimerJitterCoefficient, + f.config.ReplicationTaskFetcherAggregationInterval(), + f.config.ReplicationTaskFetcherTimerJitterCoefficient(), )) - continue Loop } + case <-f.done: + timer.Stop() + return + } + } +} - // When timer fires, we collect all the requests we have so far and attempt to send them to remote. - var tokens []*r.ReplicationToken - for _, request := range requestByShard { - tokens = append(tokens, request.token) - } +func (f *ReplicationTaskFetcher) fetchAndDistributeTasks(requestByShard map[int32]*request) error { + if len(requestByShard) == 0 { + // We don't receive tasks from previous fetch so processors are all sleeping. + f.logger.Debug("Skip fetching as no processor is asking for tasks.") + return nil + } - ctx, cancel := context.WithTimeout(context.Background(), fetchTaskRequestTimeout) - request := &r.GetReplicationMessagesRequest{Tokens: tokens} - response, err := f.remotePeer.GetReplicationMessages(ctx, request) - cancel() - if err != nil { - f.logger.Error("Failed to get replication tasks", tag.Error(err)) - timer.Reset(backoff.JitDuration(time.Duration( - f.config.ErrorRetryWaitSecs)*time.Second, - f.config.TimerJitterCoefficient, - )) - continue Loop - } + messagesByShard, err := f.getMessages(requestByShard) + if err != nil { + f.logger.Error("Failed to get replication tasks", tag.Error(err)) + return err + } - f.logger.Debug("Successfully fetched replication tasks.", tag.Counter(len(response.MessagesByShard))) + f.logger.Debug("Successfully fetched replication tasks.", tag.Counter(len(messagesByShard))) - for shardID, tasks := range response.MessagesByShard { - request := requestByShard[shardID] - request.respChan <- tasks - close(request.respChan) - delete(requestByShard, shardID) - } + for shardID, tasks := range messagesByShard { + request := requestByShard[shardID] + request.respChan <- tasks + close(request.respChan) + delete(requestByShard, shardID) + } - timer.Reset(backoff.JitDuration(time.Duration( - f.config.AggregationIntervalSecs)*time.Second, - f.config.TimerJitterCoefficient, - )) + return nil +} - case <-f.done: - timer.Stop() - return - } +func (f *ReplicationTaskFetcher) getMessages( + requestByShard map[int32]*request, +) (map[int32]*r.ReplicationMessages, error) { + var tokens []*r.ReplicationToken + for _, request := range requestByShard { + tokens = append(tokens, request.token) } + + ctx, cancel := 
context.WithTimeout(context.Background(), fetchTaskRequestTimeout) + defer cancel() + + request := &r.GetReplicationMessagesRequest{Tokens: tokens} + response, err := f.remotePeer.GetReplicationMessages(ctx, request) + if err != nil { + return nil, err + } + + return response.GetMessagesByShard(), err } // GetSourceCluster returns the source cluster for the fetcher diff --git a/service/history/replicationTaskProcessor.go b/service/history/replicationTaskProcessor.go index d03cc2ecb2b..d8a80eb4570 100644 --- a/service/history/replicationTaskProcessor.go +++ b/service/history/replicationTaskProcessor.go @@ -22,11 +22,10 @@ package history import ( "context" + "fmt" "sync/atomic" "time" - "go.uber.org/yarpc/yarpcerrors" - h "github.com/uber/cadence/.gen/go/history" r "github.com/uber/cadence/.gen/go/replicator" "github.com/uber/cadence/.gen/go/shared" @@ -36,15 +35,15 @@ import ( "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" - "github.com/uber/cadence/service/worker/replicator" + "github.com/uber/cadence/common/persistence" + "go.uber.org/yarpc/yarpcerrors" ) const ( - dropSyncShardTaskTimeThreshold = 10 * time.Minute - replicationTimeout = 30 * time.Second - taskProcessorErrorRetryWait = time.Second - taskProcessorErrorRetryBackoffCoefficient = 1 - taskProcessorErrorRetryMaxAttampts = 5 + dropSyncShardTaskTimeThreshold = 10 * time.Minute + replicationTimeout = 30 * time.Second + taskErrorRetryBackoffCoefficient = 1.2 + dlqErrorRetryWait = time.Second ) var ( @@ -55,18 +54,20 @@ var ( type ( // ReplicationTaskProcessor is responsible for processing replication tasks for a shard. ReplicationTaskProcessor struct { - currentCluster string - sourceCluster string - status int32 - shard ShardContext - historyEngine Engine - domainCache cache.DomainCache - metricsClient metrics.Client - domainReplicator replicator.DomainReplicator - logger log.Logger - - retryPolicy backoff.RetryPolicy - noTaskBackoffRetrier backoff.Retrier + currentCluster string + sourceCluster string + status int32 + shard ShardContext + historyEngine Engine + historySerializer persistence.PayloadSerializer + config *Config + domainCache cache.DomainCache + metricsClient metrics.Client + logger log.Logger + + taskRetryPolicy backoff.RetryPolicy + dlqRetryPolicy backoff.RetryPolicy + noTaskRetrier backoff.Retrier lastProcessedMessageID int64 lastRetrievedMessageID int64 @@ -85,39 +86,36 @@ type ( func NewReplicationTaskProcessor( shard ShardContext, historyEngine Engine, - domainReplicator replicator.DomainReplicator, + config *Config, metricsClient metrics.Client, replicationTaskFetcher *ReplicationTaskFetcher, ) *ReplicationTaskProcessor { - retryPolicy := backoff.NewExponentialRetryPolicy(taskProcessorErrorRetryWait) - retryPolicy.SetBackoffCoefficient(taskProcessorErrorRetryBackoffCoefficient) - retryPolicy.SetMaximumAttempts(taskProcessorErrorRetryMaxAttampts) - - var noTaskBackoffRetrier backoff.Retrier - config := shard.GetClusterMetadata().GetReplicationConsumerConfig().ProcessorConfig - // TODO: add a default noTaskBackoffRetrier? 
- if config != nil { - noTaskBackoffPolicy := backoff.NewExponentialRetryPolicy(time.Duration(config.NoTaskInitialWaitIntervalSecs) * time.Second) - noTaskBackoffPolicy.SetBackoffCoefficient(config.NoTaskWaitBackoffCoefficient) - noTaskBackoffPolicy.SetMaximumInterval(time.Duration(config.NoTaskMaxWaitIntervalSecs) * time.Second) - noTaskBackoffPolicy.SetExpirationInterval(backoff.NoInterval) - noTaskBackoffRetrier = backoff.NewRetrier(noTaskBackoffPolicy, backoff.SystemClock) - } + taskRetryPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorErrorRetryWait()) + taskRetryPolicy.SetBackoffCoefficient(taskErrorRetryBackoffCoefficient) + taskRetryPolicy.SetMaximumAttempts(config.ReplicationTaskProcessorErrorRetryMaxAttempts()) + + dlqRetryPolicy := backoff.NewExponentialRetryPolicy(dlqErrorRetryWait) + dlqRetryPolicy.SetExpirationInterval(backoff.NoInterval) + + noTaskBackoffPolicy := backoff.NewExponentialRetryPolicy(config.ReplicationTaskProcessorNoTaskRetryWait()) + noTaskBackoffPolicy.SetBackoffCoefficient(1) + noTaskBackoffPolicy.SetExpirationInterval(backoff.NoInterval) + noTaskRetrier := backoff.NewRetrier(noTaskBackoffPolicy, backoff.SystemClock) return &ReplicationTaskProcessor{ - currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), - sourceCluster: replicationTaskFetcher.GetSourceCluster(), - status: common.DaemonStatusInitialized, - shard: shard, - historyEngine: historyEngine, - domainCache: shard.GetDomainCache(), - metricsClient: metricsClient, - domainReplicator: domainReplicator, - logger: shard.GetLogger(), - retryPolicy: retryPolicy, - noTaskBackoffRetrier: noTaskBackoffRetrier, - requestChan: replicationTaskFetcher.GetRequestChan(), - done: make(chan struct{}), + currentCluster: shard.GetClusterMetadata().GetCurrentClusterName(), + sourceCluster: replicationTaskFetcher.GetSourceCluster(), + status: common.DaemonStatusInitialized, + shard: shard, + historyEngine: historyEngine, + historySerializer: persistence.NewPayloadSerializer(), + domainCache: shard.GetDomainCache(), + metricsClient: metricsClient, + logger: shard.GetLogger(), + taskRetryPolicy: taskRetryPolicy, + noTaskRetrier: noTaskRetrier, + requestChan: replicationTaskFetcher.GetRequestChan(), + done: make(chan struct{}), } } @@ -142,7 +140,6 @@ func (p *ReplicationTaskProcessor) Stop() { func (p *ReplicationTaskProcessor) processorLoop() { p.lastProcessedMessageID = p.shard.GetClusterReplicationLevel(p.sourceCluster) - scope := p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope, metrics.TargetClusterTag(p.sourceCluster)) defer func() { p.logger.Info("Closing replication task processor.", tag.ReadLevel(p.lastRetrievedMessageID)) @@ -153,20 +150,12 @@ Loop: // for each iteration, do close check first select { case <-p.done: + p.logger.Info("ReplicationTaskProcessor shutting down.") return default: } - respChan := make(chan *r.ReplicationMessages, 1) - // TODO: when we support prefetching, LastRetrivedMessageId can be different than LastProcessedMessageId - p.requestChan <- &request{ - token: &r.ReplicationToken{ - ShardID: common.Int32Ptr(int32(p.shard.GetShardID())), - LastRetrivedMessageId: common.Int64Ptr(p.lastRetrievedMessageID), - LastProcessedMessageId: common.Int64Ptr(p.lastProcessedMessageID), - }, - respChan: respChan, - } + respChan := p.sendFetchMessageRequest() select { case response, ok := <-respChan: @@ -181,52 +170,74 @@ Loop: tag.Counter(len(response.GetReplicationTasks())), ) - // Note here we check replication tasks instead of hasMore. 
The expectation is that in a steady state - // we will receive replication tasks but hasMore is false (meaning that we are always catching up). - // So hasMore might not be a good indicator for additional wait. - if len(response.ReplicationTasks) == 0 { - backoffDuration := p.noTaskBackoffRetrier.NextBackOff() - time.Sleep(backoffDuration) - continue - } - - for _, replicationTask := range response.ReplicationTasks { - p.processTask(replicationTask) - } - - p.lastProcessedMessageID = response.GetLastRetrivedMessageId() - p.lastRetrievedMessageID = response.GetLastRetrivedMessageId() - err := p.shard.UpdateClusterReplicationLevel(p.sourceCluster, p.lastRetrievedMessageID) - if err != nil { - p.logger.Error("Error updating replication level for shard", tag.Error(err), tag.OperationFailed) - } - - scope.UpdateGauge(metrics.LastRetrievedMessageID, float64(p.lastRetrievedMessageID)) - p.noTaskBackoffRetrier.Reset() + p.processResponse(response) case <-p.done: + p.logger.Info("ReplicationTaskProcessor shutting down.") return } } } -func (p *ReplicationTaskProcessor) processTask(replicationTask *r.ReplicationTask) { - var err error +func (p *ReplicationTaskProcessor) sendFetchMessageRequest() <-chan *r.ReplicationMessages { + respChan := make(chan *r.ReplicationMessages, 1) + // TODO: when we support prefetching, LastRetrivedMessageId can be different than LastProcessedMessageId + p.requestChan <- &request{ + token: &r.ReplicationToken{ + ShardID: common.Int32Ptr(int32(p.shard.GetShardID())), + LastRetrivedMessageId: common.Int64Ptr(p.lastRetrievedMessageID), + LastProcessedMessageId: common.Int64Ptr(p.lastProcessedMessageID), + }, + respChan: respChan, + } + return respChan +} - for execute := true; execute; execute = err != nil { - err = backoff.Retry(func() error { - return p.processTaskOnce(replicationTask) - }, p.retryPolicy, isTransientRetryableError) +func (p *ReplicationTaskProcessor) processResponse(response *r.ReplicationMessages) { + // Note here we check replication tasks instead of hasMore. The expectation is that in a steady state + // we will receive replication tasks but hasMore is false (meaning that we are always catching up). + // So hasMore might not be a good indicator for additional wait. + if len(response.ReplicationTasks) == 0 { + backoffDuration := p.noTaskRetrier.NextBackOff() + time.Sleep(backoffDuration) + return + } + for _, replicationTask := range response.ReplicationTasks { + err := p.processSingleTask(replicationTask) if err != nil { - // TODO: insert into our own dlq in cadence persistence? - // p.nackMsg(msg, err, logger) - p.logger.Error( - "Failed to apply replication task after retry.", - tag.TaskID(replicationTask.GetSourceTaskId()), - tag.Error(err), - ) + // Processor is shutdown. Exit without updating the checkpoint. 
+ return + } + } + + p.lastProcessedMessageID = response.GetLastRetrivedMessageId() + p.lastRetrievedMessageID = response.GetLastRetrivedMessageId() + err := p.shard.UpdateClusterReplicationLevel(p.sourceCluster, p.lastRetrievedMessageID) + if err != nil { + p.logger.Error("Error updating replication level for shard", tag.Error(err), tag.OperationFailed) + } + + scope := p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope, metrics.TargetClusterTag(p.sourceCluster)) + scope.UpdateGauge(metrics.LastRetrievedMessageID, float64(p.lastRetrievedMessageID)) + p.noTaskRetrier.Reset() +} + +func (p *ReplicationTaskProcessor) processSingleTask(replicationTask *r.ReplicationTask) error { + err := backoff.Retry(func() error { + return p.processTaskOnce(replicationTask) + }, p.taskRetryPolicy, isTransientRetryableError) + + if err != nil { + p.logger.Error( + "Failed to apply replication task after retry. Putting task into DLQ.", + tag.TaskID(replicationTask.GetSourceTaskId()), + tag.Error(err), + ) + + return p.putReplicationTaskToDLQ(replicationTask) + } + + return nil } func (p *ReplicationTaskProcessor) processTaskOnce(replicationTask *r.ReplicationTask) error { @@ -234,8 +245,8 @@ func (p *ReplicationTaskProcessor) processTaskOnce(replicationTask *r.Replicatio var scope int switch replicationTask.GetTaskType() { case r.ReplicationTaskTypeDomain: - scope = metrics.DomainReplicationTaskScope - err = p.handleDomainReplicationTask(replicationTask) + // Domain replication task should be handled in worker (domainReplicationMessageProcessor) + panic("task type not supported") case r.ReplicationTaskTypeSyncShardStatus: scope = metrics.SyncShardTaskScope err = p.handleSyncShardTask(replicationTask) @@ -269,6 +280,91 @@ func (p *ReplicationTaskProcessor) processTaskOnce(replicationTask *r.Replicatio return err } +func (p *ReplicationTaskProcessor) putReplicationTaskToDLQ(replicationTask *r.ReplicationTask) error { + request, err := p.generateDLQRequest(replicationTask) + if err != nil { + p.logger.Error("Failed to generate DLQ replication task.", tag.Error(err)) + // We cannot deserialize the task. Dropping it. + return nil + } + + // The following is guaranteed to succeed or retry forever until the processor is shut down.
+ return backoff.Retry(func() error { + err := p.shard.GetExecutionManager().PutReplicationTaskToDLQ(request) + if err != nil { + p.logger.Error("Failed to put replication task to DLQ.", tag.Error(err)) + } + return err + }, p.dlqRetryPolicy, p.shouldRetryDLQ) +} + +func (p *ReplicationTaskProcessor) generateDLQRequest( + replicationTask *r.ReplicationTask, +) (*persistence.PutReplicationTaskToDLQRequest, error) { + switch *replicationTask.TaskType { + case r.ReplicationTaskTypeSyncActivity: + taskAttributes := replicationTask.GetSyncActicvityTaskAttributes() + return &persistence.PutReplicationTaskToDLQRequest{ + SourceClusterName: p.sourceCluster, + TaskInfo: &persistence.ReplicationTaskInfo{ + DomainID: taskAttributes.GetDomainId(), + WorkflowID: taskAttributes.GetWorkflowId(), + RunID: taskAttributes.GetRunId(), + TaskID: replicationTask.GetSourceTaskId(), + TaskType: persistence.ReplicationTaskTypeSyncActivity, + ScheduledID: taskAttributes.GetScheduledId(), + }, + }, nil + + case r.ReplicationTaskTypeHistory: + taskAttributes := replicationTask.GetHistoryTaskAttributes() + return &persistence.PutReplicationTaskToDLQRequest{ + SourceClusterName: p.sourceCluster, + TaskInfo: &persistence.ReplicationTaskInfo{ + DomainID: taskAttributes.GetDomainId(), + WorkflowID: taskAttributes.GetWorkflowId(), + RunID: taskAttributes.GetRunId(), + TaskID: replicationTask.GetSourceTaskId(), + TaskType: persistence.ReplicationTaskTypeHistory, + FirstEventID: taskAttributes.GetFirstEventId(), + NextEventID: taskAttributes.GetNextEventId(), + Version: taskAttributes.GetVersion(), + LastReplicationInfo: toPersistenceReplicationInfo(taskAttributes.GetReplicationInfo()), + ResetWorkflow: taskAttributes.GetResetWorkflow(), + }, + }, nil + case r.ReplicationTaskTypeHistoryV2: + taskAttributes := replicationTask.GetHistoryTaskV2Attributes() + + eventsDataBlob := persistence.NewDataBlobFromThrift(taskAttributes.GetEvents()) + events, err := p.historySerializer.DeserializeBatchEvents(eventsDataBlob) + if err != nil { + return nil, err + } + + if len(events) == 0 { + p.logger.Error("Empty events in a batch") + return nil, fmt.Errorf("corrupted history event batch, empty events") + } + + return &persistence.PutReplicationTaskToDLQRequest{ + SourceClusterName: p.sourceCluster, + TaskInfo: &persistence.ReplicationTaskInfo{ + DomainID: taskAttributes.GetDomainId(), + WorkflowID: taskAttributes.GetWorkflowId(), + RunID: taskAttributes.GetRunId(), + TaskID: replicationTask.GetSourceTaskId(), + TaskType: persistence.ReplicationTaskTypeHistory, + FirstEventID: events[0].GetEventId(), + NextEventID: events[len(events)-1].GetEventId(), + Version: events[0].GetVersion(), + }, + }, nil + default: + return nil, fmt.Errorf("unknown replication task type") + } +} + func isTransientRetryableError(err error) bool { switch err.(type) { case *shared.BadRequestError: @@ -278,6 +374,34 @@ func isTransientRetryableError(err error) bool { } } +func (p *ReplicationTaskProcessor) shouldRetryDLQ(err error) bool { + if err == nil { + return false + } + + select { + case <-p.done: + p.logger.Info("ReplicationTaskProcessor shutting down.") + return false + default: + return true + } +} + +func toPersistenceReplicationInfo( + info map[string]*shared.ReplicationInfo, +) map[string]*persistence.ReplicationInfo { + replicationInfoMap := make(map[string]*persistence.ReplicationInfo) + for k, v := range info { + replicationInfoMap[k] = &persistence.ReplicationInfo{ + Version: v.GetVersion(), + LastEventID: v.GetLastEventId(), + } + } + + return 
replicationInfoMap +} + func (p *ReplicationTaskProcessor) updateFailureMetric(scope int, err error) { // Always update failure counter for all replicator errors p.metricsClient.IncCounter(scope, metrics.ReplicatorFailures) @@ -412,17 +536,6 @@ func (p *ReplicationTaskProcessor) handleSyncShardTask( return p.historyEngine.SyncShardStatus(ctx, req) } -func (p *ReplicationTaskProcessor) handleDomainReplicationTask( - task *r.ReplicationTask, -) error { - - p.metricsClient.IncCounter(metrics.DomainReplicationTaskScope, metrics.ReplicatorMessages) - sw := p.metricsClient.StartTimer(metrics.DomainReplicationTaskScope, metrics.ReplicatorLatency) - defer sw.Stop() - - return p.domainReplicator.HandleReceivingTask(task.DomainTaskAttributes) -} - func (p *ReplicationTaskProcessor) filterTask( domainID string, ) (bool, error) { diff --git a/service/history/service.go b/service/history/service.go index 4f9c3d81653..76e040dca74 100644 --- a/service/history/service.go +++ b/service/history/service.go @@ -174,6 +174,15 @@ type Config struct { DecisionHeartbeatTimeout dynamicconfig.DurationPropertyFnWithDomainFilter // MaxDecisionStartToCloseSeconds is the StartToCloseSeconds for decision MaxDecisionStartToCloseSeconds dynamicconfig.IntPropertyFnWithDomainFilter + + // The following is used by the new RPC replication stack + ReplicationTaskFetcherParallelism dynamicconfig.IntPropertyFn + ReplicationTaskFetcherAggregationInterval dynamicconfig.DurationPropertyFn + ReplicationTaskFetcherTimerJitterCoefficient dynamicconfig.FloatPropertyFn + ReplicationTaskFetcherErrorRetryWait dynamicconfig.DurationPropertyFn + ReplicationTaskProcessorErrorRetryWait dynamicconfig.DurationPropertyFn + ReplicationTaskProcessorErrorRetryMaxAttempts dynamicconfig.IntPropertyFn + ReplicationTaskProcessorNoTaskRetryWait dynamicconfig.DurationPropertyFn } const ( @@ -276,6 +285,14 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, storeType strin SearchAttributesTotalSizeLimit: dc.GetIntPropertyFilteredByDomain(dynamicconfig.SearchAttributesTotalSizeLimit, 40*1024), StickyTTL: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.StickyTTL, time.Hour*24*365), DecisionHeartbeatTimeout: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.DecisionHeartbeatTimeout, time.Minute*30), + + ReplicationTaskFetcherParallelism: dc.GetIntProperty(dynamicconfig.ReplicationTaskFetcherParallelism, 1), + ReplicationTaskFetcherAggregationInterval: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherAggregationInterval, 2*time.Second), + ReplicationTaskFetcherTimerJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicationTaskFetcherTimerJitterCoefficient, 0.15), + ReplicationTaskFetcherErrorRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskFetcherErrorRetryWait, time.Second), + ReplicationTaskProcessorErrorRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskProcessorErrorRetryWait, time.Second), + ReplicationTaskProcessorErrorRetryMaxAttempts: dc.GetIntProperty(dynamicconfig.ReplicationTaskProcessorErrorRetryMaxAttempts, 20), + ReplicationTaskProcessorNoTaskRetryWait: dc.GetDurationProperty(dynamicconfig.ReplicationTaskProcessorNoTaskInitialWait, 2*time.Second), } return cfg