Skip to content

Commit

Permalink
Visibility Store filtering operations (cadence-workflow#119)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tamer Eldeeb authored Apr 5, 2017
1 parent 49da962 commit b229cd6
Show file tree
Hide file tree
Showing 4 changed files with 446 additions and 56 deletions.
260 changes: 230 additions & 30 deletions common/persistence/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ const (
templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND workflow_id = ? ` +
`AND run_id = ? `
`AND start_time = ? ` +
`AND run_id = ?`

templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` +
`domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name) ` +
Expand All @@ -34,12 +34,48 @@ const (
templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, workflow_type_name ` +
`FROM open_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition IN (?) `
`AND domain_partition IN (?) ` +
`AND start_time >= ? ` +
`AND start_time <= ? `

templateGetClosedWorkflowExecutions = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition IN (?) `
`AND domain_partition IN (?) ` +
`AND start_time >= ? ` +
`AND start_time <= ? `

templateGetOpenWorkflowExecutionsByType = `SELECT workflow_id, run_id, start_time, workflow_type_name ` +
`FROM open_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND start_time >= ? ` +
`AND start_time <= ? ` +
`AND workflow_type_name = ? `

templateGetClosedWorkflowExecutionsByType = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND start_time >= ? ` +
`AND start_time <= ? ` +
`AND workflow_type_name = ? `

templateGetOpenWorkflowExecutionsByID = `SELECT workflow_id, run_id, start_time, workflow_type_name ` +
`FROM open_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND start_time >= ? ` +
`AND start_time <= ? ` +
`AND workflow_id = ? `

templateGetClosedWorkflowExecutionsByID = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name ` +
`FROM closed_executions ` +
`WHERE domain_id = ? ` +
`AND domain_partition = ? ` +
`AND start_time >= ? ` +
`AND start_time <= ? ` +
`AND workflow_id = ? `
)

type (
Expand Down Expand Up @@ -96,7 +132,7 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(
batch.Query(templateDeleteWorkflowExecutionStarted,
request.DomainUUID,
domainPartition,
request.Execution.GetWorkflowId(),
request.StartTime,
request.Execution.GetRunId(),
)

Expand All @@ -122,7 +158,11 @@ func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed(

func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(
request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetOpenWorkflowExecutions, request.DomainUUID, domainPartition)
query := v.session.Query(templateGetOpenWorkflowExecutions,
request.DomainUUID,
domainPartition,
request.EarliestStartTime,
request.LatestStartTime)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
Expand All @@ -133,10 +173,10 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(

response := &ListWorkflowExecutionsResponse{}
response.Executions = make([]*WorkflowExecutionRecord, 0)
rec := make(map[string]interface{})
for iter.MapScan(rec) {
wfexecution := createWorkflowExecutionRecord(rec)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readOpenWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand All @@ -153,7 +193,11 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(

func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutions, request.DomainUUID, domainPartition)
query := v.session.Query(templateGetClosedWorkflowExecutions,
request.DomainUUID,
domainPartition,
request.EarliestStartTime,
request.LatestStartTime)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
Expand All @@ -164,10 +208,10 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(

response := &ListWorkflowExecutionsResponse{}
response.Executions = make([]*WorkflowExecutionRecord, 0)
rec := make(map[string]interface{})
for iter.MapScan(rec) {
wfexecution := createWorkflowExecutionRecord(rec)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
Expand All @@ -182,24 +226,180 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
return response, nil
}

func createWorkflowExecutionRecord(result map[string]interface{}) *WorkflowExecutionRecord {
record := &WorkflowExecutionRecord{}
for k, v := range result {
switch k {
case "workflow_id":
record.Execution.WorkflowId = common.StringPtr(v.(string))
case "run_id":
record.Execution.RunId = common.StringPtr(v.(gocql.UUID).String())
case "workflow_type_name":
record.WorkflowTypeName = v.(string)
case "start_time":
record.StartTime = v.(time.Time)
case "close_time":
record.CloseTime = v.(time.Time)
default:
// Unknown field, could happen due to schema update
func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType(
request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetOpenWorkflowExecutionsByType,
request.DomainUUID,
domainPartition,
request.EarliestStartTime,
request.LatestStartTime,
request.WorkflowTypeName)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListOpenWorkflowExecutionsByType operation failed. Not able to create query iterator.",
}
}

response := &ListWorkflowExecutionsResponse{}
response.Executions = make([]*WorkflowExecutionRecord, 0)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readOpenWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByType operation failed. Error: %v", err),
}
}

return response, nil
}

func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType(
request *ListWorkflowExecutionsByTypeRequest) (*ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByType,
request.DomainUUID,
domainPartition,
request.EarliestStartTime,
request.LatestStartTime,
request.WorkflowTypeName)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListClosedWorkflowExecutionsByType operation failed. Not able to create query iterator.",
}
}

return record
response := &ListWorkflowExecutionsResponse{}
response.Executions = make([]*WorkflowExecutionRecord, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByType operation failed. Error: %v", err),
}
}

return response, nil
}

func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID(
request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetOpenWorkflowExecutionsByID,
request.DomainUUID,
domainPartition,
request.EarliestStartTime,
request.LatestStartTime,
request.WorkflowID)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListOpenWorkflowExecutionsByWorkflowID operation failed. Not able to create query iterator.",
}
}

response := &ListWorkflowExecutionsResponse{}
response.Executions = make([]*WorkflowExecutionRecord, 0)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readOpenWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListOpenWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
}
}

return response, nil
}

func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowID(
request *ListWorkflowExecutionsByWorkflowIDRequest) (*ListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByID,
request.DomainUUID,
domainPartition,
request.EarliestStartTime,
request.LatestStartTime,
request.WorkflowID)
iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter()
if iter == nil {
// TODO: should return a bad request error if the token is invalid
return nil, &workflow.InternalServiceError{
Message: "ListClosedWorkflowExecutionsByWorkflowID operation failed. Not able to create query iterator.",
}
}

response := &ListWorkflowExecutionsResponse{}
response.Executions = make([]*WorkflowExecutionRecord, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
wfexecution, has = readClosedWorkflowExecutionRecord(iter)
}

nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)
if err := iter.Close(); err != nil {
return nil, &workflow.InternalServiceError{
Message: fmt.Sprintf("ListClosedWorkflowExecutionsByWorkflowID operation failed. Error: %v", err),
}
}

return response, nil
}

func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*WorkflowExecutionRecord, bool) {
var workflowID string
var runID gocql.UUID
var typeName string
var startTime time.Time
if iter.Scan(&workflowID, &runID, &startTime, &typeName) {
record := &WorkflowExecutionRecord{}
record.Execution.WorkflowId = common.StringPtr(workflowID)
record.Execution.RunId = common.StringPtr(runID.String())
record.StartTime = startTime
record.WorkflowTypeName = typeName
return record, true
}
return nil, false
}

func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*WorkflowExecutionRecord, bool) {
var workflowID string
var runID gocql.UUID
var typeName string
var startTime time.Time
var closeTime time.Time
if iter.Scan(&workflowID, &runID, &startTime, &closeTime, &typeName) {
record := &WorkflowExecutionRecord{}
record.Execution.WorkflowId = common.StringPtr(workflowID)
record.Execution.RunId = common.StringPtr(runID.String())
record.StartTime = startTime
record.CloseTime = closeTime
record.WorkflowTypeName = typeName
return record, true
}
return nil, false
}
Loading

0 comments on commit b229cd6

Please sign in to comment.