Skip to content

Commit

Permalink
Return deserialization tasks as is from persistence layer (temporalio…
Browse files Browse the repository at this point in the history
…#2042)

* Return tasks as is, keeping the type information
* Rename GetTimerIndexTasks to GetTimerTasks
* Modify persistence layer accordingly
* Add temporarily conversion back to original transfer / timer / replication / visibility info type to minimize number of changes
  • Loading branch information
wxing1292 authored Oct 26, 2021
1 parent b3731a4 commit d83e601
Show file tree
Hide file tree
Showing 30 changed files with 1,409 additions and 1,098 deletions.
593 changes: 331 additions & 262 deletions api/persistence/v1/executions.pb.go

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,8 @@ const (
PersistenceRangeDeleteReplicationTaskFromDLQScope
// PersistenceGetTimerTaskScope tracks GetTimerTask calls made by service to persistence layer
PersistenceGetTimerTaskScope
// PersistenceGetTimerIndexTasksScope tracks GetTimerIndexTasks calls made by service to persistence layer
PersistenceGetTimerIndexTasksScope
// PersistenceGetTimerTasksScope tracks GetTimerTasks calls made by service to persistence layer
PersistenceGetTimerTasksScope
// PersistenceCompleteTimerTaskScope tracks CompleteTimerTasks calls made by service to persistence layer
PersistenceCompleteTimerTaskScope
// PersistenceRangeCompleteTimerTaskScope tracks CompleteTimerTasks calls made by service to persistence layer
Expand Down Expand Up @@ -1131,7 +1131,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceDeleteReplicationTaskFromDLQScope: {operation: "DeleteReplicationTaskFromDLQ"},
PersistenceRangeDeleteReplicationTaskFromDLQScope: {operation: "RangeDeleteReplicationTaskFromDLQ"},
PersistenceGetTimerTaskScope: {operation: "GetTimerTask"},
PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"},
PersistenceGetTimerTasksScope: {operation: "GetTimerTasks"},
PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"},
PersistenceRangeCompleteTimerTaskScope: {operation: "RangeCompleteTimerTask"},
PersistenceCreateTaskScope: {operation: "CreateTask"},
Expand Down
102 changes: 33 additions & 69 deletions common/persistence/cassandra/mutable_state_task_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func (d *MutableStateTaskStore) AddTasks(

func (d *MutableStateTaskStore) GetTransferTask(
request *p.GetTransferTaskRequest,
) (*p.GetTransferTaskResponse, error) {
) (*p.InternalGetTransferTaskResponse, error) {
shardID := request.ShardID
taskID := request.TaskID
query := d.Session.Query(templateGetTransferTaskQuery,
Expand All @@ -383,19 +383,12 @@ func (d *MutableStateTaskStore) GetTransferTask(
if err := query.Scan(&data, &encoding); err != nil {
return nil, gocql.ConvertError("GetTransferTask", err)
}

info, err := serialization.TransferTaskInfoFromBlob(data, encoding)

if err != nil {
return nil, gocql.ConvertError("GetTransferTask", err)
}

return &p.GetTransferTaskResponse{TransferTaskInfo: info}, nil
return &p.InternalGetTransferTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil
}

func (d *MutableStateTaskStore) GetTransferTasks(
request *p.GetTransferTasksRequest,
) (*p.GetTransferTasksResponse, error) {
) (*p.InternalGetTransferTasksResponse, error) {

// Reading transfer tasks need to be quorum level consistent, otherwise we could lose task
query := d.Session.Query(templateGetTransferTasksQuery,
Expand All @@ -410,17 +403,15 @@ func (d *MutableStateTaskStore) GetTransferTasks(
)
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.GetTransferTasksResponse{}
response := &p.InternalGetTransferTasksResponse{}
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
t, err := serialization.TransferTaskInfoFromBlob(data, encoding)
if err != nil {
return nil, gocql.ConvertError("GetTransferTasks", err)
}
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))

response.Tasks = append(response.Tasks, t)
data = nil
encoding = ""
}
if len(iter.PageState()) > 0 {
response.NextPageToken = iter.PageState()
Expand Down Expand Up @@ -469,7 +460,7 @@ func (d *MutableStateTaskStore) RangeCompleteTransferTask(

func (d *MutableStateTaskStore) GetTimerTask(
request *p.GetTimerTaskRequest,
) (*p.GetTimerTaskResponse, error) {
) (*p.InternalGetTimerTaskResponse, error) {
shardID := request.ShardID
taskID := request.TaskID
visibilityTs := request.VisibilityTimestamp
Expand All @@ -488,18 +479,12 @@ func (d *MutableStateTaskStore) GetTimerTask(
return nil, gocql.ConvertError("GetTimerTask", err)
}

info, err := serialization.TimerTaskInfoFromBlob(data, encoding)

if err != nil {
return nil, gocql.ConvertError("GetTimerTask", err)
}

return &p.GetTimerTaskResponse{TimerTaskInfo: info}, nil
return &p.InternalGetTimerTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil
}

func (d *MutableStateTaskStore) GetTimerIndexTasks(
request *p.GetTimerIndexTasksRequest,
) (*p.GetTimerIndexTasksResponse, error) {
func (d *MutableStateTaskStore) GetTimerTasks(
request *p.GetTimerTasksRequest,
) (*p.InternalGetTimerTasksResponse, error) {
// Reading timer tasks need to be quorum level consistent, otherwise we could lose tasks
minTimestamp := p.UnixMilliseconds(request.MinTimestamp)
maxTimestamp := p.UnixMilliseconds(request.MaxTimestamp)
Expand All @@ -514,25 +499,22 @@ func (d *MutableStateTaskStore) GetTimerIndexTasks(
)
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.GetTimerIndexTasksResponse{}
response := &p.InternalGetTimerTasksResponse{}
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
t, err := serialization.TimerTaskInfoFromBlob(data, encoding)

if err != nil {
return nil, gocql.ConvertError("GetTimerIndexTasks", err)
}
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))

response.Timers = append(response.Timers, t)
data = nil
encoding = ""
}
if len(iter.PageState()) > 0 {
response.NextPageToken = iter.PageState()
}

if err := iter.Close(); err != nil {
return nil, gocql.ConvertError("GetTimerIndexTasks", err)
return nil, gocql.ConvertError("GetTimerTasks", err)
}

return response, nil
Expand Down Expand Up @@ -576,7 +558,7 @@ func (d *MutableStateTaskStore) RangeCompleteTimerTask(

func (d *MutableStateTaskStore) GetReplicationTask(
request *p.GetReplicationTaskRequest,
) (*p.GetReplicationTaskResponse, error) {
) (*p.InternalGetReplicationTaskResponse, error) {
shardID := request.ShardID
taskID := request.TaskID
query := d.Session.Query(templateGetReplicationTaskQuery,
Expand All @@ -594,18 +576,12 @@ func (d *MutableStateTaskStore) GetReplicationTask(
return nil, gocql.ConvertError("GetReplicationTask", err)
}

info, err := serialization.ReplicationTaskInfoFromBlob(data, encoding)

if err != nil {
return nil, gocql.ConvertError("GetReplicationTask", err)
}

return &p.GetReplicationTaskResponse{ReplicationTaskInfo: info}, nil
return &p.InternalGetReplicationTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil
}

func (d *MutableStateTaskStore) GetReplicationTasks(
request *p.GetReplicationTasksRequest,
) (*p.GetReplicationTasksResponse, error) {
) (*p.InternalGetReplicationTasksResponse, error) {

// Reading replication tasks need to be quorum level consistent, otherwise we could lose task
query := d.Session.Query(templateGetReplicationTasksQuery,
Expand Down Expand Up @@ -686,7 +662,7 @@ func (d *MutableStateTaskStore) PutReplicationTaskToDLQ(

func (d *MutableStateTaskStore) GetReplicationTasksFromDLQ(
request *p.GetReplicationTasksFromDLQRequest,
) (*p.GetReplicationTasksFromDLQResponse, error) {
) (*p.InternalGetReplicationTasksResponse, error) {
// Reading replication tasks need to be quorum level consistent, otherwise we could lose tasks
query := d.Session.Query(templateGetReplicationTasksQuery,
request.ShardID,
Expand Down Expand Up @@ -741,7 +717,7 @@ func (d *MutableStateTaskStore) RangeDeleteReplicationTaskFromDLQ(

func (d *MutableStateTaskStore) GetVisibilityTask(
request *p.GetVisibilityTaskRequest,
) (*p.GetVisibilityTaskResponse, error) {
) (*p.InternalGetVisibilityTaskResponse, error) {
shardID := request.ShardID
taskID := request.TaskID
query := d.Session.Query(templateGetVisibilityTaskQuery,
Expand All @@ -758,19 +734,12 @@ func (d *MutableStateTaskStore) GetVisibilityTask(
if err := query.Scan(&data, &encoding); err != nil {
return nil, gocql.ConvertError("GetVisibilityTask", err)
}

info, err := serialization.VisibilityTaskInfoFromBlob(data, encoding)

if err != nil {
return nil, gocql.ConvertError("GetVisibilityTask", err)
}

return &p.GetVisibilityTaskResponse{VisibilityTaskInfo: info}, nil
return &p.InternalGetVisibilityTaskResponse{Task: *p.NewDataBlob(data, encoding)}, nil
}

func (d *MutableStateTaskStore) GetVisibilityTasks(
request *p.GetVisibilityTasksRequest,
) (*p.GetVisibilityTasksResponse, error) {
) (*p.InternalGetVisibilityTasksResponse, error) {

// Reading Visibility tasks need to be quorum level consistent, otherwise we could lose task
query := d.Session.Query(templateGetVisibilityTasksQuery,
Expand All @@ -785,17 +754,15 @@ func (d *MutableStateTaskStore) GetVisibilityTasks(
)
iter := query.PageSize(request.BatchSize).PageState(request.NextPageToken).Iter()

response := &p.GetVisibilityTasksResponse{}
response := &p.InternalGetVisibilityTasksResponse{}
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
t, err := serialization.VisibilityTaskInfoFromBlob(data, encoding)
if err != nil {
return nil, gocql.ConvertError("GetVisibilityTasks", err)
}
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))

response.Tasks = append(response.Tasks, t)
data = nil
encoding = ""
}
if len(iter.PageState()) > 0 {
response.NextPageToken = iter.PageState()
Expand Down Expand Up @@ -845,21 +812,18 @@ func (d *MutableStateTaskStore) RangeCompleteVisibilityTask(
func (d *MutableStateTaskStore) populateGetReplicationTasksResponse(
query gocql.Query,
operation string,
) (*p.GetReplicationTasksResponse, error) {
) (*p.InternalGetReplicationTasksResponse, error) {
iter := query.Iter()

response := &p.GetReplicationTasksResponse{}
response := &p.InternalGetReplicationTasksResponse{}
var data []byte
var encoding string

for iter.Scan(&data, &encoding) {
t, err := serialization.ReplicationTaskInfoFromBlob(data, encoding)

if err != nil {
return nil, gocql.ConvertError(operation, err)
}
response.Tasks = append(response.Tasks, *p.NewDataBlob(data, encoding))

response.Tasks = append(response.Tasks, t)
data = nil
encoding = ""
}
if len(iter.PageState()) > 0 {
response.NextPageToken = iter.PageState()
Expand Down
22 changes: 11 additions & 11 deletions common/persistence/client/fault_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func (e *FaultInjectionExecutionStore) AddTasks(request *persistence.InternalAdd
}

func (e *FaultInjectionExecutionStore) GetTransferTask(request *persistence.GetTransferTaskRequest) (
*persistence.GetTransferTaskResponse,
*persistence.InternalGetTransferTaskResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
Expand All @@ -464,7 +464,7 @@ func (e *FaultInjectionExecutionStore) GetTransferTask(request *persistence.GetT
}

func (e *FaultInjectionExecutionStore) GetTransferTasks(request *persistence.GetTransferTasksRequest) (
*persistence.GetTransferTasksResponse,
*persistence.InternalGetTransferTasksResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
Expand All @@ -488,7 +488,7 @@ func (e *FaultInjectionExecutionStore) RangeCompleteTransferTask(request *persis
}

func (e *FaultInjectionExecutionStore) GetTimerTask(request *persistence.GetTimerTaskRequest) (
*persistence.GetTimerTaskResponse,
*persistence.InternalGetTimerTaskResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
Expand All @@ -497,14 +497,14 @@ func (e *FaultInjectionExecutionStore) GetTimerTask(request *persistence.GetTime
return e.baseExecutionStore.GetTimerTask(request)
}

func (e *FaultInjectionExecutionStore) GetTimerIndexTasks(request *persistence.GetTimerIndexTasksRequest) (
*persistence.GetTimerIndexTasksResponse,
func (e *FaultInjectionExecutionStore) GetTimerTasks(request *persistence.GetTimerTasksRequest) (
*persistence.InternalGetTimerTasksResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
return nil, err
}
return e.baseExecutionStore.GetTimerIndexTasks(request)
return e.baseExecutionStore.GetTimerTasks(request)
}

func (e *FaultInjectionExecutionStore) CompleteTimerTask(request *persistence.CompleteTimerTaskRequest) error {
Expand All @@ -522,7 +522,7 @@ func (e *FaultInjectionExecutionStore) RangeCompleteTimerTask(request *persisten
}

func (e *FaultInjectionExecutionStore) GetReplicationTask(request *persistence.GetReplicationTaskRequest) (
*persistence.GetReplicationTaskResponse,
*persistence.InternalGetReplicationTaskResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
Expand All @@ -532,7 +532,7 @@ func (e *FaultInjectionExecutionStore) GetReplicationTask(request *persistence.G
}

func (e *FaultInjectionExecutionStore) GetReplicationTasks(request *persistence.GetReplicationTasksRequest) (
*persistence.GetReplicationTasksResponse,
*persistence.InternalGetReplicationTasksResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
Expand Down Expand Up @@ -563,7 +563,7 @@ func (e *FaultInjectionExecutionStore) PutReplicationTaskToDLQ(request *persiste
}

func (e *FaultInjectionExecutionStore) GetReplicationTasksFromDLQ(request *persistence.GetReplicationTasksFromDLQRequest) (
*persistence.GetReplicationTasksFromDLQResponse,
*persistence.InternalGetReplicationTasksFromDLQResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
Expand All @@ -587,7 +587,7 @@ func (e *FaultInjectionExecutionStore) RangeDeleteReplicationTaskFromDLQ(request
}

func (e *FaultInjectionExecutionStore) GetVisibilityTask(request *persistence.GetVisibilityTaskRequest) (
*persistence.GetVisibilityTaskResponse,
*persistence.InternalGetVisibilityTaskResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
Expand All @@ -597,7 +597,7 @@ func (e *FaultInjectionExecutionStore) GetVisibilityTask(request *persistence.Ge
}

func (e *FaultInjectionExecutionStore) GetVisibilityTasks(request *persistence.GetVisibilityTasksRequest) (
*persistence.GetVisibilityTasksResponse,
*persistence.InternalGetVisibilityTasksResponse,
error,
) {
if err := e.ErrorGenerator.Generate(); err != nil {
Expand Down
Loading

0 comments on commit d83e601

Please sign in to comment.