Skip to content

Commit

Permalink
Unify shard rangeID column and field in Cass persistence (cadence-wor…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Oct 13, 2020
1 parent da72f32 commit 9a787f7
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 7 deletions.
8 changes: 4 additions & 4 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -1581,7 +1581,8 @@ func updateBufferedEvents(

func createShardInfo(
currentCluster string,
result map[string]interface{},
rangeID int64,
shard map[string]interface{},
) *p.InternalShardInfo {

var pendingFailoverMarkersRawData []byte
Expand All @@ -1591,14 +1592,13 @@ func createShardInfo(
var timerProcessingQueueStatesRawData []byte
var timerProcessingQueueStatesEncoding string
info := &p.InternalShardInfo{}
for k, v := range result {
info.RangeID = rangeID
for k, v := range shard {
switch k {
case "shard_id":
info.ShardID = v.(int)
case "owner":
info.Owner = v.(string)
case "range_id":
info.RangeID = v.(int64)
case "stolen_since_renew":
info.StolenSinceRenew = v.(int)
case "updated_at":
Expand Down
84 changes: 82 additions & 2 deletions common/persistence/cassandra/cassandraShardPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra"
"github.com/uber/cadence/common/service/config"
Expand Down Expand Up @@ -62,7 +63,7 @@ const (
`shard_id, type, domain_id, workflow_id, run_id, visibility_ts, task_id, shard, range_id)` +
`VALUES(?, ?, ?, ?, ?, ?, ?, ` + templateShardType + `, ?) IF NOT EXISTS`

templateGetShardQuery = `SELECT shard ` +
templateGetShardQuery = `SELECT shard, range_id ` +
`FROM executions ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
Expand All @@ -82,6 +83,17 @@ const (
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF range_id = ?`

templateUpdateRangeIDQuery = `UPDATE executions ` +
`SET range_id = ? ` +
`WHERE shard_id = ? ` +
`and type = ? ` +
`and domain_id = ? ` +
`and workflow_id = ? ` +
`and run_id = ? ` +
`and visibility_ts = ? ` +
`and task_id = ? ` +
`IF range_id = ?`
)

type (
Expand Down Expand Up @@ -218,11 +230,79 @@ func (d *cassandraShardPersistence) GetShard(
}
}

info := createShardInfo(d.currentClusterName, result["shard"].(map[string]interface{}))
// Check whether the rangeID column and the rangeID field in the shard column match; if not, we need to pick
// the larger rangeID.
//
// If shardInfoRangeID < rangeID, we don't need to do anything here, as createShardInfo will ignore
// shardInfoRangeID and return rangeID instead. Later, when updating the shard, CAS can still succeed
// because the value from the rangeID column is returned; shardInfoRangeID will also be updated to the correct value.
rangeID := result["range_id"].(int64)
shard := result["shard"].(map[string]interface{})
shardInfoRangeID := shard["range_id"].(int64)
if shardInfoRangeID > rangeID {
// In this case we need to fix the rangeID column before returning the result because:
// 1. if we return shardInfoRangeID, then later shard CAS operations will fail
// 2. if we still return rangeID, CAS will work but rangeID will move backward, which
// results in lost tasks, corrupted workflow history, etc.

d.logger.Warn("Corrupted shard rangeID", tag.ShardID(shardID), tag.ShardRangeID(shardInfoRangeID), tag.PreviousShardRangeID(rangeID))
if err := d.updateRangeID(context.TODO(), shardID, shardInfoRangeID, rangeID); err != nil {
return nil, err
}
}

info := createShardInfo(d.currentClusterName, rangeID, shard)

return &p.InternalGetShardResponse{ShardInfo: info}, nil
}

// updateRangeID sets the range_id column of the shard row identified by
// shardID to rangeID. The write is conditional: templateUpdateRangeIDQuery
// ends with `IF range_id = ?`, bound here to previousRangeID, so the update
// only applies while the column still holds previousRangeID.
//
// The context parameter is accepted for signature symmetry but is not used.
//
// Returns:
//   - *workflow.ServiceBusyError if the query fails and isThrottlingError
//     reports the failure as throttling;
//   - *workflow.InternalServiceError for any other query failure;
//   - *p.ShardOwnershipLostError if the conditional write was not applied;
//   - nil if the write was applied.
func (d *cassandraShardPersistence) updateRangeID(
	_ context.Context,
	shardID int,
	rangeID int64,
	previousRangeID int64,
) error {
	// Bind order must follow the placeholders in templateUpdateRangeIDQuery:
	// the new value first, then the row key columns, then the CAS condition.
	query := d.session.Query(templateUpdateRangeIDQuery,
		rangeID,
		shardID,
		rowTypeShard,
		rowTypeShardDomainID,
		rowTypeShardWorkflowID,
		rowTypeShardRunID,
		defaultVisibilityTimestamp,
		rowTypeShardTaskID,
		previousRangeID,
	)

	// previous receives whatever columns the CAS query returns; it is only
	// used to build the diagnostic message when the write is not applied.
	previous := make(map[string]interface{})
	applied, err := query.MapScanCAS(previous)
	if err != nil {
		if isThrottlingError(err) {
			return &workflow.ServiceBusyError{
				Message: fmt.Sprintf("UpdateRangeID operation failed. Error: %v", err),
			}
		}
		return &workflow.InternalServiceError{
			Message: fmt.Sprintf("UpdateRangeID operation failed. Error: %v", err),
		}
	}

	if !applied {
		columns := make([]string, 0, len(previous))
		for k, v := range previous {
			columns = append(columns, fmt.Sprintf("%s=%v", k, v))
		}

		return &p.ShardOwnershipLostError{
			// Report the shard this CAS targeted (the shardID argument bound
			// into the query above), not the receiver's d.shardID field, so
			// the error always names the row whose update failed.
			ShardID: shardID,
			Msg: fmt.Sprintf("Failed to update shard rangeID. previous_range_id: %v, columns: (%v)",
				previousRangeID, strings.Join(columns, ",")),
		}
	}

	return nil
}

func (d *cassandraShardPersistence) UpdateShard(
_ context.Context,
request *p.InternalUpdateShardRequest,
Expand Down
2 changes: 1 addition & 1 deletion schema/cassandra/cadence/schema.cql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ CREATE TYPE shard (
owner text, -- Host identifier processing the shard
-- Range identifier used for generating ack ids for tasks within shard.
-- Also used for optimistic concurrency and all writes to a shard are conditional on this value.
range_id bigint,
range_id bigint, -- TO BE DEPRECATED, IN FAVOR OF range_id column in executions table
-- This field keeps track of number of times owner for a shard changes before updating range_id or ack_levels
stolen_since_renew int,
updated_at timestamp,
Expand Down

0 comments on commit 9a787f7

Please sign in to comment.