Skip to content

Commit

Permalink
Admin CLI: Delete history before delete execution records in delete … (
Browse files Browse the repository at this point in the history
…cadence-workflow#1601)

* Admin CLI: Delete history before delete execution records in  delete command

* address comment

* address comment
  • Loading branch information
longquanzheng authored Mar 26, 2019
1 parent aaebd95 commit 9ec28e7
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 58 deletions.
32 changes: 31 additions & 1 deletion common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,8 @@ workflow_state = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? `

templateDeleteWorkflowExecutionCurrentRowQuery = templateDeleteWorkflowExecutionMutableStateQuery + " if current_run_id = ? "

templateDeleteWorkflowExecutionSignalRequestedQuery = `UPDATE executions ` +
`SET signal_requested = signal_requested - ? ` +
`WHERE shard_id = ? ` +
Expand Down Expand Up @@ -904,8 +906,10 @@ type (
}
)

var _ p.ExecutionStore = (*cassandraPersistence)(nil)

//NewWorkflowExecutionPersistenceFromSession returns new ExecutionStore
func NewWorkflowExecutionPersistenceFromSession(session *gocql.Session, shardID int, logger bark.Logger) p.ExecutionStore {
func NewWorkflowExecutionPersistenceFromSession(session *gocql.Session, shardID int, logger bark.Logger) *cassandraPersistence {
return &cassandraPersistence{cassandraStore: cassandraStore{session: session, logger: logger}, shardID: shardID}
}

Expand Down Expand Up @@ -2276,6 +2280,32 @@ func (d *cassandraPersistence) DeleteWorkflowExecution(request *p.DeleteWorkflow
return nil
}

func (d *cassandraPersistence) DeleteWorkflowCurrentRow(request *p.DeleteWorkflowExecutionRequest) error {
query := d.session.Query(templateDeleteWorkflowExecutionCurrentRowQuery,
d.shardID,
rowTypeExecution,
request.DomainID,
request.WorkflowID,
permanentRunID,
defaultVisibilityTimestamp,
rowTypeExecutionTaskID,
request.RunID)

err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("DeleteWorkflowCurrentRow operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("DeleteWorkflowCurrentRow operation failed. Error: %v", err),
}
}

return nil
}

func (d *cassandraPersistence) GetCurrentExecution(request *p.GetCurrentExecutionRequest) (*p.GetCurrentExecutionResponse,
error) {
query := d.session.Query(templateGetCurrentExecutionQuery,
Expand Down
13 changes: 5 additions & 8 deletions tools/cli/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,9 @@ func newAdminWorkflowCommands() []cli.Command {
Name: FlagRunIDWithAlias,
Usage: "RunID",
},
cli.StringFlag{
Name: FlagDomainID,
Usage: "DomainID",
},
cli.IntFlag{
Name: FlagShardID,
Usage: "ShardID",
cli.BoolFlag{
Name: FlagSkipErrorModeWithAlias,
Usage: "skip errors when deleting history",
},

// for cassandra connection
Expand All @@ -129,7 +125,8 @@ func newAdminWorkflowCommands() []cli.Command {
},
cli.IntFlag{
Name: FlagPort,
Usage: "cassandra port for the host (default is 9042)",
Value: 9042,
Usage: "cassandra port for the host",
},
cli.StringFlag{
Name: FlagUsername,
Expand Down
146 changes: 97 additions & 49 deletions tools/cli/adminCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (

"io/ioutil"

"strconv"

"github.com/gocql/gocql"
"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/admin"
Expand Down Expand Up @@ -126,32 +128,15 @@ func AdminShowWorkflow(c *cli.Context) {

// AdminDescribeWorkflow describe a new workflow execution for admin
func AdminDescribeWorkflow(c *cli.Context) {
adminClient := cFactory.ServerAdminClient(c)

domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)

ctx, cancel := newContext(c)
defer cancel()

resp, err := adminClient.DescribeWorkflowExecution(ctx, &admin.DescribeWorkflowExecutionRequest{
Domain: common.StringPtr(domain),
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(wid),
RunId: common.StringPtr(rid),
},
})
if err != nil {
ErrorAndExit("Describe workflow execution failed", err)
}
resp := describeMutableState(c)

prettyPrintJSONObject(resp)

if resp != nil {
msStr := resp.GetMutableStateInDatabase()
ms := persistence.WorkflowMutableState{}
err = json.Unmarshal([]byte(msStr), &ms)
err := json.Unmarshal([]byte(msStr), &ms)
if err != nil {
ErrorAndExit("json.Unmarshal err", err)
}
Expand All @@ -167,50 +152,113 @@ func AdminDescribeWorkflow(c *cli.Context) {
}
}

// AdminDeleteWorkflow describe a new workflow execution for admin
func AdminDeleteWorkflow(c *cli.Context) {
domainID := getRequiredOption(c, FlagDomainID)
func describeMutableState(c *cli.Context) *admin.DescribeWorkflowExecutionResponse {
adminClient := cFactory.ServerAdminClient(c)

domain := getRequiredGlobalOption(c, FlagDomain)
wid := getRequiredOption(c, FlagWorkflowID)
rid := getRequiredOption(c, FlagRunID)
if !c.IsSet(FlagShardID) {
ErrorAndExit("shardID is required", nil)
}
shardID := c.Int(FlagShardID)
rid := c.String(FlagRunID)

session := connectToCassandra(c)
ctx, cancel := newContext(c)
defer cancel()

var err error
permanentRunID := "30000000-0000-f000-f000-000000000001"
selectTmpl := "select execution from executions where shard_id = ? and type = 1 and domain_id = ? and workflow_id = ? and run_id = ? "
deleteTmpl := "delete from executions where shard_id = ? and type = 1 and domain_id = ? and workflow_id = ? and run_id = ? "
resp, err := adminClient.DescribeWorkflowExecution(ctx, &admin.DescribeWorkflowExecutionRequest{
Domain: common.StringPtr(domain),
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(wid),
RunId: common.StringPtr(rid),
},
})
if err != nil {
ErrorAndExit("Get workflow mutableState failed", err)
}
return resp
}

query := session.Query(selectTmpl, shardID, domainID, wid, permanentRunID)
_, err = readOneRow(query)
// AdminDeleteWorkflow describe a new workflow execution for admin
func AdminDeleteWorkflow(c *cli.Context) {
wid := getRequiredOption(c, FlagWorkflowID)
rid := c.String(FlagRunID)

resp := describeMutableState(c)
shardID := resp.GetShardId()
msStr := resp.GetMutableStateInDatabase()
ms := persistence.WorkflowMutableState{}
err := json.Unmarshal([]byte(msStr), &ms)
if err != nil {
fmt.Printf("readOneRow for permanentRunID, %v, skip \n", err)
ErrorAndExit("json.Unmarshal err", err)
}
domainID := ms.ExecutionInfo.DomainID
skipError := c.Bool(FlagSkipErrorMode)
session := connectToCassandra(c)
if ms.ExecutionInfo.EventStoreVersion == persistence.EventStoreVersionV2 {
branchInfo := shared.HistoryBranch{}
thriftrwEncoder := codec.NewThriftRWEncoder()
err := thriftrwEncoder.Decode(ms.ExecutionInfo.BranchToken, &branchInfo)
if err != nil {
ErrorAndExit("thriftrwEncoder.Decode err", err)
}
fmt.Println("deleting history events for ...")
prettyPrintJSONObject(branchInfo)
histV2 := cassp.NewHistoryV2PersistenceFromSession(session, bark.NewNopLogger())
err = histV2.DeleteHistoryBranch(&persistence.InternalDeleteHistoryBranchRequest{
BranchInfo: branchInfo,
})
if err != nil {
if skipError {
fmt.Println("failed to delete history, ", err)
} else {
ErrorAndExit("DeleteHistoryBranch err", err)
}
}
} else {

query := session.Query(deleteTmpl, shardID, domainID, wid, permanentRunID)
err := query.Exec()
histV1 := cassp.NewHistoryPersistenceFromSession(session, bark.NewNopLogger())
err = histV1.DeleteWorkflowExecutionHistory(&persistence.DeleteWorkflowExecutionHistoryRequest{
DomainID: domainID,
Execution: shared.WorkflowExecution{
WorkflowId: common.StringPtr(wid),
RunId: common.StringPtr(rid),
},
})
if err != nil {
ErrorAndExit("delete row failed", err)
if skipError {
fmt.Println("failed to delete history, ", err)
} else {
ErrorAndExit("DeleteWorkflowExecutionHistory err", err)
}
}
fmt.Println("delete row successfully")
}

query = session.Query(selectTmpl, shardID, domainID, wid, rid)
_, err = readOneRow(query)
shardIDInt, err := strconv.Atoi(shardID)
if err != nil {
fmt.Printf("readOneRow for rid %v, %v, skip \n", rid, err)
} else {
ErrorAndExit("strconv.Atoi(shardID) err", err)
}
exeStore := cassp.NewWorkflowExecutionPersistenceFromSession(session, shardIDInt, bark.NewNopLogger())
req := &persistence.DeleteWorkflowExecutionRequest{
DomainID: domainID,
WorkflowID: wid,
RunID: rid,
}

query := session.Query(deleteTmpl, shardID, domainID, wid, rid)
err := query.Exec()
if err != nil {
ErrorAndExit("delete row failed", err)
err = exeStore.DeleteWorkflowExecution(req)
if err != nil {
if skipError {
fmt.Println("delete mutableState row failed, ", err)
} else {
ErrorAndExit("delete mutableState row failed", err)
}
}
fmt.Println("delete mutableState row successfully")

err = exeStore.DeleteWorkflowCurrentRow(req)
if err != nil {
if skipError {
fmt.Println("delete current row failed, ", err)
} else {
ErrorAndExit("delete current row failed", err)
}
fmt.Println("delete row successfully")
}
fmt.Println("delete current row successfully")
}

func readOneRow(query *gocql.Query) (map[string]interface{}, error) {
Expand Down

0 comments on commit 9ec28e7

Please sign in to comment.