Skip to content

Commit

Permalink
Separate execution store and execution manager types (cadence-workflo…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Oct 7, 2020
1 parent d234ea2 commit bd6b3e8
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 29 deletions.
14 changes: 7 additions & 7 deletions common/persistence/cassandra/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -1334,7 +1334,7 @@ func (d *cassandraPersistence) CreateWorkflowExecution(

func (d *cassandraPersistence) GetWorkflowExecution(
_ context.Context,
request *p.GetWorkflowExecutionRequest,
request *p.InternalGetWorkflowExecutionRequest,
) (
*p.InternalGetWorkflowExecutionResponse, error) {
execution := request.Execution
Expand Down Expand Up @@ -2246,7 +2246,7 @@ func (d *cassandraPersistence) GetTransferTasks(
func (d *cassandraPersistence) GetReplicationTasks(
_ context.Context,
request *p.GetReplicationTasksRequest,
) (*p.GetReplicationTasksResponse, error) {
) (*p.InternalGetReplicationTasksResponse, error) {

// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
query := d.session.Query(templateGetReplicationTasksQuery,
Expand All @@ -2265,15 +2265,15 @@ func (d *cassandraPersistence) GetReplicationTasks(

func (d *cassandraPersistence) populateGetReplicationTasksResponse(
query *gocql.Query,
) (*p.GetReplicationTasksResponse, error) {
) (*p.InternalGetReplicationTasksResponse, error) {
iter := query.Iter()
if iter == nil {
return nil, &workflow.InternalServiceError{
Message: "GetReplicationTasks operation failed. Not able to create query iterator.",
}
}

response := &p.GetReplicationTasksResponse{}
response := &p.InternalGetReplicationTasksResponse{}
task := make(map[string]interface{})
for iter.MapScan(task) {
t := createReplicationTaskInfo(task["replication"].(map[string]interface{}))
Expand Down Expand Up @@ -2942,7 +2942,7 @@ func (d *cassandraPersistence) GetTimerIndexTasks(

func (d *cassandraPersistence) PutReplicationTaskToDLQ(
_ context.Context,
request *p.PutReplicationTaskToDLQRequest,
request *p.InternalPutReplicationTaskToDLQRequest,
) error {
task := request.TaskInfo

Expand All @@ -2968,7 +2968,7 @@ func (d *cassandraPersistence) PutReplicationTaskToDLQ(
task.NewRunBranchToken,
defaultVisibilityTimestamp,
defaultVisibilityTimestamp,
task.GetTaskID())
task.TaskID)

err := query.Exec()
if err != nil {
Expand All @@ -2988,7 +2988,7 @@ func (d *cassandraPersistence) PutReplicationTaskToDLQ(
func (d *cassandraPersistence) GetReplicationTasksFromDLQ(
_ context.Context,
request *p.GetReplicationTasksFromDLQRequest,
) (*p.GetReplicationTasksFromDLQResponse, error) {
) (*p.InternalGetReplicationTasksFromDLQResponse, error) {
// Reading replication tasks need to be quorum level consistent, otherwise we could loose task
query := d.session.Query(templateGetReplicationTasksQuery,
d.shardID,
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
Original file line number Diff line number Diff line change
Expand Up @@ -1851,9 +1851,9 @@ func createTransferTaskInfo(

func createReplicationTaskInfo(
result map[string]interface{},
) *p.ReplicationTaskInfo {
) *p.InternalReplicationTaskInfo {

info := &p.ReplicationTaskInfo{}
info := &p.InternalReplicationTaskInfo{}
for k, v := range result {
switch k {
case "domain_id":
Expand Down
93 changes: 89 additions & 4 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ func (m *executionManagerImpl) GetWorkflowExecution(
request *GetWorkflowExecutionRequest,
) (*GetWorkflowExecutionResponse, error) {

response, err := m.persistence.GetWorkflowExecution(ctx, request)
internalRequest := &InternalGetWorkflowExecutionRequest{
DomainID: request.DomainID,
Execution: request.Execution,
}
response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -845,7 +849,15 @@ func (m *executionManagerImpl) GetReplicationTasks(
ctx context.Context,
request *GetReplicationTasksRequest,
) (*GetReplicationTasksResponse, error) {
return m.persistence.GetReplicationTasks(ctx, request)
resp, err := m.persistence.GetReplicationTasks(ctx, request)
if err != nil {
return nil, err
}

return &GetReplicationTasksResponse{
Tasks: m.fromInternalReplicationTaskInfos(resp.Tasks),
NextPageToken: resp.NextPageToken,
}, nil
}

func (m *executionManagerImpl) CompleteReplicationTask(
Expand All @@ -866,14 +878,25 @@ func (m *executionManagerImpl) PutReplicationTaskToDLQ(
ctx context.Context,
request *PutReplicationTaskToDLQRequest,
) error {
return m.persistence.PutReplicationTaskToDLQ(ctx, request)
internalRequest := &InternalPutReplicationTaskToDLQRequest{
SourceClusterName: request.SourceClusterName,
TaskInfo: m.toInternalReplicationTaskInfo(request.TaskInfo),
}
return m.persistence.PutReplicationTaskToDLQ(ctx, internalRequest)
}

func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
ctx context.Context,
request *GetReplicationTasksFromDLQRequest,
) (*GetReplicationTasksFromDLQResponse, error) {
return m.persistence.GetReplicationTasksFromDLQ(ctx, request)
resp, err := m.persistence.GetReplicationTasksFromDLQ(ctx, request)
if err != nil {
return nil, err
}
return &GetReplicationTasksFromDLQResponse{
Tasks: m.fromInternalReplicationTaskInfos(resp.Tasks),
NextPageToken: resp.NextPageToken,
}, nil
}

func (m *executionManagerImpl) GetReplicationDLQSize(
Expand Down Expand Up @@ -930,6 +953,68 @@ func (m *executionManagerImpl) Close() {
m.persistence.Close()
}

func (m *executionManagerImpl) fromInternalReplicationTaskInfos(internalInfos []*InternalReplicationTaskInfo) []*ReplicationTaskInfo {
if internalInfos == nil {
return nil
}
infos := make([]*ReplicationTaskInfo, len(internalInfos), len(internalInfos))
for i := 0; i < len(internalInfos); i++ {
infos[i] = m.fromInternalReplicationTaskInfo(internalInfos[i])
}
return infos
}

func (m *executionManagerImpl) fromInternalReplicationTaskInfo(internalInfo *InternalReplicationTaskInfo) *ReplicationTaskInfo {
if internalInfo == nil {
return nil
}
return &ReplicationTaskInfo{
DomainID: internalInfo.DomainID,
WorkflowID: internalInfo.WorkflowID,
RunID: internalInfo.RunID,
TaskID: internalInfo.TaskID,
TaskType: internalInfo.TaskType,
FirstEventID: internalInfo.FirstEventID,
NextEventID: internalInfo.NextEventID,
Version: internalInfo.Version,
ScheduledID: internalInfo.ScheduledID,
BranchToken: internalInfo.BranchToken,
NewRunBranchToken: internalInfo.NewRunBranchToken,
CreationTime: internalInfo.CreationTime,
}
}

func (m *executionManagerImpl) toInternalReplicationTaskInfos(infos []*ReplicationTaskInfo) []*InternalReplicationTaskInfo {
if infos == nil {
return nil
}
internalInfos := make([]*InternalReplicationTaskInfo, len(infos), len(infos))
for i := 0; i < len(infos); i++ {
internalInfos[i] = m.toInternalReplicationTaskInfo(infos[i])
}
return internalInfos
}

func (m *executionManagerImpl) toInternalReplicationTaskInfo(info *ReplicationTaskInfo) *InternalReplicationTaskInfo {
if info == nil {
return nil
}
return &InternalReplicationTaskInfo{
DomainID: info.DomainID,
WorkflowID: info.WorkflowID,
RunID: info.RunID,
TaskID: info.TaskID,
TaskType: info.TaskType,
FirstEventID: info.FirstEventID,
NextEventID: info.NextEventID,
Version: info.Version,
ScheduledID: info.ScheduledID,
BranchToken: info.BranchToken,
NewRunBranchToken: info.NewRunBranchToken,
CreationTime: info.CreationTime,
}
}

func getStartVersion(
versionHistories *VersionHistories,
) (int64, error) {
Expand Down
47 changes: 42 additions & 5 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type (
GetName() string
GetShardID() int
//The below three APIs are related to serialization/deserialization
GetWorkflowExecution(ctx context.Context, request *GetWorkflowExecutionRequest) (*InternalGetWorkflowExecutionResponse, error)
GetWorkflowExecution(ctx context.Context, request *InternalGetWorkflowExecutionRequest) (*InternalGetWorkflowExecutionResponse, error)
UpdateWorkflowExecution(ctx context.Context, request *InternalUpdateWorkflowExecutionRequest) error
ConflictResolveWorkflowExecution(ctx context.Context, request *InternalConflictResolveWorkflowExecutionRequest) error
ResetWorkflowExecution(ctx context.Context, request *InternalResetWorkflowExecutionRequest) error
Expand All @@ -84,11 +84,11 @@ type (
RangeCompleteTransferTask(ctx context.Context, request *RangeCompleteTransferTaskRequest) error

// Replication task related methods
GetReplicationTasks(ctx context.Context, request *GetReplicationTasksRequest) (*GetReplicationTasksResponse, error)
GetReplicationTasks(ctx context.Context, request *GetReplicationTasksRequest) (*InternalGetReplicationTasksResponse, error)
CompleteReplicationTask(ctx context.Context, request *CompleteReplicationTaskRequest) error
RangeCompleteReplicationTask(ctx context.Context, request *RangeCompleteReplicationTaskRequest) error
PutReplicationTaskToDLQ(ctx context.Context, request *PutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(ctx context.Context, request *GetReplicationTasksFromDLQRequest) (*GetReplicationTasksFromDLQResponse, error)
PutReplicationTaskToDLQ(ctx context.Context, request *InternalPutReplicationTaskToDLQRequest) error
GetReplicationTasksFromDLQ(ctx context.Context, request *GetReplicationTasksFromDLQRequest) (*InternalGetReplicationTasksFromDLQResponse, error)
GetReplicationDLQSize(ctx context.Context, request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
DeleteReplicationTaskFromDLQ(ctx context.Context, request *DeleteReplicationTaskFromDLQRequest) error
RangeDeleteReplicationTaskFromDLQ(ctx context.Context, request *RangeDeleteReplicationTaskFromDLQRequest) error
Expand Down Expand Up @@ -190,6 +190,37 @@ type (
NewWorkflowSnapshot InternalWorkflowSnapshot
}

// InternalGetReplicationTasksResponse is the response to GetReplicationTask
InternalGetReplicationTasksResponse struct {
Tasks []*InternalReplicationTaskInfo
NextPageToken []byte
}

// InternalPutReplicationTaskToDLQRequest is used to put a replication task to dlq
InternalPutReplicationTaskToDLQRequest struct {
SourceClusterName string
TaskInfo *InternalReplicationTaskInfo
}

// InternalGetReplicationTasksFromDLQResponse is the response for GetReplicationTasksFromDLQ
InternalGetReplicationTasksFromDLQResponse = InternalGetReplicationTasksResponse

// InternalReplicationTaskInfo describes the replication task created for replication of history events
InternalReplicationTaskInfo struct {
DomainID string
WorkflowID string
RunID string
TaskID int64
TaskType int
FirstEventID int64
NextEventID int64
Version int64
ScheduledID int64
BranchToken []byte
NewRunBranchToken []byte
CreationTime int64
}

// InternalWorkflowExecutionInfo describes a workflow execution for Persistence Interface
InternalWorkflowExecutionInfo struct {
DomainID string
Expand Down Expand Up @@ -454,7 +485,13 @@ type (
ShardID int
}

// InternalGetWorkflowExecutionResponse is the response to GetworkflowExecution for Persistence Interface
// InternalGetWorkflowExecutionRequest is used to retrieve the info of a workflow execution
InternalGetWorkflowExecutionRequest struct {
DomainID string
Execution workflow.WorkflowExecution
}

// InternalGetWorkflowExecutionResponse is the response to GetWorkflowExecution for Persistence Interface
InternalGetWorkflowExecutionResponse struct {
State *InternalWorkflowMutableState
}
Expand Down
22 changes: 11 additions & 11 deletions common/persistence/sql/sqlExecutionManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (m *sqlExecutionManager) createWorkflowExecutionTx(

func (m *sqlExecutionManager) GetWorkflowExecution(
ctx context.Context,
request *p.GetWorkflowExecutionRequest,
request *p.InternalGetWorkflowExecutionRequest,
) (*p.InternalGetWorkflowExecutionResponse, error) {

domainID := sqlplugin.MustParseUUID(request.DomainID)
Expand Down Expand Up @@ -946,7 +946,7 @@ func (m *sqlExecutionManager) RangeCompleteTransferTask(
func (m *sqlExecutionManager) GetReplicationTasks(
ctx context.Context,
request *p.GetReplicationTasksRequest,
) (*p.GetReplicationTasksResponse, error) {
) (*p.InternalGetReplicationTasksResponse, error) {

readLevel, maxReadLevelInclusive, err := getReadLevels(request)
if err != nil {
Expand All @@ -966,7 +966,7 @@ func (m *sqlExecutionManager) GetReplicationTasks(
case nil:
return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
case sql.ErrNoRows:
return &p.GetReplicationTasksResponse{}, nil
return &p.InternalGetReplicationTasksResponse{}, nil
default:
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetReplicationTasks operation failed. Select failed: %v", err),
Expand All @@ -990,19 +990,19 @@ func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxR
func (m *sqlExecutionManager) populateGetReplicationTasksResponse(
rows []sqlplugin.ReplicationTasksRow,
requestMaxReadLevel int64,
) (*p.GetReplicationTasksResponse, error) {
) (*p.InternalGetReplicationTasksResponse, error) {
if len(rows) == 0 {
return &p.GetReplicationTasksResponse{}, nil
return &p.InternalGetReplicationTasksResponse{}, nil
}

var tasks = make([]*p.ReplicationTaskInfo, len(rows))
var tasks = make([]*p.InternalReplicationTaskInfo, len(rows))
for i, row := range rows {
info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding)
if err != nil {
return nil, err
}

tasks[i] = &p.ReplicationTaskInfo{
tasks[i] = &p.InternalReplicationTaskInfo{
TaskID: row.TaskID,
DomainID: sqlplugin.UUID(info.DomainID).String(),
WorkflowID: info.GetWorkflowID(),
Expand All @@ -1022,7 +1022,7 @@ func (m *sqlExecutionManager) populateGetReplicationTasksResponse(
if lastTaskID < requestMaxReadLevel {
nextPageToken = serializePageToken(lastTaskID)
}
return &p.GetReplicationTasksResponse{
return &p.InternalGetReplicationTasksResponse{
Tasks: tasks,
NextPageToken: nextPageToken,
}, nil
Expand Down Expand Up @@ -1063,7 +1063,7 @@ func (m *sqlExecutionManager) RangeCompleteReplicationTask(
func (m *sqlExecutionManager) GetReplicationTasksFromDLQ(
ctx context.Context,
request *p.GetReplicationTasksFromDLQRequest,
) (*p.GetReplicationTasksFromDLQResponse, error) {
) (*p.InternalGetReplicationTasksFromDLQResponse, error) {

readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest)
if err != nil {
Expand All @@ -1085,7 +1085,7 @@ func (m *sqlExecutionManager) GetReplicationTasksFromDLQ(
case nil:
return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
case sql.ErrNoRows:
return &p.GetReplicationTasksResponse{}, nil
return &p.InternalGetReplicationTasksResponse{}, nil
default:
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("GetReplicationTasks operation failed. Select failed: %v", err),
Expand Down Expand Up @@ -1307,7 +1307,7 @@ func (m *sqlExecutionManager) RangeCompleteTimerTask(

func (m *sqlExecutionManager) PutReplicationTaskToDLQ(
ctx context.Context,
request *p.PutReplicationTaskToDLQRequest,
request *p.InternalPutReplicationTaskToDLQRequest,
) error {
replicationTask := request.TaskInfo
blob, err := m.parser.ReplicationTaskInfoToBlob(&sqlblobs.ReplicationTaskInfo{
Expand Down

0 comments on commit bd6b3e8

Please sign in to comment.