diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index 980ba3a2e54..a381f192568 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -884,6 +884,8 @@ const (
 	TaskListScavengerScope
 	// BatcherScope is scope used by all metrics emitted by worker.Batcher module
 	BatcherScope
+	// HistoryScavengerScope is scope used by all metrics emitted by worker.history.Scavenger module
+	HistoryScavengerScope
 
 	NumWorkerScopes
 )
@@ -1270,6 +1272,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 		ArchiverPumpScope:             {operation: "ArchiverPump"},
 		ArchiverArchivalWorkflowScope: {operation: "ArchiverArchivalWorkflow"},
 		TaskListScavengerScope:        {operation: "tasklistscavenger"},
+		HistoryScavengerScope:         {operation: "historyscavenger"},
 		BatcherScope:                  {operation: "batcher"},
 	},
 }
@@ -1574,6 +1577,10 @@ const (
 	ExecutorTasksDroppedCount
 	BatcherProcessorSuccess
 	BatcherProcessorFailures
+	HistoryScavengerSuccessCount
+	HistoryScavengerErrorCount
+	HistoryScavengerSkipCount
+
 	NumWorkerMetrics
 )
@@ -1855,6 +1862,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
 		ExecutorTasksDroppedCount: {metricName: "executor_dropped", metricType: Counter},
 		BatcherProcessorSuccess:   {metricName: "batcher_processor_requests", metricType: Counter},
 		BatcherProcessorFailures:  {metricName: "batcher_processor_errors", metricType: Counter},
+		HistoryScavengerSuccessCount: {metricName: "scavenger_success", metricType: Counter},
+		HistoryScavengerErrorCount:   {metricName: "scavenger_errors", metricType: Counter},
+		HistoryScavengerSkipCount:    {metricName: "scavenger_skips", metricType: Counter},
 	},
 }
diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go
index 6056f95651e..60ab63cac4e 100644
--- a/common/persistence/dataInterfaces.go
+++ b/common/persistence/dataInterfaces.go
@@ -22,6 +22,7 @@ package persistence
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/pborman/uuid"
@@ -142,6 +143,8 @@ const (
 	TransferTaskTransferTargetRunID = "30000000-0000-f000-f000-000000000002"
 )
 
+const numItemsInGarbageInfo = 3
+
 type (
 	// InvalidPersistenceRequestError represents invalid request to persistence
 	InvalidPersistenceRequestError struct {
@@ -2408,6 +2411,20 @@ func NewHistoryBranchToken(treeID string) ([]byte, error) {
 	return token, nil
 }
 
+// NewHistoryBranchTokenByBranchID returns a new branch token with the given treeID/branchID
+func NewHistoryBranchTokenByBranchID(treeID, branchID string) ([]byte, error) {
+	bi := &workflow.HistoryBranch{
+		TreeID:    &treeID,
+		BranchID:  &branchID,
+		Ancestors: []*workflow.HistoryBranchRange{},
+	}
+	token, err := internalThriftEncoder.Encode(bi)
+	if err != nil {
+		return nil, err
+	}
+	return token, nil
+}
+
 // NewHistoryBranchTokenFromAnother make up a branchToken
 func NewHistoryBranchTokenFromAnother(branchID string, anotherToken []byte) ([]byte, error) {
 	var branch workflow.HistoryBranch
@@ -2427,3 +2444,20 @@ func NewHistoryBranchTokenFromAnother(branchID string, anotherToken []byte) ([]b
 	}
 	return token, nil
 }
+
+// BuildHistoryGarbageCleanupInfo builds the branch Info string that identifies a workflow execution for history cleanup
+func BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID string) string {
+	return fmt.Sprintf("%v:%v:%v", domainID, workflowID, runID)
+}
+
+// SplitHistoryGarbageCleanupInfo splits the Info string built by BuildHistoryGarbageCleanupInfo back into domainID, workflowID and runID
+func SplitHistoryGarbageCleanupInfo(info string) (domainID, workflowID, runID string, err error) {
+	ss := strings.Split(info, ":")
+	// workflowID can contain ":" so len(ss) can be greater than 3
+	if len(ss) < numItemsInGarbageInfo {
+		return "", "", "", fmt.Errorf("not able to split info for %s", info)
+	}
+	domainID = ss[0]
+	runID = ss[len(ss)-1]
+	workflowEnd := len(info) - len(runID) - 1
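+	// workflowID is everything between the first and the last ":" separators, so it may itself contain ":"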
+	workflowID = info[len(domainID)+1 : workflowEnd]
+	return
+}
diff --git a/common/persistence/persistence-tests/shared_test.go b/common/persistence/persistence-tests/shared_test.go
new file mode 100644
index 00000000000..d138e519220
--- /dev/null
+++ b/common/persistence/persistence-tests/shared_test.go
@@ -0,0 +1,50 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package persistencetests
+
+import (
+	"testing"
+
+	"github.com/uber/cadence/common/persistence"
+)
+
+func TestGarbageCleanupInfo(t *testing.T) {
+	domainID := "10000000-5000-f000-f000-000000000000"
+	workflowID := "workflow-id"
+	runID := "10000000-5000-f000-f000-000000000002"
+
+	info := persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID)
+	domainID2, workflowID2, runID2, err := persistence.SplitHistoryGarbageCleanupInfo(info)
+	if err != nil || domainID != domainID2 || workflowID != workflowID2 || runID != runID2 {
+		t.Fail()
+	}
+}
+
+func TestGarbageCleanupInfo_WithColonInWorkflowID(t *testing.T) {
+	domainID := "10000000-5000-f000-f000-000000000000"
+	workflowID := "workflow-id:2"
+	runID := "10000000-5000-f000-f000-000000000002"
+
+	info := persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID)
+	domainID2, workflowID2, runID2, err := persistence.SplitHistoryGarbageCleanupInfo(info)
+	if err != nil || domainID != domainID2 || workflowID != workflowID2 || runID != runID2 {
+		t.Fail()
+	}
+}
diff --git a/service/history/workflowExecutionContext.go b/service/history/workflowExecutionContext.go
index cf352107802..43b6f645284 100644
--- a/service/history/workflowExecutionContext.go
+++ b/service/history/workflowExecutionContext.go
@@ -645,7 +645,7 @@ func (c *workflowExecutionContextImpl) persistFirstWorkflowEvents(
 		execution,
 		&persistence.AppendHistoryNodesRequest{
 			IsNewBranch: true,
-			Info:        historyGarbageCleanupInfo(domainID, workflowID, runID),
+			Info:        persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID),
 			BranchToken: branchToken,
 			Events:      events,
 			// TransactionID is set by shard context
diff --git a/service/history/workflowResetor.go b/service/history/workflowResetor.go
index 24ba102b2cb..c42b7277a49 100644
--- a/service/history/workflowResetor.go
+++ b/service/history/workflowResetor.go
@@ -342,7 +342,7 @@ func (w *workflowResetorImpl) buildNewMutableStateForReset(
 	forkResp, retError := w.eng.historyV2Mgr.ForkHistoryBranch(&persistence.ForkHistoryBranchRequest{
 		ForkBranchToken: baseMutableState.GetCurrentBranch(),
 		ForkNodeID:      resetDecisionCompletedEventID,
-		Info:            historyGarbageCleanupInfo(domainID, workflowID, newRunID),
+		Info:            persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, newRunID),
 		ShardID:         common.IntPtr(w.eng.shard.GetShardID()),
 	})
 	if retError != nil {
@@ -380,10 +380,6 @@ func (w *workflowResetorImpl) terminateIfCurrIsRunning(
 	return
 }
 
-func historyGarbageCleanupInfo(domainID, workflowID, runID string) string {
-	return fmt.Sprintf("%v:%v:%v", domainID, workflowID, runID)
-}
-
 func (w *workflowResetorImpl) setEventIDsWithHistory(msBuilder mutableState) int64 {
 	history := msBuilder.GetHistoryBuilder().GetHistory().Events
 	firstEvent := history[0]
@@ -773,7 +769,7 @@ func (w *workflowResetorImpl) ApplyResetEvent(
 	forkResp, retError := w.eng.historyV2Mgr.ForkHistoryBranch(&persistence.ForkHistoryBranchRequest{
 		ForkBranchToken: baseMutableState.GetCurrentBranch(),
 		ForkNodeID:      decisionFinishEventID,
-		Info:            historyGarbageCleanupInfo(domainID, workflowID, resetAttr.GetNewRunId()),
+		Info:            persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, resetAttr.GetNewRunId()),
 		ShardID:         shardID,
 	})
 	if retError != nil {
diff --git a/service/history/workflowResetor_test.go b/service/history/workflowResetor_test.go
index edda68ffa61..4447197bbd4 100644
--- a/service/history/workflowResetor_test.go
+++ b/service/history/workflowResetor_test.go
@@ -4012,7 +4012,7 @@ func (s *resetorSuite) TestApplyReset() {
 	forkReq := &p.ForkHistoryBranchRequest{
 		ForkBranchToken: forkBranchToken,
 		ForkNodeID:      30,
-		Info:            historyGarbageCleanupInfo(domainID, wid, newRunID),
+		Info:            p.BuildHistoryGarbageCleanupInfo(domainID, wid, newRunID),
 		ShardID:         common.IntPtr(s.shardID),
 	}
 	forkResp := &p.ForkHistoryBranchResponse{
diff --git a/service/worker/scanner/history/scavenger.go b/service/worker/scanner/history/scavenger.go
new file mode 100644
index 00000000000..79b532f41a7
--- /dev/null
+++ b/service/worker/scanner/history/scavenger.go
@@ -0,0 +1,308 @@
+// Copyright (c) 2019 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package history
+
+import (
+	"context"
+	"time"
+
+	"github.com/uber/cadence/.gen/go/history"
+	"github.com/uber/cadence/.gen/go/history/historyserviceclient"
+	"github.com/uber/cadence/.gen/go/shared"
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/log/tag"
+	"github.com/uber/cadence/common/metrics"
+	p "github.com/uber/cadence/common/persistence"
+	"go.uber.org/cadence/activity"
+	"golang.org/x/time/rate"
+)
+
+type (
+	// ScavengerHeartbeatDetails is the heartbeat detail for HistoryScavengerActivity
+	ScavengerHeartbeatDetails struct {
+		NextPageToken []byte
+		CurrentPage   int
+		SkipCount     int
+		ErrorCount    int
+		SuccCount     int
+	}
+
+	// Scavenger is the type that holds the state for history scavenger daemon
+	Scavenger struct {
+		db       p.HistoryV2Manager
+		client   historyserviceclient.Interface
+		hbd      ScavengerHeartbeatDetails
+		rps      int
+		limiter  *rate.Limiter
+		metrics  metrics.Client
+		logger   log.Logger
+		isInTest bool
+	}
+
+	taskDetail struct {
+		domainID   string
+		workflowID string
+		runID      string
+		treeID     string
+		branchID   string
+
+		// passing along the current heartbeat details so that a task can heartbeat and the activity won't time out
+		hbd ScavengerHeartbeatDetails
+	}
+)
+
+const (
+	// used to decide how many goroutines to spawn for processing
+	rpsPerConcurrency = 50
+	pageSize          = 1000
+	// only clean up history branches that are older than this threshold
+	cleanUpThreshold = time.Hour * 24
+)
+
+// NewScavenger returns an instance of history scavenger daemon
+// The Scavenger can be started by calling the Run() method on the
+// returned object. Calling the Run() method will result in one
+// complete iteration over all of the history branches in the system. For
+// each branch, the scavenger will attempt to:
+//  - describe the corresponding workflow execution
+//  - delete the history branch itself, if the workflow execution no longer exists
func NewScavenger(
+	db p.HistoryV2Manager,
+	rps int,
+	client historyserviceclient.Interface,
+	hbd ScavengerHeartbeatDetails,
+	metricsClient metrics.Client,
+	logger log.Logger,
+) *Scavenger {
+
+	rateLimiter := rate.NewLimiter(rate.Limit(rps), rps)
+
+	return &Scavenger{
+		db:      db,
+		client:  client,
+		hbd:     hbd,
+		rps:     rps,
+		limiter: rateLimiter,
+		metrics: metricsClient,
+		logger:  logger,
+	}
+}
+
+// Run runs the scavenger
+func (s *Scavenger) Run(ctx context.Context) (ScavengerHeartbeatDetails, error) {
+	taskCh := make(chan taskDetail, pageSize)
+	respCh := make(chan error, pageSize)
+	concurrency := s.rps/rpsPerConcurrency + 1
+
+	for i := 0; i < concurrency; i++ {
+		go s.startTaskProcessor(ctx, taskCh, respCh)
+	}
+
+	for {
+		resp, err := s.db.GetAllHistoryTreeBranches(&p.GetAllHistoryTreeBranchesRequest{
+			PageSize:      pageSize,
+			NextPageToken: s.hbd.NextPageToken,
+		})
+		if err != nil {
+			return s.hbd, err
+		}
+		batchCount := len(resp.Branches)
+
+		skips := 0
+		errorsOnSplitting := 0
+		// send all tasks
+		for _, br := range resp.Branches {
+			if time.Now().Add(-cleanUpThreshold).Before(br.ForkTime) {
+				batchCount--
+				skips++
+				s.metrics.IncCounter(metrics.HistoryScavengerScope, metrics.HistoryScavengerSkipCount)
+				continue
+			}
+
+			domainID, wid, rid, err := p.SplitHistoryGarbageCleanupInfo(br.Info)
+			if err != nil {
+				batchCount--
+				errorsOnSplitting++
+				s.logger.Error("unable to parse the history cleanup info", tag.DetailInfo(br.Info))
+				s.metrics.IncCounter(metrics.HistoryScavengerScope, metrics.HistoryScavengerErrorCount)
+				continue
+			}
+
+			taskCh <- taskDetail{
+				domainID:   domainID,
+				workflowID: wid,
+				runID:      rid,
+				treeID:     br.TreeID,
+				branchID:   br.BranchID,
+
+				hbd: s.hbd,
+			}
+		}
+
+		succCount := 0
+		errCount := 0
+		if batchCount > 0 {
+			// wait for the counters to indicate that this batch is done
+		Loop:
+			for {
+				select {
+				case err := <-respCh:
+					if err == nil {
+						s.metrics.IncCounter(metrics.HistoryScavengerScope, metrics.HistoryScavengerSuccessCount)
+						succCount++
+					} else {
+						s.metrics.IncCounter(metrics.HistoryScavengerScope, metrics.HistoryScavengerErrorCount)
+						errCount++
+					}
+					if succCount+errCount == batchCount {
+						break Loop
+					}
+				case <-ctx.Done():
+					return s.hbd, ctx.Err()
+				}
+			}
+		}
+
+		s.hbd.CurrentPage++
+		s.hbd.NextPageToken = resp.NextPageToken
+		s.hbd.SuccCount += succCount
+		s.hbd.ErrorCount += errCount + errorsOnSplitting
+		s.hbd.SkipCount += skips
+		if !s.isInTest {
+			activity.RecordHeartbeat(ctx, s.hbd)
+		}
+
+		if len(s.hbd.NextPageToken) == 0 {
+			break
+		}
+	}
+	return s.hbd, nil
+}
+
+func (s *Scavenger) startTaskProcessor(
+	ctx context.Context,
+	taskCh chan taskDetail,
+	respCh chan error,
+) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case task := <-taskCh:
+			if isDone(ctx) {
+				return
+			}
+
+			if !s.isInTest {
+				activity.RecordHeartbeat(ctx, s.hbd)
+			}
+
+			err := s.limiter.Wait(ctx)
+			if err != nil {
+				respCh <- err
+				s.logger.Error("encountered error while waiting for the rate limiter",
+					getTaskLoggingTags(err, task)...)
+				continue
+			}
+
+			// this checks whether the mutableState still exists
+			// if not, the history branch is garbage and needs to be deleted
+			_, err = s.client.DescribeMutableState(ctx, &history.DescribeMutableStateRequest{
+				DomainUUID: common.StringPtr(task.domainID),
+				Execution: &shared.WorkflowExecution{
+					WorkflowId: common.StringPtr(task.workflowID),
+					RunId:      common.StringPtr(task.runID),
+				},
+			})
+
+			if err != nil {
+				if _, ok := err.(*shared.EntityNotExistsError); ok {
+					// deleting the history branch
+					var branchToken []byte
+					branchToken, err = p.NewHistoryBranchTokenByBranchID(task.treeID, task.branchID)
+					if err != nil {
+						respCh <- err
+						s.logger.Error("encountered error while creating branch token",
+							getTaskLoggingTags(err, task)...)
+						continue
+					}
+
+					err = s.db.DeleteHistoryBranch(&p.DeleteHistoryBranchRequest{
+						BranchToken: branchToken,
+						// This is a required argument but it is not needed for Cassandra.
+						// Since this scanner only runs against Cassandra,
+						// we can fill in any number here to let the code go through
+						ShardID: common.IntPtr(1),
+					})
+					if err != nil {
+						respCh <- err
+						s.logger.Error("encountered error while deleting garbage history branch",
+							getTaskLoggingTags(err, task)...)
+					} else {
+						// deleted garbage
+						s.logger.Info("deleted history garbage",
+							getTaskLoggingTags(nil, task)...)
+
+						respCh <- nil
+					}
+				} else {
+					s.logger.Error("encountered error while describing the mutable state",
+						getTaskLoggingTags(err, task)...)
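+					// report non "not found" errors back so they are counted as failures for this batch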
+					respCh <- err
+				}
+			} else {
+				// no garbage
+				respCh <- nil
+			}
+		}
+	}
+}
+
+func getTaskLoggingTags(err error, task taskDetail) []tag.Tag {
+	if err != nil {
+		return []tag.Tag{
+			tag.Error(err),
+			tag.WorkflowDomainID(task.domainID),
+			tag.WorkflowID(task.workflowID),
+			tag.WorkflowRunID(task.runID),
+			tag.WorkflowTreeID(task.treeID),
+			tag.WorkflowBranchID(task.branchID),
+		}
+	} else {
+		return []tag.Tag{
+			tag.WorkflowDomainID(task.domainID),
+			tag.WorkflowID(task.workflowID),
+			tag.WorkflowRunID(task.runID),
+			tag.WorkflowTreeID(task.treeID),
+			tag.WorkflowBranchID(task.branchID),
+		}
+	}
+}
+
+func isDone(ctx context.Context) bool {
+	select {
+	case <-ctx.Done():
+		return true
+	default:
+		return false
+	}
+}
diff --git a/service/worker/scanner/history/scavenger_test.go b/service/worker/scanner/history/scavenger_test.go
new file mode 100644
index 00000000000..56449028dfd
--- /dev/null
+++ b/service/worker/scanner/history/scavenger_test.go
@@ -0,0 +1,461 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package history
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/suite"
+	"github.com/uber-go/tally"
+	"github.com/uber/cadence/.gen/go/history"
+	"github.com/uber/cadence/.gen/go/history/historyservicetest"
+	"github.com/uber/cadence/.gen/go/shared"
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/log"
+	"github.com/uber/cadence/common/log/loggerimpl"
+	"github.com/uber/cadence/common/metrics"
+	"github.com/uber/cadence/common/mocks"
+	p "github.com/uber/cadence/common/persistence"
+	"go.uber.org/zap"
+)
+
+type (
+	ScavengerTestSuite struct {
+		suite.Suite
+		logger log.Logger
+		metric metrics.Client
+	}
+)
+
+func TestScavengerTestSuite(t *testing.T) {
+	suite.Run(t, new(ScavengerTestSuite))
+}
+
+func (s *ScavengerTestSuite) SetupTest() {
+	zapLogger, err := zap.NewDevelopment()
+	if err != nil {
+		s.Require().NoError(err)
+	}
+	s.logger = loggerimpl.NewLogger(zapLogger)
+	s.metric = metrics.NewClient(tally.NoopScope, metrics.Worker)
+}
+
+func (s *ScavengerTestSuite) createTestScavenger(rps int) (*mocks.HistoryV2Manager, *historyservicetest.MockClient, *Scavenger, *gomock.Controller) {
+	db := &mocks.HistoryV2Manager{}
+	controller := gomock.NewController(s.T())
+	workflowClient := historyservicetest.NewMockClient(controller)
+	scvgr := NewScavenger(db, 100, workflowClient, ScavengerHeartbeatDetails{}, s.metric, s.logger)
+	scvgr.isInTest = true
+	return db, workflowClient, scvgr, controller
+}
+
+func (s *ScavengerTestSuite) TestAllSkipTasksTwoPages() {
+	db, _, scvgr, controller := s.createTestScavenger(100)
+	defer controller.Finish()
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize: pageSize,
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		NextPageToken: []byte("page1"),
+		Branches: []p.HistoryBranchDetail{
+			{
+				TreeID:   "treeID1",
+				BranchID: "branchID1",
+				ForkTime: time.Now(),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID1", "workflowID1", "runID1"),
+			},
+			{
+				TreeID:   "treeID2",
+				BranchID: "branchID2",
+				ForkTime: time.Now(),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID2", "workflowID2", "runID2"),
+			},
+		},
+	}, nil).Once()
+
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize:      pageSize,
+		NextPageToken: []byte("page1"),
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		Branches: []p.HistoryBranchDetail{
+			{
+				TreeID:   "treeID3",
+				BranchID: "branchID3",
+				ForkTime: time.Now(),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID3", "workflowID3", "runID3"),
+			},
+			{
+				TreeID:   "treeID4",
+				BranchID: "branchID4",
+				ForkTime: time.Now(),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID4", "workflowID4", "runID4"),
+			},
+		},
+	}, nil).Once()
+
+	hbd, err := scvgr.Run(context.Background())
+	s.Nil(err)
+	s.Equal(4, hbd.SkipCount)
+	s.Equal(0, hbd.SuccCount)
+	s.Equal(0, hbd.ErrorCount)
+	s.Equal(2, hbd.CurrentPage)
+	s.Equal(0, len(hbd.NextPageToken))
+}
+
+func (s *ScavengerTestSuite) TestAllErrorSplittingTasksTwoPages() {
+	db, _, scvgr, controller := s.createTestScavenger(100)
+	defer controller.Finish()
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize: pageSize,
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		NextPageToken: []byte("page1"),
+		Branches: []p.HistoryBranchDetail{
+			{
+				TreeID:   "treeID1",
+				BranchID: "branchID1",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     "error-info",
+			},
+			{
+				TreeID:   "treeID2",
+				BranchID: "branchID2",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     "error-info",
+			},
+		},
+	}, nil).Once()
+
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize:      pageSize,
+		NextPageToken: []byte("page1"),
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		Branches: []p.HistoryBranchDetail{
+			{
+				TreeID:   "treeID3",
+				BranchID: "branchID3",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     "error-info",
+			},
+			{
+				TreeID:   "treeID4",
+				BranchID: "branchID4",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     "error-info",
+			},
+		},
+	}, nil).Once()
+
+	hbd, err := scvgr.Run(context.Background())
+	s.Nil(err)
+	s.Equal(0, hbd.SkipCount)
+	s.Equal(0, hbd.SuccCount)
+	s.Equal(4, hbd.ErrorCount)
+	s.Equal(2, hbd.CurrentPage)
+	s.Equal(0, len(hbd.NextPageToken))
+}
+
+func (s *ScavengerTestSuite) TestNoGarbageTwoPages() {
+	db, client, scvgr, controller := s.createTestScavenger(100)
+	defer controller.Finish()
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize: pageSize,
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		NextPageToken: []byte("page1"),
+		Branches: []p.HistoryBranchDetail{
+			{
+				TreeID:   "treeID1",
+				BranchID: "branchID1",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID1", "workflowID1", "runID1"),
+			},
+			{
+				TreeID:   "treeID2",
+				BranchID: "branchID2",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID2", "workflowID2", "runID2"),
+			},
+		},
+	}, nil).Once()
+
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize:      pageSize,
+		NextPageToken: []byte("page1"),
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		Branches: []p.HistoryBranchDetail{
+			{
+				TreeID:   "treeID3",
+				BranchID: "branchID3",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID3", "workflowID3", "runID3"),
+			},
+			{
+				TreeID:   "treeID4",
+				BranchID: "branchID4",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID4", "workflowID4", "runID4"),
+			},
+		},
+	}, nil).Once()
+
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID1"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID1"),
+			RunId:      common.StringPtr("runID1"),
+		},
+	}).Return(nil, nil)
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID2"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID2"),
+			RunId:      common.StringPtr("runID2"),
+		},
+	}).Return(nil, nil)
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID3"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID3"),
+			RunId:      common.StringPtr("runID3"),
+		},
+	}).Return(nil, nil)
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID4"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID4"),
+			RunId:      common.StringPtr("runID4"),
+		},
+	}).Return(nil, nil)
+
+	hbd, err := scvgr.Run(context.Background())
+	s.Nil(err)
+	s.Equal(0, hbd.SkipCount)
+	s.Equal(4, hbd.SuccCount)
+	s.Equal(0, hbd.ErrorCount)
+	s.Equal(2, hbd.CurrentPage)
+	s.Equal(0, len(hbd.NextPageToken))
+}
+
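+// TestDeletingBranchesTwoPages verifies that branches whose workflow executions no longer exist are deleted on both pages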
+func (s *ScavengerTestSuite) TestDeletingBranchesTwoPages() {
+	db, client, scvgr, controller := s.createTestScavenger(100)
+	defer controller.Finish()
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize: pageSize,
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		NextPageToken: []byte("page1"),
+		Branches: []p.HistoryBranchDetail{
+			{
+				TreeID:   "treeID1",
+				BranchID: "branchID1",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID1", "workflowID1", "runID1"),
+			},
+			{
+				TreeID:   "treeID2",
+				BranchID: "branchID2",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID2", "workflowID2", "runID2"),
+			},
+		},
+	}, nil).Once()
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize:      pageSize,
+		NextPageToken: []byte("page1"),
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		Branches: []p.HistoryBranchDetail{
+			{
+				TreeID:   "treeID3",
+				BranchID: "branchID3",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID3", "workflowID3", "runID3"),
+			},
+			{
+				TreeID:   "treeID4",
+				BranchID: "branchID4",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID4", "workflowID4", "runID4"),
+			},
+		},
+	}, nil).Once()
+
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID1"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID1"),
+			RunId:      common.StringPtr("runID1"),
+		},
+	}).Return(nil, &shared.EntityNotExistsError{})
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID2"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID2"),
+			RunId:      common.StringPtr("runID2"),
+		},
+	}).Return(nil, &shared.EntityNotExistsError{})
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID3"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID3"),
+			RunId:      common.StringPtr("runID3"),
+		},
+	}).Return(nil, &shared.EntityNotExistsError{})
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID4"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID4"),
+			RunId:      common.StringPtr("runID4"),
+		},
+	}).Return(nil, &shared.EntityNotExistsError{})
+
+	branchToken1, err := p.NewHistoryBranchTokenByBranchID("treeID1", "branchID1")
+	s.Nil(err)
+	db.On("DeleteHistoryBranch", &p.DeleteHistoryBranchRequest{
+		BranchToken: branchToken1,
+		ShardID:     common.IntPtr(1),
+	}).Return(nil).Once()
+	branchToken2, err := p.NewHistoryBranchTokenByBranchID("treeID2", "branchID2")
+	s.Nil(err)
+	db.On("DeleteHistoryBranch", &p.DeleteHistoryBranchRequest{
+		BranchToken: branchToken2,
+		ShardID:     common.IntPtr(1),
+	}).Return(nil).Once()
+	branchToken3, err := p.NewHistoryBranchTokenByBranchID("treeID3", "branchID3")
+	s.Nil(err)
+	db.On("DeleteHistoryBranch", &p.DeleteHistoryBranchRequest{
+		BranchToken: branchToken3,
+		ShardID:     common.IntPtr(1),
+	}).Return(nil).Once()
+	branchToken4, err := p.NewHistoryBranchTokenByBranchID("treeID4", "branchID4")
+	s.Nil(err)
+	db.On("DeleteHistoryBranch", &p.DeleteHistoryBranchRequest{
+		BranchToken: branchToken4,
+		ShardID:     common.IntPtr(1),
+	}).Return(nil).Once()
+
+	hbd, err := scvgr.Run(context.Background())
+	s.Nil(err)
+	s.Equal(0, hbd.SkipCount)
+	s.Equal(4, hbd.SuccCount)
+	s.Equal(0, hbd.ErrorCount)
+	s.Equal(2, hbd.CurrentPage)
+	s.Equal(0, len(hbd.NextPageToken))
+}
+
+func (s *ScavengerTestSuite) TestMixesTwoPages() {
+	db, client, scvgr, controller := s.createTestScavenger(100)
+	defer controller.Finish()
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize: pageSize,
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		NextPageToken: []byte("page1"),
+		Branches: []p.HistoryBranchDetail{
+			{
+				// skipped, fork time is too recent
+				TreeID:   "treeID1",
+				BranchID: "branchID1",
+				ForkTime: time.Now(),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID1", "workflowID1", "runID1"),
+			},
+			{
+				// split error
+				TreeID:   "treeID2",
+				BranchID: "branchID2",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     "error-info",
+			},
+		},
+	}, nil).Once()
+	db.On("GetAllHistoryTreeBranches", &p.GetAllHistoryTreeBranchesRequest{
+		PageSize:      pageSize,
+		NextPageToken: []byte("page1"),
+	}).Return(&p.GetAllHistoryTreeBranchesResponse{
+		Branches: []p.HistoryBranchDetail{
+			{
+				// delete succeeds
+				TreeID:   "treeID3",
+				BranchID: "branchID3",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID3", "workflowID3", "runID3"),
+			},
+			{
+				// delete fails
+				TreeID:   "treeID4",
+				BranchID: "branchID4",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID4", "workflowID4", "runID4"),
+			},
+			{
+				// not deleted, workflow still exists
+				TreeID:   "treeID5",
+				BranchID: "branchID5",
+				ForkTime: time.Now().Add(-cleanUpThreshold * 2),
+				Info:     p.BuildHistoryGarbageCleanupInfo("domainID5", "workflowID5", "runID5"),
+			},
+		},
+	}, nil).Once()
+
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID3"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID3"),
+			RunId:      common.StringPtr("runID3"),
+		},
+	}).Return(nil, &shared.EntityNotExistsError{})
+
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID4"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID4"),
+			RunId:      common.StringPtr("runID4"),
+		},
+	}).Return(nil, &shared.EntityNotExistsError{})
+	client.EXPECT().DescribeMutableState(gomock.Any(), &history.DescribeMutableStateRequest{
+		DomainUUID: common.StringPtr("domainID5"),
+		Execution: &shared.WorkflowExecution{
+			WorkflowId: common.StringPtr("workflowID5"),
+			RunId:      common.StringPtr("runID5"),
+		},
+	}).Return(nil, nil)
+
+	branchToken3, err := p.NewHistoryBranchTokenByBranchID("treeID3", "branchID3")
+	s.Nil(err)
+	db.On("DeleteHistoryBranch", &p.DeleteHistoryBranchRequest{
+		BranchToken: branchToken3,
+		ShardID:     common.IntPtr(1),
+	}).Return(nil).Once()
+
+	branchToken4, err := p.NewHistoryBranchTokenByBranchID("treeID4", "branchID4")
+	s.Nil(err)
+	db.On("DeleteHistoryBranch", &p.DeleteHistoryBranchRequest{
+		BranchToken: branchToken4,
+		ShardID:     common.IntPtr(1),
+	}).Return(fmt.Errorf("failed to delete history")).Once()
+
+	hbd, err := scvgr.Run(context.Background())
+	s.Nil(err)
+	s.Equal(1, hbd.SkipCount)
+	s.Equal(2, hbd.SuccCount)
+	s.Equal(2, hbd.ErrorCount)
+	s.Equal(2, hbd.CurrentPage)
+	s.Equal(0, len(hbd.NextPageToken))
+}
diff --git a/service/worker/scanner/scanner.go b/service/worker/scanner/scanner.go
index 901f0eea13f..c00e48d521f 100644
--- a/service/worker/scanner/scanner.go
+++ b/service/worker/scanner/scanner.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/uber-go/tally"
+	"github.com/uber/cadence/client"
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/backoff"
 	"github.com/uber/cadence/common/cluster"
@@ -60,6 +61,8 @@ type (
 		Config Config
 		// SDKClient is an instance of cadence sdk client
 		SDKClient workflowserviceclient.Interface
+		// ClientBean is an instance of client.Bean
+		ClientBean client.Bean
 		// MetricsClient is an instance of metrics object for emitting stats
 		MetricsClient metrics.Client
 		Logger        log.Logger
@@ -72,8 +75,10 @@ type (
 	scannerContext struct {
 		taskDB        p.TaskManager
 		domainDB      p.MetadataManager
+		historyDB     p.HistoryV2Manager
 		cfg           Config
 		sdkClient     workflowserviceclient.Interface
+		clientBean    client.Bean
 		metricsClient metrics.Client
 		tallyScope    tally.Scope
 		logger        log.Logger
@@ -104,6 +109,7 @@ func New(params *BootstrapParams) *Scanner {
 		context: scannerContext{
 			cfg:           cfg,
 			sdkClient:     params.SDKClient,
+			clientBean:    params.ClientBean,
 			metricsClient: params.MetricsClient,
 			tallyScope:    params.TallyScope,
 			zapLogger:     zapLogger,
@@ -124,35 +130,41 @@ func (s *Scanner) Start() error {
 		MaxConcurrentDecisionTaskExecutionSize: maxConcurrentDecisionTaskExecutionSize,
 		BackgroundActivityContext:              context.WithValue(context.Background(), scannerContextKey, s.context),
 	}
-	go s.startWorkflowWithRetry()
+
+	if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeSQL {
+		go s.startWorkflowWithRetry(tlScannerWFStartOptions, tlScannerWFTypeName)
+	} else if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeCassandra {
+		go s.startWorkflowWithRetry(historyScannerWFStartOptions, historyScannerWFTypeName)
+	}
+
 	worker := worker.New(s.context.sdkClient, common.SystemLocalDomainName, tlScannerTaskListName, workerOpts)
 	return worker.Start()
 }
 
-func (s *Scanner) startWorkflowWithRetry() error {
+func (s *Scanner) startWorkflowWithRetry(options cclient.StartWorkflowOptions, wfType string) error {
 	client := cclient.NewClient(s.context.sdkClient, common.SystemLocalDomainName, &cclient.Options{})
 	policy := backoff.NewExponentialRetryPolicy(time.Second)
 	policy.SetMaximumInterval(time.Minute)
 	policy.SetExpirationInterval(backoff.NoInterval)
 	return backoff.Retry(func() error {
-		return s.startWorkflow(client)
+		return s.startWorkflow(client, options, wfType)
 	}, policy, func(err error) bool {
 		return true
 	})
 }
 
-func (s *Scanner) startWorkflow(client cclient.Client) error {
+func (s *Scanner) startWorkflow(client cclient.Client, options cclient.StartWorkflowOptions, wfType string) error {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
-	_, err := client.StartWorkflow(ctx, tlScannerWFStartOptions, tlScannerWFTypeName)
+	_, err := client.StartWorkflow(ctx, options, wfType)
 	cancel()
 	if err != nil {
 		if _, ok := err.(*shared.WorkflowExecutionAlreadyStartedError); ok {
 			return nil
 		}
-		s.context.logger.Error("error starting scanner workflow", tag.Error(err))
+		s.context.logger.Error("error starting "+wfType+" workflow", tag.Error(err))
 		return err
 	}
-	s.context.logger.Info("Scanner workflow successfully started")
+	s.context.logger.Info(wfType + " workflow successfully started")
 	return nil
 }
 
@@ -167,7 +179,12 @@ func (s *Scanner) buildContext() error {
 	if err != nil {
 		return err
 	}
+	historyDB, err := pFactory.NewHistoryV2Manager()
+	if err != nil {
+		return err
+	}
 	s.context.taskDB = taskDB
 	s.context.domainDB = domainDB
+	s.context.historyDB = historyDB
 	return nil
 }
diff --git a/service/worker/scanner/workflow.go b/service/worker/scanner/workflow.go
index c5012e59120..1a328ae6099 100644
--- a/service/worker/scanner/workflow.go
+++ b/service/worker/scanner/workflow.go
@@ -25,6 +25,7 @@ import (
 	"time"
 
 	"github.com/uber/cadence/common/log/tag"
+	"github.com/uber/cadence/service/worker/scanner/history"
 	"github.com/uber/cadence/service/worker/scanner/tasklist"
 	"go.uber.org/cadence"
 	"go.uber.org/cadence/activity"
@@ -32,7 +33,9 @@ import (
 	"go.uber.org/cadence/workflow"
 )
 
-type contextKey int
+type (
+	contextKey int
+)
 
 const (
 	scannerContextKey = contextKey(0)
@@ -45,17 +48,28 @@ const (
 	tlScannerWFTypeName           = "cadence-sys-tl-scanner-workflow"
 	tlScannerTaskListName         = "cadence-sys-tl-scanner-tasklist-0"
 	taskListScavengerActivityName = "cadence-sys-tl-scanner-scvg-activity"
+
+	historyScannerWFID           = "cadence-sys-history-scanner"
+	historyScannerWFTypeName     = "cadence-sys-history-scanner-workflow"
+	historyScannerTaskListName   = "cadence-sys-history-scanner-tasklist-0"
+	historyScavengerActivityName = "cadence-sys-history-scanner-scvg-activity"
 )
 
 var (
 	tlScavengerHBInterval = 10 * time.Second
-	tlScavengerActivityRetryPolicy = cadence.RetryPolicy{
+	activityRetryPolicy = cadence.RetryPolicy{
 		InitialInterval:    10 * time.Second,
 		BackoffCoefficient: 1.7,
 		MaximumInterval:    5 * time.Minute,
 		ExpirationInterval: infiniteDuration,
 	}
+	activityOptions = workflow.ActivityOptions{
+		ScheduleToStartTimeout: 5 * time.Minute,
+		StartToCloseTimeout:    infiniteDuration,
+		HeartbeatTimeout:       5 * time.Minute,
+		RetryPolicy:            &activityRetryPolicy,
+	}
 	tlScannerWFStartOptions = cclient.StartWorkflowOptions{
 		ID:       tlScannerWFID,
 		TaskList: tlScannerTaskListName,
@@ -63,25 +77,50 @@ var (
 		WorkflowIDReusePolicy:        cclient.WorkflowIDReusePolicyAllowDuplicate,
 		CronSchedule:                 "0 */12 * * *",
 	}
+	historyScannerWFStartOptions = cclient.StartWorkflowOptions{
+		ID:                           historyScannerWFID,
+		TaskList:                     historyScannerTaskListName,
+		ExecutionStartToCloseTimeout: infiniteDuration,
+		WorkflowIDReusePolicy:        cclient.WorkflowIDReusePolicyAllowDuplicate,
+	}
 )
 
 func init() {
 	workflow.RegisterWithOptions(TaskListScannerWorkflow, workflow.RegisterOptions{Name: tlScannerWFTypeName})
+	workflow.RegisterWithOptions(HistoryScannerWorkflow, workflow.RegisterOptions{Name: historyScannerWFTypeName})
 	activity.RegisterWithOptions(TaskListScavengerActivity, activity.RegisterOptions{Name: taskListScavengerActivityName})
+	activity.RegisterWithOptions(HistoryScavengerActivity, activity.RegisterOptions{Name: historyScavengerActivityName})
 }
 
 // TaskListScannerWorkflow is the workflow that runs the task-list scanner background daemon
 func TaskListScannerWorkflow(ctx workflow.Context) error {
-	opts := workflow.ActivityOptions{
-		ScheduleToStartTimeout: 5 * time.Minute,
-		StartToCloseTimeout:    infiniteDuration,
-		HeartbeatTimeout:       5 * time.Minute,
-		RetryPolicy:            &tlScavengerActivityRetryPolicy,
-	}
-	future := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, opts), taskListScavengerActivityName)
+
+	future := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, activityOptions), taskListScavengerActivityName)
+	return future.Get(ctx, nil)
+}
+
+// HistoryScannerWorkflow is the workflow that runs the history scanner background daemon
+func HistoryScannerWorkflow(ctx workflow.Context) error {
+	future := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, activityOptions), historyScavengerActivityName)
 	return future.Get(ctx, nil)
 }
 
+// HistoryScavengerActivity is the activity that runs history scavenger
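+// It resumes from the last recorded heartbeat details, if any, so a retried activity continues from where it left off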
+func HistoryScavengerActivity(aCtx context.Context) (history.ScavengerHeartbeatDetails, error) {
+	ctx := aCtx.Value(scannerContextKey).(scannerContext)
+	rps := ctx.cfg.PersistenceMaxQPS()
+
+	hbd := history.ScavengerHeartbeatDetails{}
+	if activity.HasHeartbeatDetails(aCtx) {
+		if err := activity.GetHeartbeatDetails(aCtx, &hbd); err != nil {
+			ctx.logger.Error("Failed to recover from last heartbeat, start over from beginning", tag.Error(err))
+		}
+	}
+
+	scavenger := history.NewScavenger(ctx.historyDB, rps, ctx.clientBean.GetHistoryClient(), hbd, ctx.metricsClient, ctx.logger)
+	return scavenger.Run(aCtx)
+}
+
 // TaskListScavengerActivity is the activity that runs task list scavenger
 func TaskListScavengerActivity(aCtx context.Context) error {
 	ctx := aCtx.Value(scannerContextKey).(scannerContext)
diff --git a/service/worker/service.go b/service/worker/service.go
index 48ad7312c55..25981b8abcb 100644
--- a/service/worker/service.go
+++ b/service/worker/service.go
@@ -36,7 +36,6 @@ import (
 	"github.com/uber/cadence/common/persistence"
 	persistencefactory "github.com/uber/cadence/common/persistence/persistence-factory"
 	"github.com/uber/cadence/common/service"
-	"github.com/uber/cadence/common/service/config"
 	"github.com/uber/cadence/common/service/dynamicconfig"
 	"github.com/uber/cadence/service/worker/archiver"
 	"github.com/uber/cadence/service/worker/batcher"
@@ -147,29 +146,22 @@ func (s *Service) Start() {
 	replicatorEnabled := base.GetClusterMetadata().IsGlobalDomainEnabled()
 	archiverEnabled := base.GetArchivalMetadata().GetHistoryConfig().ClusterConfiguredForArchival()
-	scannerEnabled := s.config.ScannerCfg.Persistence.DefaultStoreType() == config.StoreTypeSQL
 	batcherEnabled := s.config.EnableBatcher()
 
-	if replicatorEnabled || archiverEnabled || scannerEnabled || batcherEnabled {
-		pConfig := s.params.PersistenceConfig
-		pConfig.SetMaxQPS(pConfig.DefaultStore, s.config.ReplicationCfg.PersistenceMaxQPS())
-		pFactory := persistencefactory.New(&pConfig, s.params.ClusterMetadata.GetCurrentClusterName(), s.metricsClient, s.logger)
+	pConfig := s.params.PersistenceConfig
+	pConfig.SetMaxQPS(pConfig.DefaultStore, s.config.ReplicationCfg.PersistenceMaxQPS())
+	pFactory := persistencefactory.New(&pConfig, s.params.ClusterMetadata.GetCurrentClusterName(), s.metricsClient, s.logger)
+	s.ensureSystemDomainExists(pFactory, base.GetClusterMetadata().GetCurrentClusterName())
 
-		if archiverEnabled || scannerEnabled {
-			s.ensureSystemDomainExists(pFactory, base.GetClusterMetadata().GetCurrentClusterName())
-		}
-		if replicatorEnabled {
-			s.startReplicator(base, pFactory)
-		}
-		if archiverEnabled {
-			s.startArchiver(base, pFactory)
-		}
-		if scannerEnabled {
-			s.startScanner(base)
-		}
-		if batcherEnabled {
-			s.startBatcher(base)
-		}
+	s.startScanner(base)
+	if replicatorEnabled {
+		s.startReplicator(base, pFactory)
+	}
+	if archiverEnabled {
+		s.startArchiver(base, pFactory)
+	}
+	if batcherEnabled {
+		s.startBatcher(base)
 	}
 
 	s.logger.Info("service started", tag.ComponentWorker)
@@ -205,6 +197,7 @@ func (s *Service) startScanner(base service.Service) {
 	params := &scanner.BootstrapParams{
 		Config:        *s.config.ScannerCfg,
 		SDKClient:     s.params.PublicClient,
+		ClientBean:    base.GetClientBean(),
 		MetricsClient: s.metricsClient,
 		Logger:        s.logger,
 		TallyScope:    s.params.MetricScope,