Skip to content

Commit

Permalink
Refactor archival workflow (cadence-workflow#2583)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Sep 24, 2019
1 parent c6f99b2 commit d91a7bf
Show file tree
Hide file tree
Showing 19 changed files with 488 additions and 297 deletions.
4 changes: 2 additions & 2 deletions common/archiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ ArchiverOptions is used to handle this. The following shows and example:
func (a *Archiver) Archive(
ctx context.Context,
URI string,
request *ArchiveRequest,
request *ArchiveHistoryRequest,
opts ...ArchiveOption,
) error {
featureCatalog := GetFeatureCatalog(opts...) // this function is defined in options.go
Expand Down Expand Up @@ -99,7 +99,7 @@ func (a *Archiver) Archive(
func (a *Archiver) Archive(
ctx context.Context,
URI string,
request *ArchiveRequest,
request *ArchiveHistoryRequest,
opts ...ArchiveOption,
) error {
featureCatalog := GetFeatureCatalog(opts...) // this function is defined in options.go
Expand Down
18 changes: 9 additions & 9 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -882,12 +882,12 @@ const (
ArchiverDeleteHistoryActivityScope
// ArchiverUploadHistoryActivityScope is scope used by all metrics emitted by archiver.UploadHistoryActivity
ArchiverUploadHistoryActivityScope
// ArchiverScope is scope used by all metrics emitted by archiver.Archiver
ArchiverScope
// ArchiverPumpScope is scope used by all metrics emitted by archiver.Pump
ArchiverPumpScope
// ArchiverArchivalWorkflowScope is scope used by all metrics emitted by archiver.ArchivalWorkflow
ArchiverArchivalWorkflowScope
// HistoryArchivalHandlerScope is scope used by all metrics emitted by archiver.Handler when archiving history
HistoryArchivalHandlerScope
// HistoryArchivalPumpScope is scope used by all metrics emitted by archiver.Pump when archiving history
HistoryArchivalPumpScope
// HistoryArchivalWorkflowScope is scope used by all metrics emitted by archiver.ArchivalHistoryWorkflow
HistoryArchivalWorkflowScope
// TaskListScavengerScope is scope used by all metrics emitted by worker.tasklist.Scavenger module
TaskListScavengerScope
// BatcherScope is scope used by all metrics emitted by worker.Batcher module
Expand Down Expand Up @@ -1282,9 +1282,9 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
IndexProcessorScope: {operation: "IndexProcessor"},
ArchiverDeleteHistoryActivityScope: {operation: "ArchiverDeleteHistoryActivity"},
ArchiverUploadHistoryActivityScope: {operation: "ArchiverUploadHistoryActivity"},
ArchiverScope: {operation: "Archiver"},
ArchiverPumpScope: {operation: "ArchiverPump"},
ArchiverArchivalWorkflowScope: {operation: "ArchiverArchivalWorkflow"},
HistoryArchivalHandlerScope: {operation: "HistoryArchivalHandler"},
HistoryArchivalPumpScope: {operation: "HistoryArchivalPump"},
HistoryArchivalWorkflowScope: {operation: "HistoryArchivalWorkflow"},
TaskListScavengerScope: {operation: "tasklistscavenger"},
HistoryScavengerScope: {operation: "historyscavenger"},
BatcherScope: {operation: "batcher"},
Expand Down
6 changes: 3 additions & 3 deletions service/history/timerQueueProcessorBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,8 +481,8 @@ func (t *timerQueueProcessorBase) archiveWorkflow(
attemptArchiveInline := executionStats.HistorySize < int64(t.config.TimerProcessorHistoryArchivalSizeLimit())
ctx, cancel := context.WithTimeout(context.Background(), t.config.TimerProcessorHistoryArchivalTimeLimit())
defer cancel()
req := &archiver.ClientRequest{
ArchiveRequest: &archiver.ArchiveRequest{
req := &archiver.ClientHistoryRequest{
ArchiveHistoryRequest: &archiver.ArchiveHistoryRequest{
ShardID: t.shard.GetShardID(),
DomainID: task.DomainID,
DomainName: domainCacheEntry.GetInfo().Name,
Expand All @@ -497,7 +497,7 @@ func (t *timerQueueProcessorBase) archiveWorkflow(
CallerService: common.HistoryServiceName,
AttemptArchiveInline: attemptArchiveInline,
}
resp, err := t.historyService.archivalClient.Archive(ctx, req)
resp, err := t.historyService.archivalClient.ArchiveHistory(ctx, req)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions service/history/timerQueueProcessorBase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func (s *timerQueueProcessorBaseSuite) TestArchiveWorkflow_NoErr_InlineArchivalF
s.mockExecutionManager.On("DeleteWorkflowExecution", mock.Anything).Return(nil).Once()
s.mockVisibilityManager.On("DeleteWorkflowExecution", mock.Anything).Return(nil).Once()

s.mockArchivalClient.On("Archive", mock.Anything, mock.MatchedBy(func(req *archiver.ClientRequest) bool {
s.mockArchivalClient.On("ArchiveHistory", mock.Anything, mock.MatchedBy(func(req *archiver.ClientHistoryRequest) bool {
return req.CallerService == common.HistoryServiceName && req.AttemptArchiveInline
})).Return(&archiver.ClientResponse{
ArchivedInline: false,
Expand All @@ -211,7 +211,7 @@ func (s *timerQueueProcessorBaseSuite) TestArchiveWorkflow_SendSignalErr() {
mockMutableState.On("GetLastWriteVersion").Return(int64(1234)).Once()
mockMutableState.On("GetNextEventID").Return(int64(101)).Once()

s.mockArchivalClient.On("Archive", mock.Anything, mock.MatchedBy(func(req *archiver.ClientRequest) bool {
s.mockArchivalClient.On("ArchiveHistory", mock.Anything, mock.MatchedBy(func(req *archiver.ClientHistoryRequest) bool {
return req.CallerService == common.HistoryServiceName && !req.AttemptArchiveInline
})).Return(nil, errors.New("failed to send signal"))

Expand Down
8 changes: 4 additions & 4 deletions service/worker/archiver/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var (
deleteHistoryActivityNonRetryableErrors = []string{"cadenceInternal:Panic", errDeleteNonRetriable.Error()}
)

func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err error) {
func uploadHistoryActivity(ctx context.Context, request ArchiveHistoryRequest) (err error) {
container := ctx.Value(bootstrapContainerKey).(*BootstrapContainer)
scope := container.MetricsClient.Scope(metrics.ArchiverUploadHistoryActivityScope, metrics.DomainTag(request.DomainName))
sw := scope.StartTimer(metrics.CadenceLatency)
Expand All @@ -60,7 +60,7 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
err = cadence.NewCustomError(err.Error())
}
}()
logger := tagLoggerWithRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), request)
logger := tagLoggerWithArchiveHistoryRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), request)
URI, err := carchiver.NewURI(request.URI)
if err != nil {
logger.Error(carchiver.ArchiveNonRetriableErrorMsg, tag.ArchivalArchiveFailReason("failed to get archival uri"), tag.ArchivalURI(request.URI), tag.Error(err))
Expand Down Expand Up @@ -93,7 +93,7 @@ func uploadHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
return err
}

func deleteHistoryActivity(ctx context.Context, request ArchiveRequest) (err error) {
func deleteHistoryActivity(ctx context.Context, request ArchiveHistoryRequest) (err error) {
container := ctx.Value(bootstrapContainerKey).(*BootstrapContainer)
scope := container.MetricsClient.Scope(metrics.ArchiverDeleteHistoryActivityScope, metrics.DomainTag(request.DomainName))
sw := scope.StartTimer(metrics.CadenceLatency)
Expand All @@ -106,7 +106,7 @@ func deleteHistoryActivity(ctx context.Context, request ArchiveRequest) (err err
err = cadence.NewCustomError(err.Error())
}
}()
logger := tagLoggerWithRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), request)
logger := tagLoggerWithArchiveHistoryRequest(tagLoggerWithActivityInfo(container.Logger, activity.GetInfo(ctx)), request)
if request.EventStoreVersion == persistence.EventStoreVersionV2 {
err = persistence.DeleteWorkflowExecutionHistoryV2(container.HistoryV2Manager, request.BranchToken, common.IntPtr(request.ShardID), container.Logger)
if err == nil {
Expand Down
16 changes: 8 additions & 8 deletions service/worker/archiver/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (s *activitiesSuite) TestUploadHistory_Fail_InvalidURI() {
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container),
})
request := ArchiveRequest{
request := ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
Expand Down Expand Up @@ -132,7 +132,7 @@ func (s *activitiesSuite) TestUploadHistory_Fail_GetArchiverError() {
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container),
})
request := ArchiveRequest{
request := ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
Expand Down Expand Up @@ -161,7 +161,7 @@ func (s *activitiesSuite) TestUploadHistory_Fail_ArchiveNonRetriableError() {
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container),
})
request := ArchiveRequest{
request := ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
Expand Down Expand Up @@ -190,7 +190,7 @@ func (s *activitiesSuite) TestUploadHistory_Fail_ArchiveRetriableError() {
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container),
})
request := ArchiveRequest{
request := ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
Expand Down Expand Up @@ -218,7 +218,7 @@ func (s *activitiesSuite) TestUploadHistory_Success() {
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container),
})
request := ArchiveRequest{
request := ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
Expand Down Expand Up @@ -247,7 +247,7 @@ func (s *activitiesSuite) TestDeleteHistoryActivity_Fail_DeleteFromV2NonRetryabl
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container),
})
request := ArchiveRequest{
request := ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
Expand Down Expand Up @@ -276,7 +276,7 @@ func (s *activitiesSuite) TestDeleteHistoryActivity_Fail_DeleteFromV1NonRetryabl
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(context.Background(), bootstrapContainerKey, container),
})
request := ArchiveRequest{
request := ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
Expand All @@ -303,7 +303,7 @@ func (s *activitiesSuite) TestDeleteHistoryActivity_Success() {
env.SetWorkerOptions(worker.Options{
BackgroundActivityContext: context.WithValue(getCanceledContext(), bootstrapContainerKey, container),
})
request := ArchiveRequest{
request := ArchiveHistoryRequest{
DomainID: testDomainID,
DomainName: testDomainName,
WorkflowID: testWorkflowID,
Expand Down
54 changes: 27 additions & 27 deletions service/worker/archiver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,20 @@ import (
)

type (
// ClientRequest is the archive request sent to the archiver client
ClientRequest struct {
ArchiveRequest *ArchiveRequest
CallerService string
AttemptArchiveInline bool
// ClientHistoryRequest is the archive request sent to the archiver client
ClientHistoryRequest struct {
ArchiveHistoryRequest *ArchiveHistoryRequest
CallerService string
AttemptArchiveInline bool
}

// ClientResponse is the archive response returned from the archiver client
ClientResponse struct {
ArchivedInline bool
}

// ArchiveRequest is the request signal sent to the archiver workflow
ArchiveRequest struct {
// ArchiveHistoryRequest is the archival history request signal sent to the archiver workflow
ArchiveHistoryRequest struct {
ShardID int
DomainID string
DomainName string
Expand All @@ -68,7 +68,7 @@ type (

// Client is used to archive workflow histories
Client interface {
Archive(context.Context, *ClientRequest) (*ClientResponse, error)
ArchiveHistory(context.Context, *ClientHistoryRequest) (*ClientResponse, error)
}

client struct {
Expand Down Expand Up @@ -110,10 +110,10 @@ func NewClient(
}
}

// Archive starts an archival task
func (c *client) Archive(ctx context.Context, request *ClientRequest) (resp *ClientResponse, err error) {
// ArchiveHistory starts an archival task
func (c *client) ArchiveHistory(ctx context.Context, request *ClientHistoryRequest) (resp *ClientResponse, err error) {
c.metricsClient.IncCounter(metrics.ArchiverClientScope, metrics.CadenceRequests)
taggedLogger := tagLoggerWithRequest(c.logger, *request.ArchiveRequest).WithTags(
taggedLogger := tagLoggerWithArchiveHistoryRequest(c.logger, *request.ArchiveHistoryRequest).WithTags(
tag.ArchivalCallerServiceName(request.CallerService),
tag.ArchivalArchiveAttemptedInline(request.AttemptArchiveInline),
)
Expand All @@ -128,27 +128,27 @@ func (c *client) Archive(ctx context.Context, request *ClientRequest) (resp *Cli
}
}()
if request.AttemptArchiveInline {
err = c.archiveInline(ctx, request, taggedLogger)
err = c.archiveHistoryInline(ctx, request, taggedLogger)
if err != nil {
err = c.sendArchiveSignal(ctx, request.ArchiveRequest, taggedLogger)
err = c.sendArchiveHistorySignal(ctx, request.ArchiveHistoryRequest, taggedLogger)
return
}
archivedInline = true
return
}
err = c.sendArchiveSignal(ctx, request.ArchiveRequest, taggedLogger)
err = c.sendArchiveHistorySignal(ctx, request.ArchiveHistoryRequest, taggedLogger)
return
}

func (c *client) archiveInline(ctx context.Context, request *ClientRequest, taggedLogger log.Logger) (err error) {
func (c *client) archiveHistoryInline(ctx context.Context, request *ClientHistoryRequest, taggedLogger log.Logger) (err error) {
defer func() {
if err != nil {
c.metricsClient.IncCounter(metrics.ArchiverClientScope, metrics.ArchiverClientInlineArchiveFailureCount)
taggedLogger.Info("failed to perform workflow history archival inline", tag.Error(err))
}
}()
c.metricsClient.IncCounter(metrics.ArchiverClientScope, metrics.ArchiverClientInlineArchiveAttemptCount)
URI, err := carchiver.NewURI(request.ArchiveRequest.URI)
URI, err := carchiver.NewURI(request.ArchiveHistoryRequest.URI)
if err != nil {
return err
}
Expand All @@ -159,19 +159,19 @@ func (c *client) archiveInline(ctx context.Context, request *ClientRequest, tagg
}

return historyArchiver.Archive(ctx, URI, &carchiver.ArchiveHistoryRequest{
ShardID: request.ArchiveRequest.ShardID,
DomainID: request.ArchiveRequest.DomainID,
DomainName: request.ArchiveRequest.DomainName,
WorkflowID: request.ArchiveRequest.WorkflowID,
RunID: request.ArchiveRequest.RunID,
EventStoreVersion: request.ArchiveRequest.EventStoreVersion,
BranchToken: request.ArchiveRequest.BranchToken,
NextEventID: request.ArchiveRequest.NextEventID,
CloseFailoverVersion: request.ArchiveRequest.CloseFailoverVersion,
ShardID: request.ArchiveHistoryRequest.ShardID,
DomainID: request.ArchiveHistoryRequest.DomainID,
DomainName: request.ArchiveHistoryRequest.DomainName,
WorkflowID: request.ArchiveHistoryRequest.WorkflowID,
RunID: request.ArchiveHistoryRequest.RunID,
EventStoreVersion: request.ArchiveHistoryRequest.EventStoreVersion,
BranchToken: request.ArchiveHistoryRequest.BranchToken,
NextEventID: request.ArchiveHistoryRequest.NextEventID,
CloseFailoverVersion: request.ArchiveHistoryRequest.CloseFailoverVersion,
})
}

func (c *client) sendArchiveSignal(ctx context.Context, request *ArchiveRequest, taggedLogger log.Logger) error {
func (c *client) sendArchiveHistorySignal(ctx context.Context, request *ArchiveHistoryRequest, taggedLogger log.Logger) error {
if ok := c.rateLimiter.Allow(); !ok {
c.logger.Error(tooManyRequestsErrMsg)
c.metricsClient.IncCounter(metrics.ArchiverClientScope, metrics.CadenceErrServiceBusyCounter)
Expand All @@ -188,7 +188,7 @@ func (c *client) sendArchiveSignal(ctx context.Context, request *ArchiveRequest,
}
signalCtx, cancel := context.WithTimeout(context.Background(), signalTimeout)
defer cancel()
_, err := c.cadenceClient.SignalWithStartWorkflow(signalCtx, workflowID, signalName, *request, workflowOptions, archivalWorkflowFnName, nil)
_, err := c.cadenceClient.SignalWithStartWorkflow(signalCtx, workflowID, archiveHistorySignalName, *request, workflowOptions, archiveHistoryWorkflowFnName, nil)
if err != nil {
taggedLogger = taggedLogger.WithTags(tag.WorkflowID(workflowID), tag.Error(err))
taggedLogger.Error("failed to send signal to archival system workflow")
Expand Down
10 changes: 5 additions & 5 deletions service/worker/archiver/client_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions service/worker/archiver/client_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type (
const (
workflowIDPrefix = "cadence-archival"
decisionTaskList = "cadence-archival-tl"
signalName = "cadence-archival-signal"
archivalWorkflowFnName = "archivalWorkflow"
archiveHistorySignalName = "cadence-archival-signal"
archiveHistoryWorkflowFnName = "archivalWorkflow"
workflowStartToCloseTimeout = time.Hour * 24 * 30
workflowTaskStartToCloseTimeout = time.Minute

Expand All @@ -91,7 +91,7 @@ var (
)

func init() {
workflow.RegisterWithOptions(archivalWorkflow, workflow.RegisterOptions{Name: archivalWorkflowFnName})
workflow.RegisterWithOptions(archiveHistoryWorkflow, workflow.RegisterOptions{Name: archiveHistoryWorkflowFnName})
activity.RegisterWithOptions(uploadHistoryActivity, activity.RegisterOptions{Name: uploadHistoryActivityFnName})
activity.RegisterWithOptions(deleteHistoryActivity, activity.RegisterOptions{Name: deleteHistoryActivityFnName})
}
Expand Down
Loading

0 comments on commit d91a7bf

Please sign in to comment.