Skip to content

Commit

Permalink
persistence: API changes to support tasklist cleanup (cadence-workflo…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Feb 28, 2019
1 parent 6db357f commit c62d51c
Show file tree
Hide file tree
Showing 19 changed files with 735 additions and 95 deletions.
9 changes: 9 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,16 @@ const (
PersistenceGetTasksScope
// PersistenceCompleteTaskScope tracks CompleteTask calls made by service to persistence layer
PersistenceCompleteTaskScope
// PersistenceCompleteTasksLessThanScope is the metric scope for persistence.TaskManager.PersistenceCompleteTasksLessThan API
PersistenceCompleteTasksLessThanScope
// PersistenceLeaseTaskListScope tracks LeaseTaskList calls made by service to persistence layer
PersistenceLeaseTaskListScope
// PersistenceUpdateTaskListScope tracks PersistenceUpdateTaskListScope calls made by service to persistence layer
PersistenceUpdateTaskListScope
// PersistenceListTaskListScope is the metric scope for persistence.TaskManager.ListTaskList API
PersistenceListTaskListScope
// PersistenceDeleteTaskListScope is the metric scope for persistence.TaskManager.DeleteTaskList API
PersistenceDeleteTaskListScope
// PersistenceAppendHistoryEventsScope tracks AppendHistoryEvents calls made by service to persistence layer
PersistenceAppendHistoryEventsScope
// PersistenceGetWorkflowExecutionHistoryScope tracks GetWorkflowExecutionHistory calls made by service to persistence layer
Expand Down Expand Up @@ -792,8 +798,11 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceCreateTaskScope: {operation: "CreateTask", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetTasksScope: {operation: "GetTasks", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCompleteTaskScope: {operation: "CompleteTask", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCompleteTasksLessThanScope: {operation: "CompleteTasksLessThan", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceLeaseTaskListScope: {operation: "LeaseTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateTaskListScope: {operation: "UpdateTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceListTaskListScope: {operation: "ListTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteTaskListScope: {operation: "DeleteTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceAppendHistoryEventsScope: {operation: "AppendHistoryEvents", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetWorkflowExecutionHistoryScope: {operation: "GetWorkflowExecutionHistory", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteWorkflowExecutionHistoryScope: {operation: "DeleteWorkflowExecutionHistory", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
Expand Down
83 changes: 81 additions & 2 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ const (
`name: ?, ` +
`type: ?, ` +
`ack_level: ?, ` +
`kind: ? ` +
`kind: ?, ` +
`last_updated: ? ` +
`}`

templateTaskType = `{` +
Expand Down Expand Up @@ -827,6 +828,13 @@ workflow_state = ? ` +
`and type = ? ` +
`and task_id = ?`

templateCompleteTasksLessThanQuery = `DELETE FROM tasks ` +
`WHERE domain_id = ? ` +
`AND task_list_name = ? ` +
`AND task_list_type = ? ` +
`AND type = ? ` +
`AND task_id <= ? `

templateGetTaskList = `SELECT ` +
`range_id, ` +
`task_list ` +
Expand Down Expand Up @@ -866,6 +874,14 @@ workflow_state = ? ` +
`range_id, ` +
`task_list ` +
`) VALUES (?, ?, ?, ?, ?, ?, ` + templateTaskListType + `) USING TTL ?`

templateDeleteTaskListQuery = `DELETE FROM tasks ` +
`WHERE domain_id = ? ` +
`AND task_list_name = ? ` +
`AND task_list_type = ? ` +
`AND type = ? ` +
`AND task_id = ? ` +
`IF range_id = ?`
)

var (
Expand Down Expand Up @@ -2516,6 +2532,7 @@ func (d *cassandraPersistence) LeaseTaskList(request *p.LeaseTaskListRequest) (*
Message: fmt.Sprintf("LeaseTaskList requires non empty task list"),
}
}
now := time.Now()
query := d.session.Query(templateGetTaskList,
request.DomainID,
request.TaskList,
Expand All @@ -2540,6 +2557,7 @@ func (d *cassandraPersistence) LeaseTaskList(request *p.LeaseTaskListRequest) (*
request.TaskType,
0,
request.TaskListKind,
now,
)
} else if isThrottlingError(err) {
return nil, &workflow.ServiceBusyError{
Expand All @@ -2562,6 +2580,7 @@ func (d *cassandraPersistence) LeaseTaskList(request *p.LeaseTaskListRequest) (*
request.TaskType,
ackLevel,
taskListKind,
now,
request.DomainID,
&request.TaskList,
request.TaskType,
Expand All @@ -2588,7 +2607,15 @@ func (d *cassandraPersistence) LeaseTaskList(request *p.LeaseTaskListRequest) (*
Msg: fmt.Sprintf("LeaseTaskList failed to apply. db rangeID %v", previousRangeID),
}
}
tli := &p.TaskListInfo{DomainID: request.DomainID, Name: request.TaskList, TaskType: request.TaskType, RangeID: rangeID + 1, AckLevel: ackLevel, Kind: request.TaskListKind}
tli := &p.TaskListInfo{
DomainID: request.DomainID,
Name: request.TaskList,
TaskType: request.TaskType,
RangeID: rangeID + 1,
AckLevel: ackLevel,
Kind: request.TaskListKind,
LastUpdated: now,
}
return &p.LeaseTaskListResponse{TaskListInfo: tli}, nil
}

Expand All @@ -2609,6 +2636,7 @@ func (d *cassandraPersistence) UpdateTaskList(request *p.UpdateTaskListRequest)
tli.TaskType,
tli.AckLevel,
tli.Kind,
time.Now(),
stickyTaskListTTL,
)
err := query.Exec()
Expand All @@ -2632,6 +2660,7 @@ func (d *cassandraPersistence) UpdateTaskList(request *p.UpdateTaskListRequest)
tli.TaskType,
tli.AckLevel,
tli.Kind,
time.Now(),
tli.DomainID,
&tli.Name,
tli.TaskType,
Expand Down Expand Up @@ -2668,6 +2697,35 @@ func (d *cassandraPersistence) UpdateTaskList(request *p.UpdateTaskListRequest)
return &p.UpdateTaskListResponse{}, nil
}

func (d *cassandraPersistence) ListTaskList(request *p.ListTaskListRequest) (*p.ListTaskListResponse, error) {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("unsupported operation"),
}
}

func (d *cassandraPersistence) DeleteTaskList(request *p.DeleteTaskListRequest) error {
query := d.session.Query(templateDeleteTaskListQuery,
request.DomainID, request.TaskListName, request.TaskListType, rowTypeTaskList, taskListTaskID, request.RangeID)
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
if isThrottlingError(err) {
return &workflow.ServiceBusyError{
Message: fmt.Sprintf("DeleteTaskList operation failed. Error: %v", err),
}
}
return &workflow.InternalServiceError{
Message: fmt.Sprintf("DeleteTaskList operation failed. Error: %v", err),
}
}
if !applied {
return &p.ConditionFailedError{
Msg: fmt.Sprintf("DeleteTaskList operation failed: expected_range_id=%v but found %+v", request.RangeID, previous),
}
}
return nil
}

// From TaskManager interface
func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.CreateTasksResponse, error) {
batch := d.session.NewBatch(gocql.LoggedBatch)
Expand Down Expand Up @@ -2713,6 +2771,7 @@ func (d *cassandraPersistence) CreateTasks(request *p.CreateTasksRequest) (*p.Cr
taskListType,
ackLevel,
taskListKind,
time.Now(),
domainID,
taskList,
taskListType,
Expand Down Expand Up @@ -2818,6 +2877,26 @@ func (d *cassandraPersistence) CompleteTask(request *p.CompleteTaskRequest) erro
return nil
}

// CompleteTasksLessThan deletes all tasks less than or equal to the given task id. This API ignores the
// Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will
// be returned to the caller
func (d *cassandraPersistence) CompleteTasksLessThan(request *p.CompleteTasksLessThanRequest) (int, error) {
query := d.session.Query(templateCompleteTasksLessThanQuery,
request.DomainID, request.TaskListName, request.TaskType, rowTypeTask, request.TaskID)
err := query.Exec()
if err != nil {
if isThrottlingError(err) {
return 0, &workflow.ServiceBusyError{
Message: fmt.Sprintf("CompleteTasksLessThan operation failed. Error: %v", err),
}
}
return 0, &workflow.InternalServiceError{
Message: fmt.Sprintf("CompleteTasksLessThan operation failed. Error: %v", err),
}
}
return p.UnknownNumRowsAffected, nil
}

func (d *cassandraPersistence) GetTimerIndexTasks(request *p.GetTimerIndexTasksRequest) (*p.GetTimerIndexTasksResponse,
error) {
// Reading timer tasks need to be quorum level consistent, otherwise we could loose task
Expand Down
58 changes: 52 additions & 6 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ const (
TaskTypeArchiveHistoryEvent
)

// UnknownNumRowsAffected is returned when the number of rows that an API affected cannot be determined
const UnknownNumRowsAffected = -1

// Types of workflow backoff timeout
const (
WorkflowBackoffTimeoutTypeRetry = iota
Expand Down Expand Up @@ -333,12 +336,14 @@ type (

// TaskListInfo describes a state of a task list implementation.
TaskListInfo struct {
DomainID string
Name string
TaskType int
RangeID int64
AckLevel int64
Kind int
DomainID string
Name string
TaskType int
RangeID int64
AckLevel int64
Kind int
Expiry time.Time
LastUpdated time.Time
}

// TaskInfo describes either activity or decision task
Expand Down Expand Up @@ -895,6 +900,26 @@ type (
UpdateTaskListResponse struct {
}

// ListTaskListRequest contains the request params needed to invoke ListTaskList API
ListTaskListRequest struct {
PageSize int
PageToken []byte
}

// ListTaskListResponse is the response from ListTaskList API
ListTaskListResponse struct {
Items []TaskListInfo
NextPageToken []byte
}

// DeleteTaskListRequest contains the request params needed to invoke DeleteTaskList API
DeleteTaskListRequest struct {
DomainID string
TaskListName string
TaskListType int
RangeID int64
}

// CreateTasksRequest is used to create a new task for a workflow exectution
CreateTasksRequest struct {
TaskListInfo *TaskListInfo
Expand Down Expand Up @@ -934,6 +959,15 @@ type (
TaskID int64
}

// CompleteTasksLessThanRequest contains the request params needed to invoke CompleteTasksLessThan API
CompleteTasksLessThanRequest struct {
DomainID string
TaskListName string
TaskType int
TaskID int64 // Tasks less than or equal to this ID will be completed
Limit int // Limit on the max number of tasks that can be completed. Required param
}

// GetTimerIndexTasksRequest is the request for GetTimerIndexTasks
// TODO: replace this with an iterator that can configure min and max index.
GetTimerIndexTasksRequest struct {
Expand Down Expand Up @@ -1352,9 +1386,21 @@ type (
GetName() string
LeaseTaskList(request *LeaseTaskListRequest) (*LeaseTaskListResponse, error)
UpdateTaskList(request *UpdateTaskListRequest) (*UpdateTaskListResponse, error)
ListTaskList(request *ListTaskListRequest) (*ListTaskListResponse, error)
DeleteTaskList(request *DeleteTaskListRequest) error
CreateTasks(request *CreateTasksRequest) (*CreateTasksResponse, error)
GetTasks(request *GetTasksRequest) (*GetTasksResponse, error)
CompleteTask(request *CompleteTaskRequest) error
// CompleteTasksLessThan completes tasks less than or equal to the given task id
// This API takes a limit parameter which specifies the count of maxRows that
// can be deleted. This parameter may be ignored by the underlying storage, but
// its mandatory to specify it. On success this method returns the number of rows
// actually deleted. If the underlying storage doesn't support "limit", all rows
// less than or equal to taskID will be deleted.
// On success, this method returns:
// - number of rows actually deleted, if limit is honored
// - UnknownNumRowsDeleted, when all rows below value are deleted
CompleteTasksLessThan(request *CompleteTasksLessThanRequest) (int, error)
}

// HistoryManager is used to manage Workflow Execution HistoryEventBatch
Expand Down
Loading

0 comments on commit c62d51c

Please sign in to comment.