Support for renewing RangeID for shard and life-cycle
Summary:
Persistence API changes to return ShardOwnershipLostError when a
conditional update fails due to a RangeID mismatch.

ShardController exposes a shardClosedCh to manage cleanup of shards when
loss of shard ownership is detected.

ShardContext manages the close handling and writes to shardClosedCh when
it sees ShardOwnershipLostError.

TransferQueueProcessor calls the update API even when there are no
changes, so that shard closure is triggered if ownership has been lost.
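
Taken together, these changes give the shard context a single failure path: a conditional update that loses on RangeID surfaces as ShardOwnershipLostError and is fanned out to ShardController through the closed channel. The following is a minimal sketch of that flow, not the committed implementation; shardContextImpl, shardManager, closeCh and renewRangeID are assumed names for pieces that do not appear in this diff (only ShardInfo, UpdateShardRequest and ShardOwnershipLostError are taken from it).

package history

import "code.uber.internal/devexp/minions/common/persistence"

type shardContextImpl struct {
	shardInfo    *persistence.ShardInfo
	shardManager persistence.ShardManager // assumed shard persistence interface
	closeCh      chan<- int               // drained by ShardController for cleanup
}

func (s *shardContextImpl) renewRangeID() error {
	updatedInfo := *s.shardInfo
	updatedInfo.RangeID++
	err := s.shardManager.UpdateShard(&persistence.UpdateShardRequest{
		ShardInfo:       &updatedInfo,
		PreviousRangeID: s.shardInfo.RangeID,
	})
	if err != nil {
		if _, ok := err.(*persistence.ShardOwnershipLostError); ok {
			// The conditional update lost the race on RangeID: another
			// host owns this shard now, so signal the controller to
			// unload it.
			s.closeCh <- s.shardInfo.ShardID
		}
		return err
	}
	s.shardInfo = &updatedInfo
	return nil
}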

Reviewers: sivakk, tamer, maxim

Reviewed By: tamer, maxim

Subscribers: jenkins, aravindv, venkat

Differential Revision: https://code.uberinternal.com/D736505
samarabbas committed Feb 17, 2017
1 parent 4b68ab7 commit 0daa276
Showing 17 changed files with 844 additions and 143 deletions.
52 changes: 45 additions & 7 deletions common/persistence/cassandraPersistence.go
@@ -53,6 +53,7 @@ const (
`owner: ?, ` +
`range_id: ?, ` +
`stolen_since_renew: ?, ` +
`updated_at: ?, ` +
`transfer_ack_level: ?` +
`}`

@@ -374,6 +375,7 @@ func NewCassandraTaskPersistence(hosts string, keyspace string, logger bark.Logg
}

func (d *cassandraPersistence) CreateShard(request *CreateShardRequest) error {
cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano())
shardInfo := request.ShardInfo
query := d.session.Query(templateCreateShardQuery,
shardInfo.ShardID,
@@ -385,6 +387,7 @@ func (d *cassandraPersistence) CreateShard(request *CreateShardRequest) error {
shardInfo.Owner,
shardInfo.RangeID,
shardInfo.StolenSinceRenew,
cqlNowTimestamp,
shardInfo.TransferAckLevel,
shardInfo.RangeID)

@@ -435,13 +438,15 @@ func (d *cassandraPersistence) GetShard(request *GetShardRequest) (*GetShardResp
}

func (d *cassandraPersistence) UpdateShard(request *UpdateShardRequest) error {
cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano())
shardInfo := request.ShardInfo

query := d.session.Query(templateUpdateShardQuery,
shardInfo.ShardID,
shardInfo.Owner,
shardInfo.RangeID,
shardInfo.StolenSinceRenew,
cqlNowTimestamp,
shardInfo.TransferAckLevel,
shardInfo.RangeID,
shardInfo.ShardID,
@@ -465,7 +470,7 @@ func (d *cassandraPersistence) UpdateShard(request *UpdateShardRequest) error {
columns = append(columns, fmt.Sprintf("%s=%v", k, v))
}

return &ConditionFailedError{
return &ShardOwnershipLostError{
Msg: fmt.Sprintf("Failed to update shard. previous_range_id: %v, columns: (%v)",
request.PreviousRangeID, strings.Join(columns, ",")),
}
@@ -525,15 +530,30 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx
}

if !applied {
if rangeID, ok := previous["range_id"].(int64); ok && rangeID != request.RangeID {
// CreateWorkflowExecution failed because rangeID was modified
return nil, &ShardOwnershipLostError{
Msg: fmt.Sprintf("Failed to create workflow execution. Request RangeID: %v, Actual RangeID: %v",
request.RangeID, rangeID),
}
}

var columns []string
for k, v := range previous {
columns = append(columns, fmt.Sprintf("%s=%v", k, v))
}

execution := previous["execution"].(map[string]interface{})
return nil, &workflow.WorkflowExecutionAlreadyStartedError{
Message: fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)",
execution["workflow_id"], execution["run_id"], request.RangeID, strings.Join(columns, ",")),
if execution, ok := previous["execution"].(map[string]interface{}); ok {
// CreateWorkflowExecution failed because it already exists
return nil, &workflow.WorkflowExecutionAlreadyStartedError{
Message: fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)",
execution["workflow_id"], execution["run_id"], request.RangeID, strings.Join(columns, ",")),
}
}

return nil, &ConditionFailedError{
Msg: fmt.Sprintf("Failed to create workflow execution. Request RangeID: %v, columns: (%v)",
request.RangeID, strings.Join(columns, ",")),
}
}

@@ -614,14 +634,30 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
}

if !applied {
if rangeID, ok := previous["range_id"].(int64); ok && rangeID != request.RangeID {
// UpdateWorkflowExecution failed because rangeID was modified
return &ShardOwnershipLostError{
Msg: fmt.Sprintf("Failed to update workflow execution. Request RangeID: %v, Actual RangeID: %v",
request.RangeID, rangeID),
}
}

if nextEventID, ok := previous["next_event_id"].(int64); ok && nextEventID != request.Condition {
// UpdateWorkflowExecution failed because next event ID is unexpected
return &ConditionFailedError{
Msg: fmt.Sprintf("Failed to update workflow execution. Request Condition: %v, Actual Value: %v",
request.Condition, nextEventID),
}
}

var columns []string
for k, v := range previous {
columns = append(columns, fmt.Sprintf("%s=%v", k, v))
}

return &ConditionFailedError{
Msg: fmt.Sprintf("Failed to update workflow execution. condition: %v, columns: (%v)",
request.Condition, strings.Join(columns, ",")),
Msg: fmt.Sprintf("Failed to update workflow execution. RangeID: %v, Condition: %v, columns: (%v)",
request.RangeID, request.Condition, strings.Join(columns, ",")),
}
}

@@ -1225,6 +1261,8 @@ func createShardInfo(result map[string]interface{}) *ShardInfo {
info.RangeID = v.(int64)
case "stolen_since_renew":
info.StolenSinceRenew = v.(int)
case "updated_at":
info.UpdatedAt = v.(time.Time)
case "transfer_ack_level":
info.TransferAckLevel = v.(int64)
}
51 changes: 50 additions & 1 deletion common/persistence/cassandraPersistence_test.go
@@ -7,6 +7,7 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

gen "code.uber.internal/devexp/minions/.gen/go/shared"
@@ -17,6 +18,9 @@ type (
cassandraPersistenceSuite struct {
suite.Suite
TestBase
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
}
)

@@ -38,6 +42,8 @@ func (s *cassandraPersistenceSuite) TearDownSuite() {
}

func (s *cassandraPersistenceSuite) SetupTest() {
// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() would return nil
s.Assertions = require.New(s.T())
s.ClearTransferQueue()
}

@@ -50,8 +56,27 @@ func (s *cassandraPersistenceSuite) TestPersistenceStartWorkflow() {

task1, err1 := s.CreateWorkflowExecution(workflowExecution, "queue1", "event1", nil, 3, 0, 2, nil)
s.NotNil(err1, "Expected workflow creation to fail.")
log.Infof("Unable to start workflow execution: %v", err1)
s.IsType(&gen.WorkflowExecutionAlreadyStartedError{}, err1)
s.Empty(task1, "Expected empty task identifier.")

response, err2 := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{
Execution: workflowExecution,
TaskList: "queue1",
History: []byte("event1"),
ExecutionContext: nil,
NextEventID: int64(3),
LastProcessedEvent: 0,
RangeID: s.ShardContext.GetRangeID() - 1,
TransferTasks: []Task{
&DecisionTask{TaskID: s.GetNextSequenceNumber(), TaskList: "queue1", ScheduleID: int64(2)},
},
TimerTasks: nil})

s.NotNil(err2, "Expected workflow creation to fail.")
s.Nil(response)
log.Infof("Unable to start workflow execution: %v", err1)
s.IsType(&ShardOwnershipLostError{}, err2)
}

func (s *cassandraPersistenceSuite) TestGetWorkflow() {
@@ -129,7 +154,7 @@ func (s *cassandraPersistenceSuite) TestUpdateWorkflow() {
err4 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(5)}, nil, int64(3), nil, nil, nil, nil, nil, nil)
s.NotNil(err4, "expected non nil error.")
s.IsType(&ConditionFailedError{}, err4)
log.Infof("Conditional update failed with error: %v", err4)
log.Errorf("Conditional update failed with error: %v", err4)

info2, err4 := s.GetWorkflowExecutionInfo(workflowExecution)
s.Nil(err4, "No error expected.")
@@ -145,6 +170,30 @@
s.Equal(true, info2.DecisionPending)
s.Equal(true, validateTimeRange(info2.LastUpdatedTimestamp, time.Hour))
log.Infof("Workflow execution last updated: %v", info2.LastUpdatedTimestamp)

failedUpdatedInfo2 := copyWorkflowExecutionInfo(info1)
failedUpdatedInfo2.History = []byte(`event4`)
failedUpdatedInfo2.NextEventID = int64(6)
failedUpdatedInfo2.LastProcessedEvent = int64(3)
err5 := s.UpdateWorkflowExecutionWithRangeID(failedUpdatedInfo2, []int64{int64(5)}, nil, int64(12345), int64(5), nil, nil, nil, nil, nil, nil)
s.NotNil(err5, "expected non nil error.")
s.IsType(&ShardOwnershipLostError{}, err5)
log.Errorf("Conditional update failed with error: %v", err5)

info3, err6 := s.GetWorkflowExecutionInfo(workflowExecution)
s.Nil(err6, "No error expected.")
s.NotNil(info3, "Valid Workflow info expected.")
s.Equal("update-workflow-test", info3.WorkflowID)
s.Equal("5ba5e531-e46b-48d9-b4b3-859919839553", info3.RunID)
s.Equal("queue1", info3.TaskList)
s.Equal("event2", string(info3.History))
s.Equal([]byte(nil), info3.ExecutionContext)
s.Equal(WorkflowStateCreated, info3.State)
s.Equal(int64(5), info3.NextEventID)
s.Equal(int64(2), info3.LastProcessedEvent)
s.Equal(true, info3.DecisionPending)
s.Equal(true, validateTimeRange(info3.LastUpdatedTimestamp, time.Hour))
log.Infof("Workflow execution last updated: %v", info3.LastUpdatedTimestamp)
}

func (s *cassandraPersistenceSuite) TestDeleteWorkflow() {
11 changes: 10 additions & 1 deletion common/persistence/dataInterfaces.go
@@ -37,12 +37,18 @@ type (
Msg string
}

// ShardOwnershipLostError is returned when a conditional update fails because the RangeID for the shard has changed
ShardOwnershipLostError struct {
Msg string
}

// ShardInfo describes a shard
ShardInfo struct {
ShardID int
Owner string
RangeID int64
StolenSinceRenew int
UpdatedAt time.Time
TransferAckLevel int64
}

@@ -234,7 +240,6 @@ type (
ReadLevel int64
MaxReadLevel int64
BatchSize int
RangeID int64
}

// GetTransferTasksResponse is the response to GetTransferTasksRequest
@@ -372,6 +377,10 @@ func (e *ShardAlreadyExistError) Error() string {
return e.Msg
}

// Error returns the error message for ShardOwnershipLostError
func (e *ShardOwnershipLostError) Error() string {
return e.Msg
}

// GetType returns the type of the activity task
func (a *ActivityTask) GetType() int {
return TaskListTypeActivity
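With the dedicated error type, callers of the execution manager can distinguish an ordinary compare-and-set miss on next_event_id from a lost shard. A hedged sketch of that caller-side dispatch follows; handleUpdateFailure is a hypothetical helper, not part of this commit.

package history

import "code.uber.internal/devexp/minions/common/persistence"

// handleUpdateFailure reports how a caller might react to the two
// conditional-update failure modes surfaced by the persistence layer.
func handleUpdateFailure(err error) (retry bool, closeShard bool) {
	switch err.(type) {
	case *persistence.ConditionFailedError:
		// next_event_id moved underneath us: refresh mutable state and retry.
		return true, false
	case *persistence.ShardOwnershipLostError:
		// RangeID changed: another host owns the shard; close it, never retry.
		return false, true
	}
	return false, false
}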
37 changes: 28 additions & 9 deletions common/persistence/persistenceTestBase.go
@@ -78,12 +78,8 @@ func (s *testShardContext) GetExecutionManager() ExecutionManager {
return s.executionMgr
}

func (s *testShardContext) GetTransferTaskID() int64 {
return atomic.AddInt64(&s.transferSequenceNumber, 1)
}

func (s *testShardContext) GetRangeID() int64 {
return atomic.LoadInt64(&s.shardInfo.RangeID)
func (s *testShardContext) GetNextTransferTaskID() (int64, error) {
return atomic.AddInt64(&s.transferSequenceNumber, 1), nil
}

func (s *testShardContext) GetTransferAckLevel() int64 {
@@ -103,6 +99,15 @@ func (s *testShardContext) GetTransferSequenceNumber() int64 {
return atomic.LoadInt64(&s.transferSequenceNumber)
}

func (s *testShardContext) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) (
*CreateWorkflowExecutionResponse, error) {
return s.executionMgr.CreateWorkflowExecution(request)
}

func (s *testShardContext) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error {
return s.executionMgr.UpdateWorkflowExecution(request)
}

func (s *testShardContext) GetLogger() bark.Logger {
return s.logger
}
@@ -112,6 +117,10 @@ func (s *testShardContext) Reset() {
atomic.StoreInt64(&s.shardInfo.TransferAckLevel, 0)
}

func (s *testShardContext) GetRangeID() int64 {
return atomic.LoadInt64(&s.shardInfo.RangeID)
}

func newTestExecutionMgrFactory(options TestBaseOptions, cassandra CassandraTestCluster,
logger bark.Logger) ExecutionManagerFactory {
return &testExecutionMgrFactory{
@@ -286,6 +295,16 @@ func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, d
activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task,
upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64,
upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error {
return s.UpdateWorkflowExecutionWithRangeID(updatedInfo, decisionScheduleIDs, activityScheduleIDs,
s.ShardContext.GetRangeID(), condition, timerTasks, deleteTimerTask, upsertActivityInfos, deleteActivityInfo,
upsertTimerInfos, deleteTimerInfos)
}

// UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution
func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64,
activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task,
upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64,
upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error {
transferTasks := []Task{}
for _, decisionScheduleID := range decisionScheduleIDs {
transferTasks = append(transferTasks, &DecisionTask{TaskList: updatedInfo.TaskList,
@@ -303,7 +322,7 @@ func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, d
TimerTasks: timerTasks,
Condition: condition,
DeleteTimerTask: deleteTimerTask,
RangeID: s.ShardContext.GetRangeID(),
RangeID: rangeID,
UpsertActivityInfos: upsertActivityInfos,
DeleteActivityInfo: deleteActivityInfo,
UpserTimerInfos: upsertTimerInfos,
@@ -324,7 +343,6 @@ func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error)
ReadLevel: s.GetReadLevel(),
MaxReadLevel: s.GetMaxAllowedReadLevel(),
BatchSize: batchSize,
RangeID: s.ShardContext.GetRangeID(),
})

if err != nil {
@@ -501,7 +519,8 @@ func (s *TestBase) TearDownWorkflowStore() {

// GetNextSequenceNumber generates a unique sequence number that can be used for transfer queue taskId
func (s *TestBase) GetNextSequenceNumber() int64 {
return s.ShardContext.GetTransferTaskID()
taskID, _ := s.ShardContext.GetNextTransferTaskID()
return taskID
}

// GetReadLevel returns the current read level for shard
11 changes: 10 additions & 1 deletion common/persistence/shardPersistence_test.go
@@ -5,6 +5,7 @@ import (
"testing"

log "github.com/Sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"

gen "code.uber.internal/devexp/minions/.gen/go/shared"
@@ -14,6 +15,9 @@ type (
shardPersistenceSuite struct {
suite.Suite
TestBase
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
}
)

@@ -30,6 +34,11 @@ func (s *shardPersistenceSuite) SetupSuite() {
s.SetupWorkflowStore()
}

func (s *shardPersistenceSuite) SetupTest() {
// Have to define our overridden assertions in the test setup. If we did it earlier, s.T() would return nil
s.Assertions = require.New(s.T())
}

func (s *shardPersistenceSuite) TearDownSuite() {
s.TearDownWorkflowStore()
}
@@ -104,7 +113,7 @@ func (s *shardPersistenceSuite) TestUpdateShard() {
failedUpdateInfo.Owner = "failed_owner"
err4 := s.UpdateShard(failedUpdateInfo, shardInfo.RangeID)
s.NotNil(err4)
s.IsType(&ConditionFailedError{}, err4)
s.IsType(&ShardOwnershipLostError{}, err4)
log.Infof("Update shard failed with error: %v", err4)

info2, err5 := s.GetShard(shardID)
2 changes: 1 addition & 1 deletion host/integration_test.go
@@ -93,7 +93,7 @@ func (s *integrationSuite) SetupSuite() {
}

logger := log.New()
//logger.Level = log.DebugLevel
logger.Level = log.DebugLevel
s.logger = bark.NewLoggerFromLogrus(logger)

s.ch, _ = tchannel.NewChannel("cadence-integration-test", nil)