From 0daa2760403f49b613887c0e42b2db885456d357 Mon Sep 17 00:00:00 2001 From: Samar Abbas Date: Fri, 17 Feb 2017 14:06:40 -0800 Subject: [PATCH] Support for renewing RangeID for shard and life-cycle Summary: Persistence API changes to return ShardOwnershipLostError when conditional update fails due to RangeID. ShardController exposes a shardClosedCh to manage cleanup of shards when shard ownership lost is detected. ShardContext manages the close handling and writes to shardClosedCh when it sees ShardOwnershipLostError. TransferQueueProcessor calls update API even if there are no changes to trigger Shard closure if ownership is lost. Reviewers: sivakk, tamer, maxim Reviewed By: tamer, maxim Subscribers: jenkins, aravindv, venkat Differential Revision: https://code.uberinternal.com/D736505 --- common/persistence/cassandraPersistence.go | 52 +++- .../persistence/cassandraPersistence_test.go | 51 +++- common/persistence/dataInterfaces.go | 11 +- common/persistence/persistenceTestBase.go | 37 ++- common/persistence/shardPersistence_test.go | 11 +- host/integration_test.go | 2 +- schema/workflow_test.cql | 1 + service/history/historyEngine.go | 52 ++-- service/history/historyEngine2_test.go | 18 +- service/history/historyEngine_test.go | 17 +- service/history/loggingHelpers.go | 116 +++++++++ service/history/shardContext.go | 218 +++++++++++++---- service/history/shardController.go | 85 ++++--- service/history/shardController_test.go | 225 ++++++++++++++++++ service/history/timerQueueProcessor.go | 9 +- service/history/timerQueueProcessor_test.go | 22 +- service/history/transferQueueProcessor.go | 60 +++-- 17 files changed, 844 insertions(+), 143 deletions(-) diff --git a/common/persistence/cassandraPersistence.go b/common/persistence/cassandraPersistence.go index e6deb3aefb9..8bc2afb1455 100644 --- a/common/persistence/cassandraPersistence.go +++ b/common/persistence/cassandraPersistence.go @@ -53,6 +53,7 @@ const ( `owner: ?, ` + `range_id: ?, ` + `stolen_since_renew: ?, ` + + `updated_at: ?, ` + `transfer_ack_level: ?` + `}` @@ -374,6 +375,7 @@ func NewCassandraTaskPersistence(hosts string, keyspace string, logger bark.Logg } func (d *cassandraPersistence) CreateShard(request *CreateShardRequest) error { + cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano()) shardInfo := request.ShardInfo query := d.session.Query(templateCreateShardQuery, shardInfo.ShardID, @@ -385,6 +387,7 @@ func (d *cassandraPersistence) CreateShard(request *CreateShardRequest) error { shardInfo.Owner, shardInfo.RangeID, shardInfo.StolenSinceRenew, + cqlNowTimestamp, shardInfo.TransferAckLevel, shardInfo.RangeID) @@ -435,6 +438,7 @@ func (d *cassandraPersistence) GetShard(request *GetShardRequest) (*GetShardResp } func (d *cassandraPersistence) UpdateShard(request *UpdateShardRequest) error { + cqlNowTimestamp := common.UnixNanoToCQLTimestamp(time.Now().UnixNano()) shardInfo := request.ShardInfo query := d.session.Query(templateUpdateShardQuery, @@ -442,6 +446,7 @@ func (d *cassandraPersistence) UpdateShard(request *UpdateShardRequest) error { shardInfo.Owner, shardInfo.RangeID, shardInfo.StolenSinceRenew, + cqlNowTimestamp, shardInfo.TransferAckLevel, shardInfo.RangeID, shardInfo.ShardID, @@ -465,7 +470,7 @@ func (d *cassandraPersistence) UpdateShard(request *UpdateShardRequest) error { columns = append(columns, fmt.Sprintf("%s=%v", k, v)) } - return &ConditionFailedError{ + return &ShardOwnershipLostError{ Msg: fmt.Sprintf("Failed to update shard. 
previous_range_id: %v, columns: (%v)", request.PreviousRangeID, strings.Join(columns, ",")), } @@ -525,15 +530,30 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx } if !applied { + if rangeID, ok := previous["range_id"].(int64); ok && rangeID != request.RangeID { + // CreateWorkflowExecution failed because rangeID was modified + return nil, &ShardOwnershipLostError{ + Msg: fmt.Sprintf("Failed to create workflow execution. Request RangeID: %v, Actual RangeID: %v", + request.RangeID, rangeID), + } + } + var columns []string for k, v := range previous { columns = append(columns, fmt.Sprintf("%s=%v", k, v)) } - execution := previous["execution"].(map[string]interface{}) - return nil, &workflow.WorkflowExecutionAlreadyStartedError{ - Message: fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)", - execution["workflow_id"], execution["run_id"], request.RangeID, strings.Join(columns, ",")), + if execution, ok := previous["execution"].(map[string]interface{}); ok { + // CreateWorkflowExecution failed because it already exists + return nil, &workflow.WorkflowExecutionAlreadyStartedError{ + Message: fmt.Sprintf("Workflow execution already running. WorkflowId: %v, RunId: %v, rangeID: %v, columns: (%v)", + execution["workflow_id"], execution["run_id"], request.RangeID, strings.Join(columns, ",")), + } + } + + return nil, &ConditionFailedError{ + Msg: fmt.Sprintf("Failed to create workflow execution. Request RangeID: %v, columns: (%v)", + request.RangeID, strings.Join(columns, ",")), } } @@ -614,14 +634,30 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx } if !applied { + if rangeID, ok := previous["range_id"].(int64); ok && rangeID != request.RangeID { + // UpdateWorkflowExecution failed because rangeID was modified + return &ShardOwnershipLostError{ + Msg: fmt.Sprintf("Failed to update workflow execution. Request RangeID: %v, Actual RangeID: %v", + request.RangeID, rangeID), + } + } + + if nextEventID, ok := previous["next_event_id"].(int64); ok && nextEventID != request.Condition { + // CreateWorkflowExecution failed because next event ID is unexpected + return &ConditionFailedError{ + Msg: fmt.Sprintf("Failed to update workflow execution. Request Condition: %v, Actual Value: %v", + request.Condition, nextEventID), + } + } + var columns []string for k, v := range previous { columns = append(columns, fmt.Sprintf("%s=%v", k, v)) } return &ConditionFailedError{ - Msg: fmt.Sprintf("Failed to update workflow execution. condition: %v, columns: (%v)", - request.Condition, strings.Join(columns, ",")), + Msg: fmt.Sprintf("Failed to update workflow execution. 
RangeID: %v, Condition: %v, columns: (%v)", + request.RangeID, request.Condition, strings.Join(columns, ",")), } } @@ -1225,6 +1261,8 @@ func createShardInfo(result map[string]interface{}) *ShardInfo { info.RangeID = v.(int64) case "stolen_since_renew": info.StolenSinceRenew = v.(int) + case "updated_at": + info.UpdatedAt = v.(time.Time) case "transfer_ack_level": info.TransferAckLevel = v.(int64) } diff --git a/common/persistence/cassandraPersistence_test.go b/common/persistence/cassandraPersistence_test.go index 95246b61461..27f66dcc82a 100644 --- a/common/persistence/cassandraPersistence_test.go +++ b/common/persistence/cassandraPersistence_test.go @@ -7,6 +7,7 @@ import ( "time" log "github.com/Sirupsen/logrus" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" gen "code.uber.internal/devexp/minions/.gen/go/shared" @@ -17,6 +18,9 @@ type ( cassandraPersistenceSuite struct { suite.Suite TestBase + // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, + // not merely log an error + *require.Assertions } ) @@ -38,6 +42,8 @@ func (s *cassandraPersistenceSuite) TearDownSuite() { } func (s *cassandraPersistenceSuite) SetupTest() { + // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil + s.Assertions = require.New(s.T()) s.ClearTransferQueue() } @@ -50,8 +56,27 @@ func (s *cassandraPersistenceSuite) TestPersistenceStartWorkflow() { task1, err1 := s.CreateWorkflowExecution(workflowExecution, "queue1", "event1", nil, 3, 0, 2, nil) s.NotNil(err1, "Expected workflow creation to fail.") + log.Infof("Unable to start workflow execution: %v", err1) + s.IsType(&gen.WorkflowExecutionAlreadyStartedError{}, err1) s.Empty(task1, "Expected empty task identifier.") + + response, err2 := s.WorkflowMgr.CreateWorkflowExecution(&CreateWorkflowExecutionRequest{ + Execution: workflowExecution, + TaskList: "queue1", + History: []byte("event1"), + ExecutionContext: nil, + NextEventID: int64(3), + LastProcessedEvent: 0, + RangeID: s.ShardContext.GetRangeID() - 1, + TransferTasks: []Task{ + &DecisionTask{TaskID: s.GetNextSequenceNumber(), TaskList: "queue1", ScheduleID: int64(2)}, + }, + TimerTasks: nil}) + + s.NotNil(err2, "Expected workflow creation to fail.") + s.Nil(response) log.Infof("Unable to start workflow execution: %v", err1) + s.IsType(&ShardOwnershipLostError{}, err2) } func (s *cassandraPersistenceSuite) TestGetWorkflow() { @@ -129,7 +154,7 @@ func (s *cassandraPersistenceSuite) TestUpdateWorkflow() { err4 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(5)}, nil, int64(3), nil, nil, nil, nil, nil, nil) s.NotNil(err4, "expected non nil error.") s.IsType(&ConditionFailedError{}, err4) - log.Infof("Conditional update failed with error: %v", err4) + log.Errorf("Conditional update failed with error: %v", err4) info2, err4 := s.GetWorkflowExecutionInfo(workflowExecution) s.Nil(err4, "No error expected.") @@ -145,6 +170,30 @@ func (s *cassandraPersistenceSuite) TestUpdateWorkflow() { s.Equal(true, info2.DecisionPending) s.Equal(true, validateTimeRange(info2.LastUpdatedTimestamp, time.Hour)) log.Infof("Workflow execution last updated: %v", info2.LastUpdatedTimestamp) + + failedUpdatedInfo2 := copyWorkflowExecutionInfo(info1) + failedUpdatedInfo2.History = []byte(`event4`) + failedUpdatedInfo2.NextEventID = int64(6) + failedUpdatedInfo2.LastProcessedEvent = int64(3) + err5 := s.UpdateWorkflowExecutionWithRangeID(updatedInfo, []int64{int64(5)}, nil, int64(12345), int64(5), 
nil, nil, nil, nil, nil, nil) + s.NotNil(err5, "expected non nil error.") + s.IsType(&ShardOwnershipLostError{}, err5) + log.Errorf("Conditional update failed with error: %v", err5) + + info3, err6 := s.GetWorkflowExecutionInfo(workflowExecution) + s.Nil(err6, "No error expected.") + s.NotNil(info3, "Valid Workflow info expected.") + s.Equal("update-workflow-test", info3.WorkflowID) + s.Equal("5ba5e531-e46b-48d9-b4b3-859919839553", info3.RunID) + s.Equal("queue1", info3.TaskList) + s.Equal("event2", string(info3.History)) + s.Equal([]byte(nil), info3.ExecutionContext) + s.Equal(WorkflowStateCreated, info3.State) + s.Equal(int64(5), info3.NextEventID) + s.Equal(int64(2), info3.LastProcessedEvent) + s.Equal(true, info3.DecisionPending) + s.Equal(true, validateTimeRange(info3.LastUpdatedTimestamp, time.Hour)) + log.Infof("Workflow execution last updated: %v", info3.LastUpdatedTimestamp) } func (s *cassandraPersistenceSuite) TestDeleteWorkflow() { diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index 4b3b006c17b..906334d9e71 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -37,12 +37,18 @@ type ( Msg string } + // ShardOwnershipLostError is returned when conditional update fails due to RangeID for the shard + ShardOwnershipLostError struct { + Msg string + } + // ShardInfo describes a shard ShardInfo struct { ShardID int Owner string RangeID int64 StolenSinceRenew int + UpdatedAt time.Time TransferAckLevel int64 } @@ -234,7 +240,6 @@ type ( ReadLevel int64 MaxReadLevel int64 BatchSize int - RangeID int64 } // GetTransferTasksResponse is the response to GetTransferTasksRequest @@ -372,6 +377,10 @@ func (e *ShardAlreadyExistError) Error() string { return e.Msg } +func (e *ShardOwnershipLostError) Error() string { + return e.Msg +} + // GetType returns the type of the activity task func (a *ActivityTask) GetType() int { return TaskListTypeActivity diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index a5f758dfb0c..b08560c2fbb 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -78,12 +78,8 @@ func (s *testShardContext) GetExecutionManager() ExecutionManager { return s.executionMgr } -func (s *testShardContext) GetTransferTaskID() int64 { - return atomic.AddInt64(&s.transferSequenceNumber, 1) -} - -func (s *testShardContext) GetRangeID() int64 { - return atomic.LoadInt64(&s.shardInfo.RangeID) +func (s *testShardContext) GetNextTransferTaskID() (int64, error) { + return atomic.AddInt64(&s.transferSequenceNumber, 1), nil } func (s *testShardContext) GetTransferAckLevel() int64 { @@ -103,6 +99,15 @@ func (s *testShardContext) GetTransferSequenceNumber() int64 { return atomic.LoadInt64(&s.transferSequenceNumber) } +func (s *testShardContext) CreateWorkflowExecution(request *CreateWorkflowExecutionRequest) ( + *CreateWorkflowExecutionResponse, error) { + return s.executionMgr.CreateWorkflowExecution(request) +} + +func (s *testShardContext) UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error { + return s.executionMgr.UpdateWorkflowExecution(request) +} + func (s *testShardContext) GetLogger() bark.Logger { return s.logger } @@ -112,6 +117,10 @@ func (s *testShardContext) Reset() { atomic.StoreInt64(&s.shardInfo.TransferAckLevel, 0) } +func (s *testShardContext) GetRangeID() int64 { + return atomic.LoadInt64(&s.shardInfo.RangeID) +} + func newTestExecutionMgrFactory(options TestBaseOptions, 
cassandra CassandraTestCluster, logger bark.Logger) ExecutionManagerFactory { return &testExecutionMgrFactory{ @@ -286,6 +295,16 @@ func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, d activityScheduleIDs []int64, condition int64, timerTasks []Task, deleteTimerTask Task, upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error { + return s.UpdateWorkflowExecutionWithRangeID(updatedInfo, decisionScheduleIDs, activityScheduleIDs, + s.ShardContext.GetRangeID(), condition, timerTasks, deleteTimerTask, upsertActivityInfos, deleteActivityInfo, + upsertTimerInfos, deleteTimerInfos) +} + +// UpdateWorkflowExecutionWithRangeID is a utility method to update workflow execution +func (s *TestBase) UpdateWorkflowExecutionWithRangeID(updatedInfo *WorkflowExecutionInfo, decisionScheduleIDs []int64, + activityScheduleIDs []int64, rangeID, condition int64, timerTasks []Task, deleteTimerTask Task, + upsertActivityInfos []*ActivityInfo, deleteActivityInfo *int64, + upsertTimerInfos []*TimerInfo, deleteTimerInfos []string) error { transferTasks := []Task{} for _, decisionScheduleID := range decisionScheduleIDs { transferTasks = append(transferTasks, &DecisionTask{TaskList: updatedInfo.TaskList, @@ -303,7 +322,7 @@ func (s *TestBase) UpdateWorkflowExecution(updatedInfo *WorkflowExecutionInfo, d TimerTasks: timerTasks, Condition: condition, DeleteTimerTask: deleteTimerTask, - RangeID: s.ShardContext.GetRangeID(), + RangeID: rangeID, UpsertActivityInfos: upsertActivityInfos, DeleteActivityInfo: deleteActivityInfo, UpserTimerInfos: upsertTimerInfos, @@ -324,7 +343,6 @@ func (s *TestBase) GetTransferTasks(batchSize int) ([]*TransferTaskInfo, error) ReadLevel: s.GetReadLevel(), MaxReadLevel: s.GetMaxAllowedReadLevel(), BatchSize: batchSize, - RangeID: s.ShardContext.GetRangeID(), }) if err != nil { @@ -501,7 +519,8 @@ func (s *TestBase) TearDownWorkflowStore() { // GetNextSequenceNumber generates a unique sequence number for can be used for transfer queue taskId func (s *TestBase) GetNextSequenceNumber() int64 { - return s.ShardContext.GetTransferTaskID() + taskID, _ := s.ShardContext.GetNextTransferTaskID() + return taskID } // GetReadLevel returns the current read level for shard diff --git a/common/persistence/shardPersistence_test.go b/common/persistence/shardPersistence_test.go index ae2cbf1ae0f..f3ba5afe2c9 100644 --- a/common/persistence/shardPersistence_test.go +++ b/common/persistence/shardPersistence_test.go @@ -5,6 +5,7 @@ import ( "testing" log "github.com/Sirupsen/logrus" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" gen "code.uber.internal/devexp/minions/.gen/go/shared" @@ -14,6 +15,9 @@ type ( shardPersistenceSuite struct { suite.Suite TestBase + // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, + // not merely log an error + *require.Assertions } ) @@ -30,6 +34,11 @@ func (s *shardPersistenceSuite) SetupSuite() { s.SetupWorkflowStore() } +func (s *shardPersistenceSuite) SetupTest() { + // Have to define our overridden assertions in the test setup. 
If we did it earlier, s.T() will return nil + s.Assertions = require.New(s.T()) +} + func (s *shardPersistenceSuite) TearDownSuite() { s.TearDownWorkflowStore() } @@ -104,7 +113,7 @@ func (s *shardPersistenceSuite) TestUpdateShard() { failedUpdateInfo.Owner = "failed_owner" err4 := s.UpdateShard(failedUpdateInfo, shardInfo.RangeID) s.NotNil(err4) - s.IsType(&ConditionFailedError{}, err4) + s.IsType(&ShardOwnershipLostError{}, err4) log.Infof("Update shard failed with error: %v", err4) info2, err5 := s.GetShard(shardID) diff --git a/host/integration_test.go b/host/integration_test.go index d32c5022980..b7fc218e662 100644 --- a/host/integration_test.go +++ b/host/integration_test.go @@ -93,7 +93,7 @@ func (s *integrationSuite) SetupSuite() { } logger := log.New() - //logger.Level = log.DebugLevel + logger.Level = log.DebugLevel s.logger = bark.NewLoggerFromLogrus(logger) s.ch, _ = tchannel.NewChannel("cadence-integration-test", nil) diff --git a/schema/workflow_test.cql b/schema/workflow_test.cql index a53b5469dd1..174544f0db1 100644 --- a/schema/workflow_test.cql +++ b/schema/workflow_test.cql @@ -11,6 +11,7 @@ CREATE TYPE shard ( range_id bigint, -- This field keeps track of number of times owner for a shard changes before updating range_id or ack_levels stolen_since_renew int, + updated_at timestamp, transfer_ack_level bigint, ); diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index c8603ae5e21..cb331f66143 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -138,9 +138,12 @@ func (e *historyEngineImpl) StartWorkflowExecution(request *workflow.StartWorkfl return nil, serializedError } - id := e.tracker.getNextTaskID() + id, err0 := e.tracker.getNextTaskID() + if err0 != nil { + return nil, err0 + } defer e.tracker.completeTask(id) - _, err := e.executionManager.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ + _, err := e.shard.CreateWorkflowExecution(&persistence.CreateWorkflowExecutionRequest{ Execution: workflowExecution, TaskList: request.GetTaskList().GetName(), History: h, @@ -151,7 +154,6 @@ func (e *historyEngineImpl) StartWorkflowExecution(request *workflow.StartWorkfl TaskID: id, TaskList: taskList, ScheduleID: dt.GetEventId(), }}, - RangeID: e.shard.GetRangeID(), }) if err != nil { @@ -397,7 +399,10 @@ Update_History_Loop: case workflow.DecisionType_ScheduleActivityTask: attributes := d.GetScheduleActivityTaskDecisionAttributes() scheduleEvent := builder.AddActivityTaskScheduledEvent(completedID, attributes) - id := e.tracker.getNextTaskID() + id, err2 := e.tracker.getNextTaskID() + if err2 != nil { + return err2 + } defer e.tracker.completeTask(id) transferTasks = append(transferTasks, &persistence.ActivityTask{ TaskID: id, @@ -456,7 +461,10 @@ Update_History_Loop: // Schedule another decision task if new events came in during this decision if (completedID - startedID) > 1 { newDecisionEvent := builder.ScheduleDecisionTask() - id := e.tracker.getNextTaskID() + id, err2 := e.tracker.getNextTaskID() + if err2 != nil { + return err2 + } defer e.tracker.completeTask(id) transferTasks = append(transferTasks, &persistence.DecisionTask{ TaskID: id, @@ -530,7 +538,10 @@ Update_History_Loop: var transferTasks []persistence.Task if !builder.hasPendingDecisionTask() { newDecisionEvent := builder.ScheduleDecisionTask() - id := e.tracker.getNextTaskID() + id, err2 := e.tracker.getNextTaskID() + if err2 != nil { + return err2 + } defer e.tracker.completeTask(id) transferTasks = 
[]persistence.Task{&persistence.DecisionTask{ TaskID: id, @@ -601,7 +612,10 @@ Update_History_Loop: startAttributes := startWorkflowExecutionEvent.GetWorkflowExecutionStartedEventAttributes() newDecisionEvent := builder.AddDecisionTaskScheduledEvent(startAttributes.GetTaskList().GetName(), startAttributes.GetTaskStartToCloseTimeoutSeconds()) - id := e.tracker.getNextTaskID() + id, err2 := e.tracker.getNextTaskID() + if err2 != nil { + return err2 + } defer e.tracker.completeTask(id) transferTasks = []persistence.Task{&persistence.DecisionTask{ TaskID: id, @@ -724,7 +738,7 @@ func (e *historyEngineImpl) deleteWorkflowExecutionWithRetry( func (e *historyEngineImpl) updateWorkflowExecutionWithRetry( request *persistence.UpdateWorkflowExecutionRequest) error { op := func() error { - return e.executionManager.UpdateWorkflowExecution(request) + return e.shard.UpdateWorkflowExecution(request) } @@ -835,7 +849,8 @@ func (c *workflowExecutionContext) updateWorkflowExecutionWithDeleteTask(transfe return c.updateWorkflowExecution(transferTasks, timerTasks) } -func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persistence.Task, timerTasks []persistence.Task) error { +func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persistence.Task, + timerTasks []persistence.Task) error { updatedHistory, err := c.builder.Serialize() if err != nil { logHistorySerializationErrorEvent(c.logger, err, "Unable to serialize execution history for update.") @@ -854,7 +869,6 @@ func (c *workflowExecutionContext) updateWorkflowExecution(transferTasks []persi TimerTasks: timerTasks, Condition: c.updateCondition, DeleteTimerTask: c.deleteTimerTask, - RangeID: c.historyService.shard.GetRangeID(), UpsertActivityInfos: c.msBuilder.updateActivityInfos, DeleteActivityInfo: c.msBuilder.deleteActivityInfo, UpserTimerInfos: c.msBuilder.updateTimerInfos, @@ -889,31 +903,37 @@ func (c *workflowExecutionContext) deleteWorkflowExecution() error { return err } -func (t *pendingTaskTracker) getNextTaskID() int64 { +func (t *pendingTaskTracker) getNextTaskID() (int64, error) { t.lk.Lock() - nextID := t.shard.GetTransferTaskID() + defer t.lk.Unlock() + + nextID, err := t.shard.GetNextTransferTaskID() + if err != nil { + t.logger.Debugf("Error generating next taskID: %v", err) + return -1, err + } + if nextID != t.maxID+1 { t.logger.Fatalf("No holes allowed for nextID. 
nextID: %v, MaxID: %v", nextID, t.maxID) } t.pendingTasks[nextID] = false t.maxID = nextID - t.lk.Unlock() t.logger.Debugf("Generated new transfer task ID: %v", nextID) - return nextID + return nextID, nil } func (t *pendingTaskTracker) completeTask(taskID int64) { t.lk.Lock() updatedMin := int64(-1) if _, ok := t.pendingTasks[taskID]; ok { - t.logger.Debugf("Completing transfer task ID: %v", taskID) + t.logger.Debugf("Completing transfer task ID: %v, minID: %v, maxID: %v", taskID, t.minID, t.maxID) t.pendingTasks[taskID] = true UpdateMinLoop: for newMin := t.minID + 1; newMin <= t.maxID; newMin++ { + t.logger.Debugf("minID: %v, maxID: %v", newMin, t.maxID) if done, ok := t.pendingTasks[newMin]; ok && done { - t.logger.Debugf("Updating minID for pending transfer tasks: %v", newMin) t.minID = newMin updatedMin = newMin delete(t.pendingTasks, newMin) diff --git a/service/history/historyEngine2_test.go b/service/history/historyEngine2_test.go index 3c638451b33..7d10308202b 100644 --- a/service/history/historyEngine2_test.go +++ b/service/history/historyEngine2_test.go @@ -1,7 +1,6 @@ package history import ( - //"encoding/json" "errors" "os" "testing" @@ -30,6 +29,8 @@ type ( historyEngine *historyEngineImpl mockMatchingClient *mocks.MatchingClient mockExecutionMgr *mocks.ExecutionManager + mockShardManager *mocks.ShardManager + shardClosedCh chan int logger bark.Logger } ) @@ -59,12 +60,18 @@ func (s *engine2Suite) SetupTest() { shardID := 0 s.mockMatchingClient = &mocks.MatchingClient{} s.mockExecutionMgr = &mocks.ExecutionManager{} + s.mockShardManager = &mocks.ShardManager{} + s.shardClosedCh = make(chan int, 100) mockShard := &shardContextImpl{ - shardInfo: &persistence.ShardInfo{ShardID: shardID, RangeID: 1, TransferAckLevel: 0}, - transferSequenceNumber: 1, - executionManager: s.mockExecutionMgr, - logger: s.logger, + shardInfo: &persistence.ShardInfo{ShardID: shardID, RangeID: 1, TransferAckLevel: 0}, + transferSequenceNumber: 1, + executionManager: s.mockExecutionMgr, + shardManager: s.mockShardManager, + rangeSize: defaultRangeSize, + maxTransferSequenceNumber: 100000, + closeCh: s.shardClosedCh, + logger: s.logger, } txProcessor := newTransferQueueProcessor(mockShard, s.mockMatchingClient) @@ -84,6 +91,7 @@ func (s *engine2Suite) SetupTest() { func (s *engine2Suite) TearDownTest() { s.mockMatchingClient.AssertExpectations(s.T()) s.mockExecutionMgr.AssertExpectations(s.T()) + s.mockShardManager.AssertExpectations(s.T()) } func (s *engine2Suite) TestRecordDecisionTaskStartedIfNoExecution() { diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index daf324f73c8..bcb2a16a1ee 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -30,6 +30,8 @@ type ( mockHistoryEngine *historyEngineImpl mockMatchingClient *mocks.MatchingClient mockExecutionMgr *mocks.ExecutionManager + mockShardManager *mocks.ShardManager + shardClosedCh chan int logger bark.Logger } ) @@ -59,12 +61,18 @@ func (s *engineSuite) SetupTest() { shardID := 0 s.mockMatchingClient = &mocks.MatchingClient{} s.mockExecutionMgr = &mocks.ExecutionManager{} + s.mockShardManager = &mocks.ShardManager{} + s.shardClosedCh = make(chan int, 100) mockShard := &shardContextImpl{ - shardInfo: &persistence.ShardInfo{ShardID: shardID, RangeID: 1, TransferAckLevel: 0}, - transferSequenceNumber: 1, - executionManager: s.mockExecutionMgr, - logger: s.logger, + shardInfo: &persistence.ShardInfo{ShardID: shardID, RangeID: 1, TransferAckLevel: 0}, + 
transferSequenceNumber: 1, + executionManager: s.mockExecutionMgr, + shardManager: s.mockShardManager, + rangeSize: defaultRangeSize, + maxTransferSequenceNumber: 100000, + closeCh: s.shardClosedCh, + logger: s.logger, } txProcessor := newTransferQueueProcessor(mockShard, s.mockMatchingClient) @@ -84,6 +92,7 @@ func (s *engineSuite) SetupTest() { func (s *engineSuite) TearDownTest() { s.mockMatchingClient.AssertExpectations(s.T()) s.mockExecutionMgr.AssertExpectations(s.T()) + s.mockShardManager.AssertExpectations(s.T()) } func (s *engineSuite) TestRespondDecisionTaskCompletedInvalidToken() { diff --git a/service/history/loggingHelpers.go b/service/history/loggingHelpers.go index b92e97956e8..3e397201309 100644 --- a/service/history/loggingHelpers.go +++ b/service/history/loggingHelpers.go @@ -15,6 +15,25 @@ const ( persistentStoreErrorEventID = 2000 historySerializationErrorEventID = 2001 duplicateTaskEventID = 2002 + + // Shard lifecycle events + shardRangeUpdatedEventID = 3000 + + // ShardController events + shardControllerStarted = 4000 + shardControllerShutdown = 4001 + shardControllerShuttingDown = 4002 + shardControllerShutdownTimedout = 4003 + ringMembershipChangedEvent = 4004 + shardItemCreated = 4010 + shardItemRemoved = 4011 + shardEngineCreating = 4020 + shardEngineCreated = 4021 + shardEngineStopping = 4022 + shardEngineStopped = 4023 + + // General purpose events + operationFailed = 9000 ) const ( @@ -34,6 +53,7 @@ const ( tagValueHistoryEngineComponent = "history-engine" tagValueTransferQueueComponent = "transfer-queue-processor" tagValueTimerQueueComponent = "timer-queue-processor" + tagValueShardController = "shard-controller" // tagHistoryBuilderAction values tagValueActionWorkflowStarted = "add-workflowexecution-started-event" @@ -91,3 +111,99 @@ func logDuplicateTaskEvent(lg bark.Logger, taskType int, taskID int64, requestID }).Debugf("Potentially duplicate task. TaskID: %v, TaskType: %v, RequestID: %v, scheduleID: %v, startedID: %v, isRunning: %v", taskID, taskType, requestID, scheduleID, startedID, isRunning) } + +func logShardRangeUpdatedEvent(logger bark.Logger, shardID int, rangeID, startSequence, endSequence int64) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardRangeUpdatedEventID, + }) + logger.Infof("Range updated for shardID '%v'. 
RangeID: %v, StartSequence: %v, EndSequence: %v", shardID, + rangeID, startSequence, endSequence) +} + +func logShardControllerStartedEvent(logger bark.Logger, host string) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardControllerStarted, + }) + logger.Infof("ShardController started on host: %v", host) +} + +func logShardControllerShutdownEvent(logger bark.Logger, host string) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardControllerShutdown, + }) + logger.Infof("ShardController stopped on host: %v", host) +} + +func logShardControllerShuttingDownEvent(logger bark.Logger, host string) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardControllerShuttingDown, + }) + logger.Infof("ShardController stopping on host: %v", host) +} + +func logShardControllerShutdownTimedoutEvent(logger bark.Logger, host string) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardControllerShutdownTimedout, + }) + logger.Warnf("ShardController timed out during shutdown on host: %v", host) +} + +func logRingMembershipChangedEvent(logger bark.Logger, host string, added, removed, updated int) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: ringMembershipChangedEvent, + }) + logger.Infof("ShardController on host '%v' received ring membership changed event: {Added: %v, Removed: %v, Updated: %v}", + host, added, removed, updated) +} + +func logShardItemCreatedEvent(logger bark.Logger, host string, shardID int) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardItemCreated, + tagHistoryShardID: shardID, + }) + logger.Infof("ShardController on host '%v' created a shard item for shardID '%v'.", host, shardID) +} + +func logShardItemRemovedEvent(logger bark.Logger, host string, shardID int, remainingShards int) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardItemRemoved, + tagHistoryShardID: shardID, + }) + logger.Infof("ShardController on host '%v' removed shard item for shardID '%v'. Remaining number of shards: %v", + host, shardID, remainingShards) +} + +func logShardEngineCreatingEvent(logger bark.Logger, host string, shardID int) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardEngineCreating, + }) + logger.Infof("ShardController on host '%v' creating engine for shardID '%v'.", host, shardID) +} + +func logShardEngineCreatedEvent(logger bark.Logger, host string, shardID int) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardEngineCreated, + }) + logger.Infof("ShardController on host '%v' created engine for shardID '%v'.", host, shardID) +} + +func logShardEngineStoppingEvent(logger bark.Logger, host string, shardID int) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardEngineStopping, + }) + logger.Infof("ShardController on host '%v' stopping engine for shardID '%v'.", host, shardID) +} + +func logShardEngineStoppedEvent(logger bark.Logger, host string, shardID int) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: shardEngineStopped, + }) + logger.Infof("ShardController on host '%v' stopped engine for shardID '%v'.", host, shardID) +} + +func logOperationFailedEvent(logger bark.Logger, msg string, err error) { + logger.WithFields(bark.Fields{ + tagWorkflowEventID: operationFailed, + }) + logger.Warnf("%v. 
Error: %v", msg, err) +} diff --git a/service/history/shardContext.go b/service/history/shardContext.go index 869383105a4..024716788bc 100644 --- a/service/history/shardContext.go +++ b/service/history/shardContext.go @@ -1,6 +1,7 @@ package history import ( + "sync" "sync/atomic" "code.uber.internal/devexp/minions/common/persistence" @@ -8,26 +9,39 @@ import ( "github.com/uber-common/bark" ) +const ( + defaultRangeSize = 20 // 20 bits for sequencer, 2^20 sequence number for any range +) + type ( // ShardContext represents a history engine shard ShardContext interface { GetExecutionManager() persistence.ExecutionManager - GetTransferTaskID() int64 - GetRangeID() int64 + GetNextTransferTaskID() (int64, error) + GetTransferSequenceNumber() int64 GetTransferAckLevel() int64 - GetTimerSequenceNumber() int64 UpdateAckLevel(ackLevel int64) error - GetTransferSequenceNumber() int64 + GetTimerSequenceNumber() int64 + CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) ( + *persistence.CreateWorkflowExecutionResponse, error) + UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error GetLogger() bark.Logger } shardContextImpl struct { - shardManager persistence.ShardManager - shardInfo *persistence.ShardInfo - executionManager persistence.ExecutionManager - transferSequenceNumber int64 - timerSequeceNumber int64 - logger bark.Logger + shardID int + shardManager persistence.ShardManager + executionManager persistence.ExecutionManager + timerSequeceNumber int64 + rangeSize uint + closeCh chan<- int + isClosed int32 + logger bark.Logger + + sync.RWMutex + shardInfo *persistence.ShardInfo + transferSequenceNumber int64 + maxTransferSequenceNumber int64 } ) @@ -37,42 +51,164 @@ func (s *shardContextImpl) GetExecutionManager() persistence.ExecutionManager { return s.executionManager } -func (s *shardContextImpl) GetTimerSequenceNumber() int64 { - return atomic.AddInt64(&s.timerSequeceNumber, 1) -} +func (s *shardContextImpl) GetNextTransferTaskID() (int64, error) { + s.Lock() + defer s.Unlock() -func (s *shardContextImpl) GetTransferTaskID() int64 { - return atomic.AddInt64(&s.transferSequenceNumber, 1) + if err := s.updateRangeIfNeededLocked(); err != nil { + return -1, err + } + + taskID := s.transferSequenceNumber + s.transferSequenceNumber++ + + return taskID, nil } -func (s *shardContextImpl) GetRangeID() int64 { - return atomic.LoadInt64(&s.shardInfo.RangeID) +func (s *shardContextImpl) GetTransferSequenceNumber() int64 { + s.RLock() + defer s.RUnlock() + + return s.transferSequenceNumber - 1 } func (s *shardContextImpl) GetTransferAckLevel() int64 { - return atomic.LoadInt64(&s.shardInfo.TransferAckLevel) -} + s.RLock() + defer s.RUnlock() -func (s *shardContextImpl) GetLogger() bark.Logger { - return s.logger + return s.shardInfo.TransferAckLevel } func (s *shardContextImpl) UpdateAckLevel(ackLevel int64) error { - atomic.StoreInt64(&s.shardInfo.TransferAckLevel, ackLevel) + s.Lock() + s.shardInfo.TransferAckLevel = ackLevel + s.shardInfo.StolenSinceRenew = 0 updatedShardInfo := copyShardInfo(s.shardInfo) - updatedShardInfo.StolenSinceRenew = 0 + s.Unlock() + return s.shardManager.UpdateShard(&persistence.UpdateShardRequest{ ShardInfo: updatedShardInfo, - PreviousRangeID: updatedShardInfo.RangeID, + PreviousRangeID: s.shardInfo.RangeID, }) } -func (s *shardContextImpl) GetTransferSequenceNumber() int64 { - return atomic.LoadInt64(&s.transferSequenceNumber) +func (s *shardContextImpl) GetTimerSequenceNumber() int64 { + return 
atomic.AddInt64(&s.timerSequeceNumber, 1) +} + +func (s *shardContextImpl) CreateWorkflowExecution(request *persistence.CreateWorkflowExecutionRequest) ( + *persistence.CreateWorkflowExecutionResponse, error) { +Create_Loop: + for attempt := 0; attempt < conditionalRetryCount; attempt++ { + currentRangeID := s.getRangeID() + request.RangeID = currentRangeID + response, err := s.executionManager.CreateWorkflowExecution(request) + if err != nil { + if _, ok := err.(*persistence.ShardOwnershipLostError); ok { + // RangeID might have been renewed by the same host while this update was in flight + // Retry the operation if we still have the shard ownership + if currentRangeID != s.getRangeID() { + continue Create_Loop + } else { + // Shard is stolen, trigger shutdown of history engine + s.close() + } + } + } + + return response, err + } + + return nil, ErrMaxAttemptsExceeded +} + +func (s *shardContextImpl) UpdateWorkflowExecution(request *persistence.UpdateWorkflowExecutionRequest) error { +Update_Loop: + for attempt := 0; attempt < conditionalRetryCount; attempt++ { + currentRangeID := s.getRangeID() + request.RangeID = currentRangeID + err := s.executionManager.UpdateWorkflowExecution(request) + if err != nil { + if _, ok := err.(*persistence.ShardOwnershipLostError); ok { + // RangeID might have been renewed by the same host while this update was in flight + // Retry the operation if we still have the shard ownership + if currentRangeID != s.getRangeID() { + continue Update_Loop + } else { + // Shard is stolen, trigger shutdown of history engine + s.close() + } + } + } + + return err + } + + return ErrMaxAttemptsExceeded +} + +func (s *shardContextImpl) GetLogger() bark.Logger { + return s.logger +} + +func (s *shardContextImpl) getRangeID() int64 { + s.RLock() + defer s.RUnlock() + + return s.shardInfo.RangeID +} + +func (s *shardContextImpl) close() { + if !atomic.CompareAndSwapInt32(&s.isClosed, 0, 1) { + return + } + + if s.closeCh != nil { + // This is the channel passed in by shard controller to monitor if a shard needs to be unloaded + // It will trigger the HistoryEngine unload and removal of engine from shard controller + s.closeCh <- s.shardID + } +} + +func (s *shardContextImpl) updateRangeIfNeededLocked() error { + if s.transferSequenceNumber < s.maxTransferSequenceNumber { + return nil + } + + return s.renewRangeLocked(false) +} + +func (s *shardContextImpl) renewRangeLocked(isStealing bool) error { + updatedShardInfo := copyShardInfo(s.shardInfo) + updatedShardInfo.RangeID++ + if isStealing { + updatedShardInfo.StolenSinceRenew++ + } + + err := s.shardManager.UpdateShard(&persistence.UpdateShardRequest{ + ShardInfo: updatedShardInfo, + PreviousRangeID: s.shardInfo.RangeID}) + if err != nil { + // Shard is stolen, trigger history engine shutdown + if _, ok := err.(*persistence.ShardOwnershipLostError); ok { + s.close() + } + return err + } + + // Range is successfully updated in cassandra now update shard context to reflect new range + s.transferSequenceNumber = updatedShardInfo.RangeID << s.rangeSize + s.maxTransferSequenceNumber = (updatedShardInfo.RangeID + 1) << s.rangeSize + s.shardInfo = updatedShardInfo + + logShardRangeUpdatedEvent(s.logger, s.shardInfo.ShardID, s.shardInfo.RangeID, s.transferSequenceNumber, + s.maxTransferSequenceNumber) + + return nil } func acquireShard(shardID int, shardManager persistence.ShardManager, executionMgr persistence.ExecutionManager, - owner string, logger bark.Logger) (ShardContext, error) { + owner string, closeCh chan<- int, logger 
bark.Logger) (ShardContext, error) { response, err0 := shardManager.GetShard(&persistence.GetShardRequest{ShardID: shardID}) if err0 != nil { return nil, err0 @@ -80,32 +216,28 @@ func acquireShard(shardID int, shardManager persistence.ShardManager, executionM shardInfo := response.ShardInfo updatedShardInfo := copyShardInfo(shardInfo) - updatedShardInfo.RangeID++ - updatedShardInfo.StolenSinceRenew++ updatedShardInfo.Owner = owner - - err1 := shardManager.UpdateShard(&persistence.UpdateShardRequest{ - ShardInfo: updatedShardInfo, - PreviousRangeID: shardInfo.RangeID}) - if err1 != nil { - return nil, err1 - } - context := &shardContextImpl{ - shardManager: shardManager, - shardInfo: updatedShardInfo, - executionManager: executionMgr, - transferSequenceNumber: updatedShardInfo.RangeID << 24, + shardManager: shardManager, + executionManager: executionMgr, + shardInfo: updatedShardInfo, + rangeSize: defaultRangeSize, + closeCh: closeCh, } context.logger = logger.WithFields(bark.Fields{ tagHistoryShardID: shardID, }) + err1 := context.renewRangeLocked(true) + if err1 != nil { + return nil, err1 + } + return context, nil } func copyShardInfo(shardInfo *persistence.ShardInfo) *persistence.ShardInfo { - copy := &persistence.ShardInfo{ + shardInfoCopy := &persistence.ShardInfo{ ShardID: shardInfo.ShardID, Owner: shardInfo.Owner, RangeID: shardInfo.RangeID, @@ -113,5 +245,5 @@ func copyShardInfo(shardInfo *persistence.ShardInfo) *persistence.ShardInfo { TransferAckLevel: atomic.LoadInt64(&shardInfo.TransferAckLevel), } - return copy + return shardInfoCopy } diff --git a/service/history/shardController.go b/service/history/shardController.go index a860bea0b04..9e4a043289d 100644 --- a/service/history/shardController.go +++ b/service/history/shardController.go @@ -28,6 +28,7 @@ type ( shardMgr persistence.ShardManager executionMgrFactory persistence.ExecutionManagerFactory engineFactory EngineFactory + shardClosedCh chan int isStarted int32 isStopped int32 shutdownWG sync.WaitGroup @@ -36,6 +37,7 @@ type ( sync.RWMutex historyShards map[int]*historyShardsItem + isStopping bool } historyShardsItem struct { @@ -65,8 +67,11 @@ func newShardController(numberOfShards int, host *membership.HostInfo, resolver executionMgrFactory: executionMgrFactory, engineFactory: factory, historyShards: make(map[int]*historyShardsItem), + shardClosedCh: make(chan int, numberOfShards), shutdownCh: make(chan struct{}), - logger: logger, + logger: logger.WithFields(bark.Fields{ + tagWorkflowComponent: tagValueShardController, + }), } } @@ -78,7 +83,9 @@ func newHistoryShardsItem(shardID int, shardMgr persistence.ShardManager, execut executionMgrFactory: executionMgrFactory, engineFactory: factory, host: host, - logger: logger, + logger: logger.WithFields(bark.Fields{ + tagHistoryShardID: shardID, + }), } } @@ -90,11 +97,11 @@ func (c *shardController) Start() { c.acquireShards() c.shutdownWG.Add(1) - go c.acquireShardsPump() + go c.shardManagementPump() c.hServiceResolver.AddListener(shardControllerMembershipUpdateListenerName, c.membershipUpdateCh) - c.logger.Info("ShardController started.") + logShardControllerStartedEvent(c.logger, c.host.Identity()) } func (c *shardController) Stop() { @@ -102,18 +109,22 @@ func (c *shardController) Stop() { return } + c.Lock() + c.isStopping = true + c.Unlock() + if atomic.LoadInt32(&c.isStarted) == 1 { if err := c.hServiceResolver.RemoveListener(shardControllerMembershipUpdateListenerName); err != nil { - c.logger.Warnf("Error removing membership update listerner: %v", err) + 
logOperationFailedEvent(c.logger, "Error removing membership update listerner", err) } close(c.shutdownCh) } if success := common.AwaitWaitGroup(&c.shutdownWG, time.Minute); !success { - c.logger.Warn("ShardController timed out on shutdown.") + logShardControllerShutdownTimedoutEvent(c.logger, c.host.Identity()) } - c.logger.Info("ShardController stopped.") + logShardControllerShutdownEvent(c.logger, c.host.Identity()) } func (c *shardController) GetEngine(workflowID string) (Engine, error) { @@ -127,7 +138,7 @@ func (c *shardController) getEngineForShard(shardID int) (Engine, error) { return nil, err } - return item.getOrCreateEngine() + return item.getOrCreateEngine(c.shardClosedCh) } func (c *shardController) removeEngineForShard(shardID int) { @@ -152,15 +163,18 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShar return item, nil } + if c.isStopping { + return nil, fmt.Errorf("ShardController for host '%v' shutting down.", c.host.Identity()) + } info, err := c.hServiceResolver.Lookup(string(shardID)) if err != nil { return nil, err } if info.Identity() == c.host.Identity() { - c.logger.Infof("Creating new history shard item. Host: %v, ShardID: %v", info.Identity(), shardID) shardItem := newHistoryShardsItem(shardID, c.shardMgr, c.executionMgrFactory, c.engineFactory, c.host, c.logger) c.historyShards[shardID] = shardItem + logShardItemCreatedEvent(c.logger, info.Identity(), shardID) return shardItem, nil } @@ -170,18 +184,19 @@ func (c *shardController) getOrCreateHistoryShardItem(shardID int) (*historyShar func (c *shardController) removeHistoryShardItem(shardID int) (*historyShardsItem, error) { c.Lock() defer c.Unlock() + item, ok := c.historyShards[shardID] - if ok { - delete(c.historyShards, shardID) - c.logger.Infof("Removing history shard item. 
Host: %v, ShardID: %v, Count: %v", c.host.Identity(), shardID, - len(c.historyShards)) - return item, nil + if !ok { + return nil, fmt.Errorf("No item found to remove for shard: %v", shardID) } - return nil, fmt.Errorf("No item found to remove for shard: %v", shardID) + delete(c.historyShards, shardID) + logShardItemRemovedEvent(c.logger, c.host.Identity(), shardID, len(c.historyShards)) + + return item, nil } -func (c *shardController) acquireShardsPump() { +func (c *shardController) shardManagementPump() { defer c.shutdownWG.Done() acquireTicker := time.NewTicker(c.acquireInterval) @@ -189,12 +204,11 @@ func (c *shardController) acquireShardsPump() { for { select { case <-c.shutdownCh: - c.logger.Info("ShardController shutting down.") + logShardControllerShuttingDownEvent(c.logger, c.host.Identity()) c.Lock() defer c.Unlock() - for shardID, item := range c.historyShards { - c.logger.Infof("Shutting down engine for shardID: %v", shardID) + for _, item := range c.historyShards { item.stopEngine() } c.historyShards = nil @@ -202,9 +216,11 @@ func (c *shardController) acquireShardsPump() { case <-acquireTicker.C: c.acquireShards() case changedEvent := <-c.membershipUpdateCh: - c.logger.Infof("Updating shards due to membership changed event: {Added: %v, Removed: %v, Updated: %v}", - len(changedEvent.HostsAdded), len(changedEvent.HostsRemoved), len(changedEvent.HostsUpdated)) + logRingMembershipChangedEvent(c.logger, c.host.Identity(), len(changedEvent.HostsAdded), + len(changedEvent.HostsRemoved), len(changedEvent.HostsUpdated)) c.acquireShards() + case shardID := <-c.shardClosedCh: + c.removeEngineForShard(shardID) } } } @@ -214,14 +230,15 @@ AcquireLoop: for shardID := 0; shardID < c.numberOfShards; shardID++ { info, err := c.hServiceResolver.Lookup(string(shardID)) if err != nil { - c.logger.Warnf("Error looking up host for shardID: %v, Err: %v", shardID, err) + logOperationFailedEvent(c.logger, fmt.Sprintf("Error looking up host for shardID: %v", shardID), err) continue AcquireLoop } if info.Identity() == c.host.Identity() { _, err1 := c.getEngineForShard(shardID) if err1 != nil { - c.logger.Warnf("Unable to create history shard engine: %v, Err: %v", shardID, err1) + logOperationFailedEvent(c.logger, fmt.Sprintf("Unable to create history shard engine: %v", shardID), + err1) continue AcquireLoop } } else { @@ -237,10 +254,10 @@ func (i *historyShardsItem) getEngine() Engine { return i.engine } -func (i *historyShardsItem) getOrCreateEngine() (Engine, error) { +func (i *historyShardsItem) getOrCreateEngine(shardClosedCh chan<- int) (Engine, error) { i.RLock() if i.engine != nil { - i.RUnlock() + defer i.RUnlock() return i.engine, nil } i.RUnlock() @@ -257,12 +274,13 @@ func (i *historyShardsItem) getOrCreateEngine() (Engine, error) { return nil, err } - context, err := acquireShard(i.shardID, i.shardMgr, executionMgr, i.host.Identity(), i.logger) + context, err := acquireShard(i.shardID, i.shardMgr, executionMgr, i.host.Identity(), shardClosedCh, i.logger) if err != nil { return nil, err } - i.logger.Infof("Creating new engine for shardID: %v", i.shardID) + logShardEngineCreatingEvent(i.logger, i.host.Identity(), i.shardID) + defer logShardEngineCreatedEvent(i.logger, i.host.Identity(), i.shardID) i.engine = i.engineFactory.CreateEngine(context) i.engine.Start() @@ -270,11 +288,22 @@ func (i *historyShardsItem) getOrCreateEngine() (Engine, error) { } func (i *historyShardsItem) stopEngine() { + logShardEngineStoppingEvent(i.logger, i.host.Identity(), i.shardID) + defer 
logShardEngineStoppedEvent(i.logger, i.host.Identity(), i.shardID) i.Lock() defer i.Unlock() if i.engine != nil { - i.logger.Infof("Stopping engine for shardID: %v", i.shardID) i.engine.Stop() + i.engine = nil } } + +func isShardOwnershiptLostError(err error) bool { + switch err.(type) { + case *persistence.ShardOwnershipLostError: + return true + } + + return false +} diff --git a/service/history/shardController_test.go b/service/history/shardController_test.go index b4ed827f358..cc56b8728b4 100644 --- a/service/history/shardController_test.go +++ b/service/history/shardController_test.go @@ -11,6 +11,9 @@ import ( "errors" + "sync" + "time" + "code.uber.internal/devexp/minions/common/membership" mmocks "code.uber.internal/devexp/minions/common/mocks" "code.uber.internal/devexp/minions/common/persistence" @@ -196,3 +199,225 @@ func (s *shardControllerSuite) TestAcquireShardRenewLookupFailed() { s.NotNil(s.controller.getEngineForShard(shardID)) } } + +func (s *shardControllerSuite) TestHistoryEngineClosed() { + numShards := 4 + s.controller = newShardController(numShards, s.hostInfo, s.mockServiceResolver, s.mockShardManager, + s.mockExecutionMgrFactory, s.mockEngineFactory, s.logger) + historyEngines := make(map[int]*MockHistoryEngine) + for shardID := 0; shardID < numShards; shardID++ { + mockEngine := &MockHistoryEngine{} + historyEngines[shardID] = mockEngine + s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6) + } + + s.mockServiceResolver.On("AddListener", shardControllerMembershipUpdateListenerName, + mock.Anything).Return(nil) + s.controller.Start() + var workerWG sync.WaitGroup + for w := 0; w < 10; w++ { + workerWG.Add(1) + go func() { + for attempt := 0; attempt < 10; attempt++ { + for shardID := 0; shardID < numShards; shardID++ { + engine, err := s.controller.getEngineForShard(shardID) + s.Nil(err) + s.NotNil(engine) + } + } + workerWG.Done() + }() + } + + workerWG.Wait() + + differentHostInfo := membership.NewHostInfo("another-host", nil) + for shardID := 0; shardID < 2; shardID++ { + mockEngine := historyEngines[shardID] + mockEngine.On("Stop").Return().Once() + s.mockServiceResolver.On("Lookup", string(shardID)).Return(differentHostInfo, nil) + s.controller.shardClosedCh <- shardID + } + + for w := 0; w < 10; w++ { + workerWG.Add(1) + go func() { + for attempt := 0; attempt < 10; attempt++ { + for shardID := 2; shardID < numShards; shardID++ { + engine, err := s.controller.getEngineForShard(shardID) + s.Nil(err) + s.NotNil(engine) + time.Sleep(20 * time.Millisecond) + } + } + workerWG.Done() + }() + } + + for w := 0; w < 10; w++ { + workerWG.Add(1) + go func() { + shardLost := false + for attempt := 0; !shardLost && attempt < 10; attempt++ { + for shardID := 0; shardID < 2; shardID++ { + _, err := s.controller.getEngineForShard(shardID) + if err != nil { + s.logger.Errorf("ShardLost: %v", err) + shardLost = true + } + time.Sleep(20 * time.Millisecond) + } + } + + s.True(shardLost) + workerWG.Done() + }() + } + + workerWG.Wait() + + for _, mockEngine := range historyEngines { + mockEngine.AssertExpectations(s.T()) + } +} + +func (s *shardControllerSuite) TestRingUpdated() { + numShards := 4 + s.controller = newShardController(numShards, s.hostInfo, s.mockServiceResolver, s.mockShardManager, + s.mockExecutionMgrFactory, s.mockEngineFactory, s.logger) + historyEngines := make(map[int]*MockHistoryEngine) + for shardID := 0; shardID < numShards; shardID++ { + mockEngine := &MockHistoryEngine{} + historyEngines[shardID] = mockEngine + s.setupMocksForAcquireShard(shardID, 
mockEngine, 5, 6) + } + + s.mockServiceResolver.On("AddListener", shardControllerMembershipUpdateListenerName, + mock.Anything).Return(nil) + s.controller.Start() + + differentHostInfo := membership.NewHostInfo("another-host", nil) + for shardID := 0; shardID < 2; shardID++ { + mockEngine := historyEngines[shardID] + mockEngine.On("Stop").Return().Once() + s.mockServiceResolver.On("Lookup", string(shardID)).Return(differentHostInfo, nil) + } + s.mockServiceResolver.On("Lookup", string(2)).Return(s.hostInfo, nil) + s.mockServiceResolver.On("Lookup", string(3)).Return(s.hostInfo, nil) + s.controller.membershipUpdateCh <- &membership.ChangedEvent{} + + var workerWG sync.WaitGroup + for w := 0; w < 10; w++ { + workerWG.Add(1) + go func() { + for attempt := 0; attempt < 10; attempt++ { + for shardID := 2; shardID < numShards; shardID++ { + engine, err := s.controller.getEngineForShard(shardID) + s.Nil(err) + s.NotNil(engine) + time.Sleep(20 * time.Millisecond) + } + } + workerWG.Done() + }() + } + + for w := 0; w < 10; w++ { + workerWG.Add(1) + go func() { + shardLost := false + for attempt := 0; !shardLost && attempt < 10; attempt++ { + for shardID := 0; shardID < 2; shardID++ { + _, err := s.controller.getEngineForShard(shardID) + if err != nil { + s.logger.Errorf("ShardLost: %v", err) + shardLost = true + } + time.Sleep(20 * time.Millisecond) + } + } + + s.True(shardLost) + workerWG.Done() + }() + } + + workerWG.Wait() + + for _, mockEngine := range historyEngines { + mockEngine.AssertExpectations(s.T()) + } +} + +func (s *shardControllerSuite) TestShardControllerClosed() { + numShards := 4 + s.controller = newShardController(numShards, s.hostInfo, s.mockServiceResolver, s.mockShardManager, + s.mockExecutionMgrFactory, s.mockEngineFactory, s.logger) + historyEngines := make(map[int]*MockHistoryEngine) + for shardID := 0; shardID < numShards; shardID++ { + mockEngine := &MockHistoryEngine{} + historyEngines[shardID] = mockEngine + s.setupMocksForAcquireShard(shardID, mockEngine, 5, 6) + } + + s.mockServiceResolver.On("AddListener", shardControllerMembershipUpdateListenerName, + mock.Anything).Return(nil) + s.controller.Start() + + var workerWG sync.WaitGroup + for w := 0; w < 10; w++ { + workerWG.Add(1) + go func() { + shardLost := false + for attempt := 0; !shardLost && attempt < 10; attempt++ { + for shardID := 0; shardID < numShards; shardID++ { + _, err := s.controller.getEngineForShard(shardID) + if err != nil { + s.logger.Errorf("ShardLost: %v", err) + shardLost = true + } + time.Sleep(20 * time.Millisecond) + } + } + + s.True(shardLost) + workerWG.Done() + }() + } + + s.mockServiceResolver.On("RemoveListener", shardControllerMembershipUpdateListenerName).Return(nil) + for shardID := 0; shardID < numShards; shardID++ { + mockEngine := historyEngines[shardID] + mockEngine.On("Stop").Return().Once() + s.mockServiceResolver.On("Lookup", string(shardID)).Return(s.hostInfo, nil) + } + s.controller.Stop() + workerWG.Wait() +} + +func (s *shardControllerSuite) setupMocksForAcquireShard(shardID int, mockEngine *MockHistoryEngine, currentRangeID, + newRangeID int64) { + mockExecutionMgr := &mmocks.ExecutionManager{} + s.mockExecutionMgrFactory.On("CreateExecutionManager", shardID).Return(mockExecutionMgr, nil).Once() + mockEngine.On("Start").Return().Once() + s.mockServiceResolver.On("Lookup", string(shardID)).Return(s.hostInfo, nil).Twice() + s.mockEngineFactory.On("CreateEngine", mock.Anything).Return(mockEngine).Once() + s.mockShardManager.On("GetShard", &persistence.GetShardRequest{ShardID: 
shardID}).Return( + &persistence.GetShardResponse{ + ShardInfo: &persistence.ShardInfo{ + ShardID: shardID, + Owner: s.hostInfo.Identity(), + RangeID: currentRangeID, + }, + }, nil).Once() + s.mockShardManager.On("UpdateShard", &persistence.UpdateShardRequest{ + ShardInfo: &persistence.ShardInfo{ + ShardID: shardID, + Owner: s.hostInfo.Identity(), + RangeID: newRangeID, + StolenSinceRenew: 1, + TransferAckLevel: 0, + }, + PreviousRangeID: currentRangeID, + }).Return(nil).Once() +} diff --git a/service/history/timerQueueProcessor.go b/service/history/timerQueueProcessor.go index 1263b84d99d..46e510d90e7 100644 --- a/service/history/timerQueueProcessor.go +++ b/service/history/timerQueueProcessor.go @@ -483,7 +483,14 @@ Update_History_Loop: if scheduleNewDecision { // Schedule a new decision. - id := t.historyService.tracker.getNextTaskID() + id, err := t.historyService.tracker.getNextTaskID() + if err != nil { + if isShardOwnershiptLostError(err) { + // Shard is stolen. Stop timer processing to reduce duplicates + t.Stop() + } + return err + } defer t.historyService.tracker.completeTask(id) newDecisionEvent := builder.ScheduleDecisionTask() transferTasks = []persistence.Task{&persistence.DecisionTask{ diff --git a/service/history/timerQueueProcessor_test.go b/service/history/timerQueueProcessor_test.go index c122db88aaa..1da72b94148 100644 --- a/service/history/timerQueueProcessor_test.go +++ b/service/history/timerQueueProcessor_test.go @@ -20,8 +20,10 @@ type ( timerQueueProcessorSuite struct { suite.Suite persistence.TestBase - engineImpl *historyEngineImpl - logger bark.Logger + engineImpl *historyEngineImpl + mockShardManager *mocks.ShardManager + shardClosedCh chan int + logger bark.Logger } ) @@ -42,12 +44,22 @@ func (s *timerQueueProcessorSuite) SetupSuite() { s.logger = bark.NewLoggerFromLogrus(log2) shardID := 0 + s.mockShardManager = &mocks.ShardManager{} resp, err := s.ShardMgr.GetShard(&persistence.GetShardRequest{ShardID: shardID}) if err != nil { log.Fatal(err) } - shard := &shardContextImpl{shardInfo: resp.ShardInfo, executionManager: s.WorkflowMgr, logger: s.logger} + shard := &shardContextImpl{ + shardInfo: resp.ShardInfo, + transferSequenceNumber: 1, + executionManager: s.WorkflowMgr, + shardManager: s.mockShardManager, + rangeSize: defaultRangeSize, + maxTransferSequenceNumber: 100000, + closeCh: s.shardClosedCh, + logger: s.logger, + } txProcessor := newTransferQueueProcessor(shard, &mocks.MatchingClient{}) tracker := newPendingTaskTracker(shard, txProcessor, s.logger) s.engineImpl = &historyEngineImpl{ @@ -64,6 +76,10 @@ func (s *timerQueueProcessorSuite) TearDownSuite() { s.TearDownWorkflowStore() } +func (s *timerQueueProcessorSuite) TearDownTest() { + s.mockShardManager.AssertExpectations(s.T()) +} + func (s *timerQueueProcessorSuite) getHistoryAndTimers(timeOuts []int32) ([]byte, []persistence.Task) { // Generate first decision task event. logger := bark.NewLoggerFromLogrus(log.New()) diff --git a/service/history/transferQueueProcessor.go b/service/history/transferQueueProcessor.go index 871f05d38cb..0b1c363960e 100644 --- a/service/history/transferQueueProcessor.go +++ b/service/history/transferQueueProcessor.go @@ -40,10 +40,12 @@ type ( // Outstanding tasks map uses the task id sequencer as the key, which is used by updateAckLevel to move the ack level // for the shard when all preceding tasks are acknowledged. 
ackManager struct { - shard ShardContext - executionMgr persistence.ExecutionManager - logger bark.Logger - lk sync.RWMutex + processor transferQueueProcessor + shard ShardContext + executionMgr persistence.ExecutionManager + logger bark.Logger + + sync.RWMutex outstandingTasks map[int64]bool readLevel int64 ackLevel int64 @@ -59,8 +61,7 @@ type ( func newTransferQueueProcessor(shard ShardContext, matching matching.Client) transferQueueProcessor { executionManager := shard.GetExecutionManager() logger := shard.GetLogger() - return &transferQueueProcessorImpl{ - ackMgr: newAckManager(shard, executionManager, logger), + processor := &transferQueueProcessorImpl{ executionManager: executionManager, matchingClient: matching, shutdownCh: make(chan struct{}), @@ -68,11 +69,16 @@ func newTransferQueueProcessor(shard ShardContext, matching matching.Client) tra tagWorkflowComponent: tagValueTransferQueueComponent, }), } + processor.ackMgr = newAckManager(processor, shard, executionManager, logger) + + return processor } -func newAckManager(shard ShardContext, executionMgr persistence.ExecutionManager, logger bark.Logger) *ackManager { +func newAckManager(processor transferQueueProcessor, shard ShardContext, executionMgr persistence.ExecutionManager, + logger bark.Logger) *ackManager { ackLevel := shard.GetTransferAckLevel() return &ackManager{ + processor: processor, shard: shard, executionMgr: executionMgr, outstandingTasks: make(map[int64]bool), @@ -235,11 +241,14 @@ ProcessRetryLoop: } func (a *ackManager) readTransferTasks() ([]*persistence.TransferTaskInfo, error) { + a.RLock() + rLevel := a.readLevel + mLevel := a.maxAllowedReadLevel + a.RUnlock() response, err := a.executionMgr.GetTransferTasks(&persistence.GetTransferTasksRequest{ - ReadLevel: atomic.LoadInt64(&a.readLevel), - MaxReadLevel: atomic.LoadInt64(&a.maxAllowedReadLevel), + ReadLevel: rLevel, + MaxReadLevel: mLevel, BatchSize: transferTaskBatchSize, - RangeID: a.shard.GetRangeID(), }) if err != nil { @@ -251,7 +260,7 @@ func (a *ackManager) readTransferTasks() ([]*persistence.TransferTaskInfo, error return tasks, nil } - a.lk.Lock() + a.Lock() for _, task := range tasks { if a.readLevel >= task.TaskID { a.logger.Fatalf("Next task ID is less than current read level. 
TaskID: %v, ReadLevel: %v", task.TaskID, @@ -261,22 +270,22 @@ func (a *ackManager) readTransferTasks() ([]*persistence.TransferTaskInfo, error a.readLevel = task.TaskID a.outstandingTasks[a.readLevel] = false } - a.lk.Unlock() + a.Unlock() return tasks, nil } func (a *ackManager) completeTask(taskID int64) { - a.lk.Lock() + a.Lock() if _, ok := a.outstandingTasks[taskID]; ok { a.outstandingTasks[taskID] = true } - a.lk.Unlock() + a.Unlock() } func (a *ackManager) updateAckLevel() { - updatedAckLevel := int64(-1) - a.lk.Lock() + updatedAckLevel := a.ackLevel + a.Lock() MoveAckLevelLoop: for current := a.ackLevel + 1; current <= a.readLevel; current++ { if acked, ok := a.outstandingTasks[current]; ok { @@ -307,20 +316,25 @@ MoveAckLevelLoop: } } } - a.lk.Unlock() + a.Unlock() - if updatedAckLevel != -1 { - a.shard.UpdateAckLevel(updatedAckLevel) + // Always update ackLevel to detect if the shared is stolen + if err := a.shard.UpdateAckLevel(updatedAckLevel); err != nil { + if isShardOwnershiptLostError(err) { + // Shard is stolen, stop the processor + a.processor.Stop() + } } + } func (a *ackManager) updateMaxAllowedReadLevel(maxAllowedReadLevel int64) { - a.lk.Lock() + a.Lock() a.logger.Debugf("Updating max allowed read level for transfer tasks: %v", maxAllowedReadLevel) - if maxAllowedReadLevel > atomic.LoadInt64(&a.maxAllowedReadLevel) { - atomic.StoreInt64(&a.maxAllowedReadLevel, maxAllowedReadLevel) + if maxAllowedReadLevel > a.maxAllowedReadLevel { + a.maxAllowedReadLevel = maxAllowedReadLevel } - a.lk.Unlock() + a.Unlock() } func minDuration(x, y time.Duration) time.Duration {