Skip to content

Commit

Permalink
History DLQ implementation (cadence-workflow#2699)
Browse files Browse the repository at this point in the history
* Implement DLQ for history replication
* Add integration test for replicationk fetcher, processor and DLQ
* Move configs to dynamic config. Continue to work on integration tests.
  • Loading branch information
meiliang86 authored Oct 29, 2019
1 parent 5945043 commit 0b398c7
Show file tree
Hide file tree
Showing 27 changed files with 1,106 additions and 305 deletions.
6 changes: 0 additions & 6 deletions common/cluster/metadataTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,6 @@ func GetTestClusterMetadata(enableGlobalDomain bool, isMasterCluster bool) Metad
TestAllClusterInfo,
&config.ReplicationConsumerConfig{
Type: config.ReplicationConsumerTypeRPC,
FetcherConfig: &config.FetcherConfig{
RPCParallelism: 1,
AggregationIntervalSecs: 2,
ErrorRetryWaitSecs: 1,
TimerJitterCoefficient: 0.15,
},
},
)
}
Expand Down
12 changes: 9 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,14 +153,18 @@ const (
PersistenceGetCurrentExecutionScope
// PersistenceGetTransferTasksScope tracks GetTransferTasks calls made by service to persistence layer
PersistenceGetTransferTasksScope
// PersistenceGetReplicationTasksScope tracks GetReplicationTasks calls made by service to persistence layer
PersistenceGetReplicationTasksScope
// PersistenceCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer
PersistenceCompleteTransferTaskScope
// PersistenceRangeCompleteTransferTaskScope tracks CompleteTransferTasks calls made by service to persistence layer
PersistenceRangeCompleteTransferTaskScope
// PersistenceGetReplicationTasksScope tracks GetReplicationTasks calls made by service to persistence layer
PersistenceGetReplicationTasksScope
// PersistenceCompleteReplicationTaskScope tracks CompleteReplicationTasks calls made by service to persistence layer
PersistenceCompleteReplicationTaskScope
// PersistencePutReplicationTaskToDLQScope tracks PersistencePutReplicationTaskToDLQScope calls made by service to persistence layer
PersistencePutReplicationTaskToDLQScope
// PersistenceGetReplicationTasksFromDLQScope tracks PersistenceGetReplicationTasksFromDLQScope calls made by service to persistence layer
PersistenceGetReplicationTasksFromDLQScope
// PersistenceGetTimerIndexTasksScope tracks GetTimerIndexTasks calls made by service to persistence layer
PersistenceGetTimerIndexTasksScope
// PersistenceCompleteTimerTaskScope tracks CompleteTimerTasks calls made by service to persistence layer
Expand Down Expand Up @@ -949,10 +953,12 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceDeleteTaskScope: {operation: "PersistenceDelete"},
PersistenceGetCurrentExecutionScope: {operation: "GetCurrentExecution"},
PersistenceGetTransferTasksScope: {operation: "GetTransferTasks"},
PersistenceGetReplicationTasksScope: {operation: "GetReplicationTasks"},
PersistenceCompleteTransferTaskScope: {operation: "CompleteTransferTask"},
PersistenceRangeCompleteTransferTaskScope: {operation: "RangeCompleteTransferTask"},
PersistenceGetReplicationTasksScope: {operation: "GetReplicationTasks"},
PersistenceCompleteReplicationTaskScope: {operation: "CompleteReplicationTask"},
PersistencePutReplicationTaskToDLQScope: {operation: "PersistencePutReplicationTaskToDLQ"},
PersistenceGetReplicationTasksFromDLQScope: {operation: "PersistenceGetReplicationTasksFromDLQ"},
PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"},
PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"},
PersistenceRangeCompleteTimerTaskScope: {operation: "RangeCompleteTimerTask"},
Expand Down
37 changes: 37 additions & 0 deletions common/mocks/ExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,43 @@ func (_m *ExecutionManager) CompleteReplicationTask(request *persistence.Complet
return r0
}

// PutReplicationTaskToDLQ provides a mock function with given fields: request
func (_m *ExecutionManager) PutReplicationTaskToDLQ(request *persistence.PutReplicationTaskToDLQRequest) error {
ret := _m.Called(request)

var r0 error
if rf, ok := ret.Get(0).(func(*persistence.PutReplicationTaskToDLQRequest) error); ok {
r0 = rf(request)
} else {
r0 = ret.Error(0)
}

return r0
}

// GetReplicationTasksFromDLQ provides a mock function with given fields: request
func (_m *ExecutionManager) GetReplicationTasksFromDLQ(request *persistence.GetReplicationTasksFromDLQRequest) (*persistence.GetReplicationTasksFromDLQResponse, error) {
ret := _m.Called(request)

var r0 *persistence.GetReplicationTasksFromDLQResponse
if rf, ok := ret.Get(0).(func(*persistence.GetReplicationTasksFromDLQRequest) *persistence.GetReplicationTasksFromDLQResponse); ok {
r0 = rf(request)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*persistence.GetReplicationTasksFromDLQResponse)
}
}

var r1 error
if rf, ok := ret.Get(1).(func(*persistence.GetReplicationTasksFromDLQRequest) error); ok {
r1 = rf(request)
} else {
r1 = ret.Error(1)
}

return r0, r1
}

// GetTimerIndexTasks provides a mock function with given fields: request
func (_m *ExecutionManager) GetTimerIndexTasks(request *persistence.GetTimerIndexTasksRequest) (*persistence.GetTimerIndexTasksResponse, error) {
ret := _m.Called(request)
Expand Down
76 changes: 74 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const (
rowTypeReplicationDomainID = "10000000-5000-f000-f000-000000000000"
rowTypeReplicationWorkflowID = "20000000-5000-f000-f000-000000000000"
rowTypeReplicationRunID = "30000000-5000-f000-f000-000000000000"
// Row Constants for Replication Task DLQ Row. Source cluster name will be used as WorkflowID.
rowTypeDLQDomainID = "10000000-6000-f000-f000-000000000000"
rowTypeDLQRunID = "30000000-6000-f000-f000-000000000000"
// Special TaskId constants
rowTypeExecutionTaskID = int64(-10)
rowTypeShardTaskID = int64(-11)
Expand All @@ -82,6 +85,7 @@ const (
rowTypeTransferTask
rowTypeTimerTask
rowTypeReplicationTask
rowTypeDLQ
)

const (
Expand Down Expand Up @@ -878,8 +882,11 @@ func newShardPersistence(cfg config.Cassandra, clusterName string, logger log.Lo
}

// NewWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation
func NewWorkflowExecutionPersistence(shardID int, session *gocql.Session,
logger log.Logger) (p.ExecutionStore, error) {
func NewWorkflowExecutionPersistence(
shardID int,
session *gocql.Session,
logger log.Logger,
) (p.ExecutionStore, error) {
return &cassandraPersistence{cassandraStore: cassandraStore{session: session, logger: logger}, shardID: shardID}, nil
}

Expand Down Expand Up @@ -2089,6 +2096,12 @@ func (d *cassandraPersistence) GetReplicationTasks(
request.MaxReadLevel,
).PageSize(request.BatchSize).PageState(request.NextPageToken)

return d.populateGetReplicationTasksResponse(query)
}

func (d *cassandraPersistence) populateGetReplicationTasksResponse(
query *gocql.Query,
) (*p.GetReplicationTasksResponse, error) {
iter := query.Iter()
if iter == nil {
return nil, &workflow.InternalServiceError{
Expand Down Expand Up @@ -2691,3 +2704,62 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *p.GetTimerIndexTasksR

return response, nil
}

func (d *cassandraPersistence) PutReplicationTaskToDLQ(request *p.PutReplicationTaskToDLQRequest) error {
task := request.TaskInfo
query := d.session.Query(templateCreateReplicationTaskQuery,
d.shardID,
rowTypeDLQ,
rowTypeDLQDomainID,
request.SourceClusterName,
rowTypeDLQRunID,
task.DomainID,
task.WorkflowID,
task.RunID,
task.TaskID,
task.TaskType,
task.FirstEventID,
task.NextEventID,
task.Version,
task.LastReplicationInfo,
task.ScheduledID,
defaultEventStoreVersionValue,
task.BranchToken,
task.ResetWorkflow,
defaultEventStoreVersionValue,
task.NewRunBranchToken,
defaultVisibilityTimestamp,
task.GetTaskID())

err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("PutReplicationTaskToDLQ operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("PutReplicationTaskToDLQ operation failed. Error: %v", err),
}
}

return nil
}

func (d *cassandraPersistence) GetReplicationTasksFromDLQ(
request *p.GetReplicationTasksFromDLQRequest,
) (*p.GetReplicationTasksFromDLQResponse, error) {
// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
query := d.session.Query(templateGetReplicationTasksQuery,
d.shardID,
rowTypeDLQ,
rowTypeDLQDomainID,
request.SourceClusterName,
rowTypeDLQRunID,
defaultVisibilityTimestamp,
request.ReadLevel,
request.ReadLevel+int64(request.BatchSize),
).PageSize(request.BatchSize).PageState(request.NextPageToken)

return d.populateGetReplicationTasksResponse(query)
}
36 changes: 36 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,21 @@ type (
TaskID int64
}

// PutReplicationTaskToDLQRequest is used to put a replication task to dlq
PutReplicationTaskToDLQRequest struct {
SourceClusterName string
TaskInfo *ReplicationTaskInfo
}

// GetReplicationTasksFromDLQRequest is used to get replication tasks from dlq
GetReplicationTasksFromDLQRequest struct {
SourceClusterName string
GetReplicationTasksRequest
}

// GetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ
GetReplicationTasksFromDLQResponse = GetReplicationTasksResponse

// RangeCompleteTimerTaskRequest is used to complete a range of tasks in the timer task queue
RangeCompleteTimerTaskRequest struct {
InclusiveBeginTimestamp time.Time
Expand Down Expand Up @@ -1434,6 +1449,8 @@ type (
// Replication task related methods
GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
CompleteReplicationTask(request *CompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)

// Timer related methods.
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
Expand Down Expand Up @@ -2425,3 +2442,22 @@ func SplitHistoryGarbageCleanupInfo(info string) (domainID, workflowID, runID st
workflowID = info[len(domainID)+1 : workflowEnd]
return
}

// NewGetReplicationTasksFromDLQRequest creates a new GetReplicationTasksFromDLQRequest
func NewGetReplicationTasksFromDLQRequest(
sourceClusterName string,
readLevel int64,
maxReadLevel int64,
batchSize int,
nextPageToken []byte,
) *GetReplicationTasksFromDLQRequest {
return &GetReplicationTasksFromDLQRequest{
SourceClusterName: sourceClusterName,
GetReplicationTasksRequest: GetReplicationTasksRequest{
ReadLevel: readLevel,
MaxReadLevel: maxReadLevel,
BatchSize: batchSize,
NextPageToken: nextPageToken,
},
}
}
12 changes: 12 additions & 0 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,18 @@ func (m *executionManagerImpl) CompleteReplicationTask(
return m.persistence.CompleteReplicationTask(request)
}

func (m *executionManagerImpl) PutReplicationTaskToDLQ(
request *PutReplicationTaskToDLQRequest,
) error {
return m.persistence.PutReplicationTaskToDLQ(request)
}

func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
request *GetReplicationTasksFromDLQRequest,
) (*GetReplicationTasksFromDLQResponse, error) {
return m.persistence.GetReplicationTasksFromDLQ(request)
}

// Timer related methods.
func (m *executionManagerImpl) GetTimerIndexTasks(
request *GetTimerIndexTasksRequest,
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type (
// Replication task related methods
GetReplicationTasks(request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
CompleteReplicationTask(request *CompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)

// Timer related methods.
GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error)
Expand Down
36 changes: 34 additions & 2 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,18 +400,50 @@ func (p *workflowExecutionPersistenceClient) CompleteReplicationTask(request *Co
return err
}

func (p *workflowExecutionPersistenceClient) PutReplicationTaskToDLQ(
request *PutReplicationTaskToDLQRequest,
) error {
p.metricClient.IncCounter(metrics.PersistencePutReplicationTaskToDLQScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistencePutReplicationTaskToDLQScope, metrics.PersistenceLatency)
err := p.persistence.PutReplicationTaskToDLQ(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistencePutReplicationTaskToDLQScope, err)
}

return err
}

func (p *workflowExecutionPersistenceClient) GetReplicationTasksFromDLQ(
request *GetReplicationTasksFromDLQRequest,
) (*GetReplicationTasksFromDLQResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetReplicationTasksFromDLQScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceGetReplicationTasksFromDLQScope, metrics.PersistenceLatency)
response, err := p.persistence.GetReplicationTasksFromDLQ(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceGetReplicationTasksFromDLQScope, err)
}

return response, err
}

func (p *workflowExecutionPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
p.metricClient.IncCounter(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceRequests)

sw := p.metricClient.StartTimer(metrics.PersistenceGetTimerIndexTasksScope, metrics.PersistenceLatency)
resonse, err := p.persistence.GetTimerIndexTasks(request)
response, err := p.persistence.GetTimerIndexTasks(request)
sw.Stop()

if err != nil {
p.updateErrorMetric(metrics.PersistenceGetTimerIndexTasksScope, err)
}

return resonse, err
return response, err
}

func (p *workflowExecutionPersistenceClient) CompleteTimerTask(request *CompleteTimerTaskRequest) error {
Expand Down
20 changes: 20 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,26 @@ func (p *workflowExecutionRateLimitedPersistenceClient) CompleteReplicationTask(
return err
}

func (p *workflowExecutionRateLimitedPersistenceClient) PutReplicationTaskToDLQ(
request *PutReplicationTaskToDLQRequest,
) error {
if ok := p.rateLimiter.Allow(); !ok {
return ErrPersistenceLimitExceeded
}

return p.persistence.PutReplicationTaskToDLQ(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetReplicationTasksFromDLQ(
request *GetReplicationTasksFromDLQRequest,
) (*GetReplicationTasksFromDLQResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
}

return p.persistence.GetReplicationTasksFromDLQ(request)
}

func (p *workflowExecutionRateLimitedPersistenceClient) GetTimerIndexTasks(request *GetTimerIndexTasksRequest) (*GetTimerIndexTasksResponse, error) {
if ok := p.rateLimiter.Allow(); !ok {
return nil, ErrPersistenceLimitExceeded
Expand Down
Loading

0 comments on commit 0b398c7

Please sign in to comment.