Skip to content

Commit

Permalink
Delete uninitialized workflow execution record if workflow failed to …
Browse files Browse the repository at this point in the history
…start (cadence-workflow#5059)

* Add a method that verifies a workflow execution record is uninitialized and then deletes it from ES
* Delete uninitialized workflow execution record if workflow failed to add start event
  • Loading branch information
neil-xie authored Jan 6, 2023
1 parent 747b58e commit 933fb08
Show file tree
Hide file tree
Showing 15 changed files with 253 additions and 89 deletions.
182 changes: 94 additions & 88 deletions common/metrics/defs.go

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions common/mocks/VisibilityManager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/persistence/dataStoreInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type (
ListWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
ScanWorkflowExecutions(ctx context.Context, request *ListWorkflowExecutionsByQueryRequest) (*InternalListWorkflowExecutionsResponse, error)
CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
}

ConfigStore interface {
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataVisibilityManagerInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ type (
CountWorkflowExecutions(ctx context.Context, request *CountWorkflowExecutionsRequest) (*CountWorkflowExecutionsResponse, error)
// NOTE: GetClosedWorkflowExecution is only for persistence testing, currently no index is supported for filtering by RunID
GetClosedWorkflowExecution(ctx context.Context, request *GetClosedWorkflowExecutionRequest) (*GetClosedWorkflowExecutionResponse, error)
DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error
}
)

Expand Down
18 changes: 18 additions & 0 deletions common/persistence/elasticsearch/esVisibilityMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,24 @@ func (p *visibilityMetricsClient) DeleteWorkflowExecution(
return err
}

// DeleteUninitializedWorkflowExecution forwards the request to the wrapped
// persistence's DeleteUninitializedWorkflowExecution and records a per-domain
// request counter, a latency timer and, on failure, an error metric.
//
// The call must go to DeleteUninitializedWorkflowExecution rather than
// DeleteWorkflowExecution: forwarding to the plain delete would skip whatever
// "is this record really uninitialized" check the wrapped implementation
// performs and remove the record unconditionally.
//
// Metrics are reported under ElasticsearchDeleteWorkflowExecutionsScope, the
// same scope used for the regular delete.
// NOTE(review): consider a dedicated scope for this operation if one is
// defined in common/metrics — not visible from here.
//
// It returns the error from the wrapped call unchanged, or nil on success.
func (p *visibilityMetricsClient) DeleteUninitializedWorkflowExecution(
	ctx context.Context,
	request *p.VisibilityDeleteWorkflowExecutionRequest,
) error {
	// Inside the body, p is the receiver; in the signature above, p still
	// resolves to the imported package.
	scopeWithDomainTag := p.metricClient.Scope(metrics.ElasticsearchDeleteWorkflowExecutionsScope, metrics.DomainTag(request.Domain))
	scopeWithDomainTag.IncCounter(metrics.ElasticsearchRequestsPerDomain)

	sw := scopeWithDomainTag.StartTimer(metrics.ElasticsearchLatencyPerDomain)
	err := p.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
	sw.Stop()

	if err != nil {
		p.updateErrorMetric(scopeWithDomainTag, metrics.ElasticsearchDeleteWorkflowExecutionsScope, err)
	}

	return err
}

func (p *visibilityMetricsClient) updateErrorMetric(scopeWithDomainTag metrics.Scope, scope int, err error) {

switch err.(type) {
Expand Down
23 changes: 23 additions & 0 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,29 @@ func (v *esVisibilityStore) DeleteWorkflowExecution(
return v.producer.Publish(ctx, msg)
}

// DeleteUninitializedWorkflowExecution deletes the visibility record for the
// given run only if it is still uninitialized, i.e. a count query for records
// with a missing StartTime matching the request's DomainID and RunID returns
// at least one hit. When the count is zero the call is a no-op and returns nil.
//
// Errors from the count query or from DeleteWorkflowExecution are returned
// unchanged.
func (v *esVisibilityStore) DeleteUninitializedWorkflowExecution(
	ctx context.Context,
	request *p.VisibilityDeleteWorkflowExecutionRequest,
) error {
	// NOTE(review): DomainID and RunID are interpolated without quoting —
	// confirm the query parser accepts bare ID values here.
	countResp, err := v.CountWorkflowExecutions(ctx, &p.CountWorkflowExecutionsRequest{
		Domain: request.Domain,
		Query:  fmt.Sprintf("StartTime = missing and DomainID = %s and RunID = %s", request.DomainID, request.RunID),
	})
	if err != nil {
		return err
	}

	// No matching uninitialized record: nothing to delete.
	if countResp.Count <= 0 {
		return nil
	}

	// The record is confirmed uninitialized; reuse the regular delete path.
	return v.DeleteWorkflowExecution(ctx, request)
}

func (v *esVisibilityStore) ListWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsByQueryRequest,
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/nosql/nosqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,14 @@ func (v *nosqlVisibilityStore) DeleteWorkflowExecution(
return nil
}

// DeleteUninitializedWorkflowExecution is a no-op for the NoSQL visibility
// store: it ignores its arguments and always returns nil.
func (v *nosqlVisibilityStore) DeleteUninitializedWorkflowExecution(
	_ context.Context,
	_ *p.VisibilityDeleteWorkflowExecutionRequest,
) error {
	// Temporarily unimplemented; this operation is only implemented for ES.
	return nil
}

func (v *nosqlVisibilityStore) ListWorkflowExecutions(
_ context.Context,
_ *p.ListWorkflowExecutionsByQueryRequest,
Expand Down
24 changes: 24 additions & 0 deletions common/persistence/persistenceErrorInjectionClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,30 @@ func (p *visibilityErrorInjectionPersistenceClient) DeleteWorkflowExecution(
return persistenceErr
}

// DeleteUninitializedWorkflowExecution wraps the underlying persistence call
// with fault injection. A fake error is generated from p.errorRate; whether
// the real call is still forwarded is decided by
// shouldForwardCallToPersistence. If a fake error was generated it is logged,
// together with the forwarding decision and any real persistence error, and
// returned in place of the real result. Otherwise the persistence result is
// returned as is.
func (p *visibilityErrorInjectionPersistenceClient) DeleteUninitializedWorkflowExecution(
	ctx context.Context,
	request *VisibilityDeleteWorkflowExecutionRequest,
) error {
	injectedErr := generateFakeError(p.errorRate)
	forwarded := shouldForwardCallToPersistence(injectedErr)

	var storeErr error
	if forwarded {
		storeErr = p.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
	}

	// Nothing injected: surface whatever the real call produced.
	if injectedErr == nil {
		return storeErr
	}

	p.logger.Error(msgInjectedFakeErr,
		tag.StoreOperationVisibilityDeleteWorkflowExecution,
		tag.Error(injectedErr),
		tag.Bool(forwarded),
		tag.StoreError(storeErr),
	)
	return injectedErr
}

func (p *visibilityErrorInjectionPersistenceClient) ListWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -1254,6 +1254,16 @@ func (p *visibilityPersistenceClient) DeleteWorkflowExecution(
return p.call(metrics.PersistenceVisibilityDeleteWorkflowExecutionScope, op)
}

// DeleteUninitializedWorkflowExecution runs the underlying persistence call
// through p.call under the
// PersistenceVisibilityDeleteUninitializedWorkflowExecutionScope metrics scope
// and returns whatever p.call returns.
func (p *visibilityPersistenceClient) DeleteUninitializedWorkflowExecution(
	ctx context.Context,
	request *VisibilityDeleteWorkflowExecutionRequest,
) error {
	return p.call(
		metrics.PersistenceVisibilityDeleteUninitializedWorkflowExecutionScope,
		func() error {
			return p.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
		},
	)
}

func (p *visibilityPersistenceClient) ListWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
Expand Down
10 changes: 10 additions & 0 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,16 @@ func (p *visibilityRateLimitedPersistenceClient) DeleteWorkflowExecution(
return p.persistence.DeleteWorkflowExecution(ctx, request)
}

// DeleteUninitializedWorkflowExecution forwards the request to the underlying
// persistence when the rate limiter allows it, and returns
// ErrPersistenceLimitExceeded without calling persistence when it does not.
func (p *visibilityRateLimitedPersistenceClient) DeleteUninitializedWorkflowExecution(
	ctx context.Context,
	request *VisibilityDeleteWorkflowExecutionRequest,
) error {
	if !p.rateLimiter.Allow() {
		return ErrPersistenceLimitExceeded
	}

	return p.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
}

func (p *visibilityRateLimitedPersistenceClient) ListWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
Expand Down
8 changes: 8 additions & 0 deletions common/persistence/sql/sqlVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,14 @@ func (s *sqlVisibilityStore) DeleteWorkflowExecution(
return nil
}

// DeleteUninitializedWorkflowExecution is a no-op for the SQL visibility
// store: it ignores its arguments and always returns nil.
func (s *sqlVisibilityStore) DeleteUninitializedWorkflowExecution(
	_ context.Context,
	_ *p.VisibilityDeleteWorkflowExecutionRequest,
) error {
	// Temporarily unimplemented; this operation is only implemented for ES.
	return nil
}

func (s *sqlVisibilityStore) ListWorkflowExecutions(
_ context.Context,
_ *p.ListWorkflowExecutionsByQueryRequest,
Expand Down
15 changes: 15 additions & 0 deletions common/persistence/visibilityDualManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,21 @@ func (v *visibilityDualManager) DeleteWorkflowExecution(
)
}

// DeleteUninitializedWorkflowExecution hands one delete callback for the DB
// visibility manager and one for the ES visibility manager to
// chooseVisibilityManagerForWrite, which decides which of them to invoke, and
// returns its result.
func (v *visibilityDualManager) DeleteUninitializedWorkflowExecution(
	ctx context.Context,
	request *VisibilityDeleteWorkflowExecutionRequest,
) error {
	deleteFromDB := func() error {
		return v.dbVisibilityManager.DeleteUninitializedWorkflowExecution(ctx, request)
	}
	deleteFromES := func() error {
		return v.esVisibilityManager.DeleteUninitializedWorkflowExecution(ctx, request)
	}

	return v.chooseVisibilityManagerForWrite(ctx, deleteFromDB, deleteFromES)
}

func (v *visibilityDualManager) UpsertWorkflowExecution(
ctx context.Context,
request *UpsertWorkflowExecutionRequest,
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/visibilitySamplingClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,13 @@ func (p *visibilitySamplingClient) DeleteWorkflowExecution(
return p.persistence.DeleteWorkflowExecution(ctx, request)
}

// DeleteUninitializedWorkflowExecution passes the request straight through to
// the underlying persistence; no sampling is applied in this method.
func (p *visibilitySamplingClient) DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error {
	return p.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
}

func (p *visibilitySamplingClient) ListWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/visibilitySingleManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,13 @@ func (v *visibilityManagerImpl) DeleteWorkflowExecution(
return v.persistence.DeleteWorkflowExecution(ctx, request)
}

// DeleteUninitializedWorkflowExecution delegates to the underlying
// persistence's method of the same name, passing the request through unchanged
// and returning its error as is.
func (v *visibilityManagerImpl) DeleteUninitializedWorkflowExecution(ctx context.Context, request *VisibilityDeleteWorkflowExecutionRequest) error {
	return v.persistence.DeleteUninitializedWorkflowExecution(ctx, request)
}

func (v *visibilityManagerImpl) ListWorkflowExecutions(
ctx context.Context,
request *ListWorkflowExecutionsByQueryRequest,
Expand Down
14 changes: 13 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -706,9 +706,21 @@ func (e *historyEngineImpl) startWorkflowHelper(
signalWithStartRequest,
)
if err != nil {
if e.shard.GetConfig().EnableRecordWorkflowExecutionUninitialized(domainEntry.GetInfo().Name) && e.visibilityMgr != nil {
//delete the uninitialized workflow execution record since it failed to start the workflow
//uninitialized record is used to find wfs that didn't make a progress or stuck during the start process
if errVisibility := e.visibilityMgr.DeleteWorkflowExecution(ctx, &persistence.VisibilityDeleteWorkflowExecutionRequest{
DomainID: domainID,
Domain: domain,
RunID: workflowExecution.RunID,
WorkflowID: workflowID,
}); errVisibility != nil {
e.logger.Error("Failed to delete uninitialized workflow execution record", tag.Error(errVisibility))
}
}

return nil, err
}

wfContext := execution.NewContext(domainID, workflowExecution, e.shard, e.executionManager, e.logger)

newWorkflow, newWorkflowEventsSeq, err := curMutableState.CloseTransactionAsSnapshot(
Expand Down

0 comments on commit 933fb08

Please sign in to comment.