Skip to content

Commit

Permalink
Properly handle throttling errors from Cassandra (cadence-workflow#316)
Browse files Browse the repository at this point in the history
Handle overload errors returned from Cassandra differently from timeouts and internal service errors.
This is because in the case of throttling errors we know that the query never executed, so we do not have to invoke the cache consistency maintenance part.
Issue cadence-workflow#300
  • Loading branch information
Tamer Eldeeb authored Aug 18, 2017
1 parent 215baf5 commit df9ac3e
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 22 deletions.
2 changes: 2 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,7 @@ const (
PersistenceErrShardOwnershipLostCounter
PersistenceErrConditionFailedCounter
PersistenceErrTimeoutCounter
PersistenceErrBusyCounter

NumCommonMetrics
)
Expand Down Expand Up @@ -523,6 +524,7 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
PersistenceErrShardOwnershipLostCounter: {metricName: "persistence.errors.shard-ownership-lost", metricType: Counter},
PersistenceErrConditionFailedCounter: {metricName: "persistence.errors.condition-failed", metricType: Counter},
PersistenceErrTimeoutCounter: {metricName: "persistence.errors.timeout", metricType: Counter},
PersistenceErrBusyCounter: {metricName: "persistence.errors.busy", metricType: Counter},
},
Frontend: {},
History: {
Expand Down
11 changes: 10 additions & 1 deletion common/persistence/cassandraHistoryPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ func (h *cassandraHistoryPersistence) AppendHistoryEvents(request *AppendHistory
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
if _, ok := err.(*gocql.RequestErrWriteTimeout); ok {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("AppendHistoryEvents operation failed. Error: %v", err),
}
} else if isTimeoutError(err) {
// Write may have succeeded, but we don't know
// return this info to the caller so they have the option of trying to find out by executing a read
return &TimeoutError{Msg: fmt.Sprintf("AppendHistoryEvents timed out. Error: %v", err)}
Expand Down Expand Up @@ -189,6 +193,11 @@ func (h *cassandraHistoryPersistence) DeleteWorkflowExecutionHistory(

err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("DeleteWorkflowExecutionHistory operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("DeleteWorkflowExecutionHistory operation failed. Error: %v", err),
}
Expand Down
87 changes: 85 additions & 2 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,11 @@ func (d *cassandraPersistence) CreateShard(request *CreateShardRequest) error {
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("CreateShard operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("CreateShard operation failed. Error : %v", err),
}
Expand Down Expand Up @@ -628,6 +633,10 @@ func (d *cassandraPersistence) GetShard(request *GetShardRequest) (*GetShardResp
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Shard not found. ShardId: %v", shardID),
}
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("GetShard operation failed. Error: %v", err),
}
}

return nil, &workflow.InternalServiceError{
Expand Down Expand Up @@ -665,6 +674,11 @@ func (d *cassandraPersistence) UpdateShard(request *UpdateShardRequest) error {
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("UpdateShard operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("UpdateShard operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -724,6 +738,10 @@ func (d *cassandraPersistence) CreateWorkflowExecution(request *CreateWorkflowEx
// Write may have succeeded, but we don't know
// return this info to the caller so they have the option of trying to find out by executing a read
return nil, &TimeoutError{Msg: fmt.Sprintf("CreateWorkflowExecution timed out. Error: %v", err)}
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("CreateWorkflowExecution operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("CreateWorkflowExecution operation failed. Error: %v", err),
Expand Down Expand Up @@ -887,6 +905,10 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v, RunId: %v",
execution.GetWorkflowId(), execution.GetRunId()),
}
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("GetWorkflowExecution operation failed. Error: %v", err),
}
}

return nil, &workflow.InternalServiceError{
Expand Down Expand Up @@ -1035,6 +1057,10 @@ func (d *cassandraPersistence) UpdateWorkflowExecution(request *UpdateWorkflowEx
// Write may have succeeded, but we don't know
// return this info to the caller so they have the option of trying to find out by executing a read
return &TimeoutError{Msg: fmt.Sprintf("UpdateWorkflowExecution timed out. Error: %v", err)}
} else if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("UpdateWorkflowExecution operation failed. Error: %v", err),
Expand Down Expand Up @@ -1109,6 +1135,11 @@ func (d *cassandraPersistence) DeleteWorkflowExecution(request *DeleteWorkflowEx

err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("DeleteWorkflowExecution operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("DeleteWorkflowExecution operation failed. Error: %v", err),
}
Expand All @@ -1135,6 +1166,10 @@ func (d *cassandraPersistence) GetCurrentExecution(request *GetCurrentExecutionR
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v",
request.WorkflowID),
}
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("GetCurrentExecution operation failed. Error: %v", err),
}
}

return nil, &workflow.InternalServiceError{
Expand Down Expand Up @@ -1197,6 +1232,11 @@ func (d *cassandraPersistence) CompleteTransferTask(request *CompleteTransferTas

err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("CompleteTransferTask operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("CompleteTransferTask operation failed. Error: %v", err),
}
Expand All @@ -1218,6 +1258,11 @@ func (d *cassandraPersistence) CompleteTimerTask(request *CompleteTimerTaskReque

err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("CompleteTimerTask operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("CompleteTimerTask operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -1256,9 +1301,14 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
request.TaskList,
request.TaskType,
0)
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("LeaseTaskList operation failed. TaskList: %v, TaskType: %v, Error: %v",
request.TaskList, request.TaskType, err),
}
} else {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("LeaseTaskList operation failed. TaskList: %v, TaskType: %v, Error : %v",
Message: fmt.Sprintf("LeaseTaskList operation failed. TaskList: %v, TaskType: %v, Error: %v",
request.TaskList, request.TaskType, err),
}
}
Expand All @@ -1281,6 +1331,11 @@ func (d *cassandraPersistence) LeaseTaskList(request *LeaseTaskListRequest) (*Le
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("LeaseTaskList operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("LeaseTaskList operation failed. Error : %v", err),
}
Expand Down Expand Up @@ -1316,6 +1371,11 @@ func (d *cassandraPersistence) UpdateTaskList(request *UpdateTaskListRequest) (*
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("UpdateTaskList operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("UpdateTaskList operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -1390,8 +1450,13 @@ func (d *cassandraPersistence) CreateTasks(request *CreateTasksRequest) (*Create
previous := make(map[string]interface{})
applied, _, err := d.session.MapExecuteBatchCAS(batch, previous)
if err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("CreateTasks operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("CreateTask operation failed. Error : %v", err),
Message: fmt.Sprintf("CreateTasks operation failed. Error : %v", err),
}
}
if !applied {
Expand Down Expand Up @@ -1466,6 +1531,11 @@ func (d *cassandraPersistence) CompleteTask(request *CompleteTaskRequest) error

err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("CompleteTask operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("CompleteTask operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -1507,6 +1577,11 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq
}

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("GetTimerTasks operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetTimerTasks operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -2035,6 +2110,14 @@ func isTimeoutError(err error) bool {
return ok
}

// isThrottlingError reports whether err is a gocql.RequestError carrying the
// Cassandra "overloaded" error code. Any other error, including one that is
// not a gocql.RequestError at all, yields false.
func isThrottlingError(err error) bool {
	// gocql keeps its errOverloaded constant (0x1001) unexported, so the
	// code value is repeated here under a local name.
	const overloadedCode = 0x1001

	reqErr, isRequestErr := err.(gocql.RequestError)
	if !isRequestErr {
		return false
	}
	return reqErr.Code() == overloadedCode
}

// GetVisibilityTSFrom - helper method to get visibility timestamp
func GetVisibilityTSFrom(task Task) time.Time {
switch task.GetType() {
Expand Down
54 changes: 52 additions & 2 deletions common/persistence/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted(
query = query.WithTimestamp(common.UnixNanoToCQLTimestamp(request.StartTimestamp))
err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("RecordWorkflowExecutionStarted operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("RecordWorkflowExecutionStarted operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -204,6 +209,11 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
batch = batch.WithTimestamp(common.UnixNanoToCQLTimestamp(request.CloseTimestamp))
err := v.session.ExecuteBatch(batch)
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("RecordWorkflowExecutionClosed operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("RecordWorkflowExecutionClosed operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -238,6 +248,11 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err),
}
Expand All @@ -257,7 +272,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListOpenWorkflowExecutions operation failed. Not able to create query iterator.",
Message: "ListClosedWorkflowExecutions operation failed. Not able to create query iterator.",
}
}

Expand All @@ -273,8 +288,13 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListClosedWorkflowExecutions operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err),
Message: fmt.Sprintf("ListClosedWorkflowExecutions operation failed. Error: %v", err),
}
}

Expand Down Expand Up @@ -309,6 +329,11 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType(
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByType operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByType operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -345,6 +370,11 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType(
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -381,6 +411,11 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID(
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -417,6 +452,11 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowI
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -453,6 +493,11 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByStatus operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByStatus operation failed. Error: %v", err),
}
Expand Down Expand Up @@ -486,6 +531,11 @@ func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
}

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Message: fmt.Sprintf("GetClosedWorkflowExecution operation failed. Error: %v", err),
}
}
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetClosedWorkflowExecution operation failed. Error: %v", err),
}
Expand Down
3 changes: 3 additions & 0 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ func (p *workflowExecutionPersistenceClient) updateErrorMetric(scope int, err er
case *TimeoutError:
p.metricClient.IncCounter(scope, metrics.PersistenceErrTimeoutCounter)
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
case *workflow.ServiceBusyError:
p.metricClient.IncCounter(scope, metrics.PersistenceErrBusyCounter)
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
default:
p.metricClient.IncCounter(scope, metrics.PersistenceFailures)
}
Expand Down
Loading

0 comments on commit df9ac3e

Please sign in to comment.