Skip to content

Commit

Permalink
History cleanup scanner for Cassandra (cadence-workflow#2499)
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Sep 6, 2019
1 parent c27ca16 commit 125b77d
Show file tree
Hide file tree
Showing 11 changed files with 953 additions and 45 deletions.
10 changes: 10 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,8 @@ const (
TaskListScavengerScope
// BatcherScope is scope used by all metrics emitted by worker.Batcher module
BatcherScope
// HistoryScavengerScope is scope used by all metrics emitted by worker.history.Scavenger module
HistoryScavengerScope

NumWorkerScopes
)
Expand Down Expand Up @@ -1270,6 +1272,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
ArchiverPumpScope: {operation: "ArchiverPump"},
ArchiverArchivalWorkflowScope: {operation: "ArchiverArchivalWorkflow"},
TaskListScavengerScope: {operation: "tasklistscavenger"},
HistoryScavengerScope: {operation: "historyscavenger"},
BatcherScope: {operation: "batcher"},
},
}
Expand Down Expand Up @@ -1574,6 +1577,10 @@ const (
ExecutorTasksDroppedCount
BatcherProcessorSuccess
BatcherProcessorFailures
HistoryScavengerSuccessCount
HistoryScavengerErrorCount
HistoryScavengerSkipCount

NumWorkerMetrics
)

Expand Down Expand Up @@ -1855,6 +1862,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
ExecutorTasksDroppedCount: {metricName: "executor_dropped", metricType: Counter},
BatcherProcessorSuccess: {metricName: "batcher_processor_requests", metricType: Counter},
BatcherProcessorFailures: {metricName: "batcher_processor_errors", metricType: Counter},
HistoryScavengerSuccessCount: {metricName: "scavenger_success", metricType: Counter},
HistoryScavengerErrorCount: {metricName: "scavenger_errors", metricType: Counter},
HistoryScavengerSkipCount: {metricName: "scavenger_skips", metricType: Counter},
},
}

Expand Down
34 changes: 34 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package persistence

import (
"fmt"
"strings"
"time"

"github.com/pborman/uuid"
Expand Down Expand Up @@ -142,6 +143,8 @@ const (
TransferTaskTransferTargetRunID = "30000000-0000-f000-f000-000000000002"
)

const numItemsInGarbageInfo = 3

type (
// InvalidPersistenceRequestError represents invalid request to persistence
InvalidPersistenceRequestError struct {
Expand Down Expand Up @@ -2408,6 +2411,20 @@ func NewHistoryBranchToken(treeID string) ([]byte, error) {
return token, nil
}

// NewHistoryBranchTokenByBranchID return a new branch token with treeID/branchID
func NewHistoryBranchTokenByBranchID(treeID, branchID string) ([]byte, error) {
bi := &workflow.HistoryBranch{
TreeID: &treeID,
BranchID: &branchID,
Ancestors: []*workflow.HistoryBranchRange{},
}
token, err := internalThriftEncoder.Encode(bi)
if err != nil {
return nil, err
}
return token, nil
}

// NewHistoryBranchTokenFromAnother make up a branchToken
func NewHistoryBranchTokenFromAnother(branchID string, anotherToken []byte) ([]byte, error) {
var branch workflow.HistoryBranch
Expand All @@ -2427,3 +2444,20 @@ func NewHistoryBranchTokenFromAnother(branchID string, anotherToken []byte) ([]b
}
return token, nil
}

func BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID string) string {
return fmt.Sprintf("%v:%v:%v", domainID, workflowID, runID)
}

func SplitHistoryGarbageCleanupInfo(info string) (domainID, workflowID, runID string, err error) {
ss := strings.Split(info, ":")
// workflowID can contain ":" so len(ss) can be greater than 3
if len(ss) < numItemsInGarbageInfo {
return "", "", "", fmt.Errorf("not able to split info for %s", info)
}
domainID = ss[0]
runID = ss[len(ss)-1]
workflowEnd := len(info) - len(runID) - 1
workflowID = info[len(domainID)+1 : workflowEnd]
return
}
50 changes: 50 additions & 0 deletions common/persistence/persistence-tests/shared_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package persistencetests

import (
"github.com/uber/cadence/common/persistence"
"testing"
)

func TestGarbageCleanupInfo(t *testing.T) {
domainID := "10000000-5000-f000-f000-000000000000"
workflowID := "workflow-id"
runID := "10000000-5000-f000-f000-000000000002"

info := persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID)
domainID2, workflowID2, runID2, err := persistence.SplitHistoryGarbageCleanupInfo(info)
if err != nil || domainID != domainID2 || workflowID != workflowID2 || runID != runID2 {
t.Fail()
}
}

func TestGarbageCleanupInfo_WithColonInWorklfowID(t *testing.T) {
domainID := "10000000-5000-f000-f000-000000000000"
workflowID := "workflow-id:2"
runID := "10000000-5000-f000-f000-000000000002"

info := persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID)
domainID2, workflowID2, runID2, err := persistence.SplitHistoryGarbageCleanupInfo(info)
if err != nil || domainID != domainID2 || workflowID != workflowID2 || runID != runID2 {
t.Fail()
}
}
2 changes: 1 addition & 1 deletion service/history/workflowExecutionContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,7 +645,7 @@ func (c *workflowExecutionContextImpl) persistFirstWorkflowEvents(
execution,
&persistence.AppendHistoryNodesRequest{
IsNewBranch: true,
Info: historyGarbageCleanupInfo(domainID, workflowID, runID),
Info: persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, runID),
BranchToken: branchToken,
Events: events,
// TransactionID is set by shard context
Expand Down
8 changes: 2 additions & 6 deletions service/history/workflowResetor.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (w *workflowResetorImpl) buildNewMutableStateForReset(
forkResp, retError := w.eng.historyV2Mgr.ForkHistoryBranch(&persistence.ForkHistoryBranchRequest{
ForkBranchToken: baseMutableState.GetCurrentBranch(),
ForkNodeID: resetDecisionCompletedEventID,
Info: historyGarbageCleanupInfo(domainID, workflowID, newRunID),
Info: persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, newRunID),
ShardID: common.IntPtr(w.eng.shard.GetShardID()),
})
if retError != nil {
Expand Down Expand Up @@ -380,10 +380,6 @@ func (w *workflowResetorImpl) terminateIfCurrIsRunning(
return
}

func historyGarbageCleanupInfo(domainID, workflowID, runID string) string {
return fmt.Sprintf("%v:%v:%v", domainID, workflowID, runID)
}

func (w *workflowResetorImpl) setEventIDsWithHistory(msBuilder mutableState) int64 {
history := msBuilder.GetHistoryBuilder().GetHistory().Events
firstEvent := history[0]
Expand Down Expand Up @@ -773,7 +769,7 @@ func (w *workflowResetorImpl) ApplyResetEvent(
forkResp, retError := w.eng.historyV2Mgr.ForkHistoryBranch(&persistence.ForkHistoryBranchRequest{
ForkBranchToken: baseMutableState.GetCurrentBranch(),
ForkNodeID: decisionFinishEventID,
Info: historyGarbageCleanupInfo(domainID, workflowID, resetAttr.GetNewRunId()),
Info: persistence.BuildHistoryGarbageCleanupInfo(domainID, workflowID, resetAttr.GetNewRunId()),
ShardID: shardID,
})
if retError != nil {
Expand Down
2 changes: 1 addition & 1 deletion service/history/workflowResetor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4012,7 +4012,7 @@ func (s *resetorSuite) TestApplyReset() {
forkReq := &p.ForkHistoryBranchRequest{
ForkBranchToken: forkBranchToken,
ForkNodeID: 30,
Info: historyGarbageCleanupInfo(domainID, wid, newRunID),
Info: p.BuildHistoryGarbageCleanupInfo(domainID, wid, newRunID),
ShardID: common.IntPtr(s.shardID),
}
forkResp := &p.ForkHistoryBranchResponse{
Expand Down
Loading

0 comments on commit 125b77d

Please sign in to comment.