Skip to content

Commit

Permalink
Separate visibility store from visibility manager (cadence-workflow#3606
Browse files Browse the repository at this point in the history
)
  • Loading branch information
andrewjdawson2016 authored Oct 8, 2020
1 parent 3fb129a commit d6c60dd
Show file tree
Hide file tree
Showing 8 changed files with 210 additions and 104 deletions.
38 changes: 19 additions & 19 deletions common/persistence/cassandra/cassandraVisibilityPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (v *cassandraVisibilityPersistence) UpsertWorkflowExecution(

func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(
_ context.Context,
request *p.ListWorkflowExecutionsRequest,
request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetOpenWorkflowExecutions,
request.DomainUUID,
Expand All @@ -364,7 +364,7 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -390,7 +390,7 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions(

func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
_ context.Context,
request *p.ListWorkflowExecutionsRequest,
request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutions,
request.DomainUUID,
Expand All @@ -406,7 +406,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -432,7 +432,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions(

func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType(
_ context.Context,
request *p.ListWorkflowExecutionsByTypeRequest,
request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetOpenWorkflowExecutionsByType,
request.DomainUUID,
Expand All @@ -449,7 +449,7 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -475,7 +475,7 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByType(

func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType(
_ context.Context,
request *p.ListWorkflowExecutionsByTypeRequest,
request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByType,
request.DomainUUID,
Expand All @@ -492,7 +492,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -518,7 +518,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByType(

func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID(
_ context.Context,
request *p.ListWorkflowExecutionsByWorkflowIDRequest,
request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetOpenWorkflowExecutionsByID,
request.DomainUUID,
Expand All @@ -535,7 +535,7 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readOpenWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -561,7 +561,7 @@ func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutionsByWorkflowID(

func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowID(
_ context.Context,
request *p.ListWorkflowExecutionsByWorkflowIDRequest,
request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByID,
request.DomainUUID,
Expand All @@ -578,7 +578,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowI
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -604,7 +604,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByWorkflowI

func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(
_ context.Context,
request *p.ListClosedWorkflowExecutionsByStatusRequest,
request *p.InternalListClosedWorkflowExecutionsByStatusRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByStatus,
request.DomainUUID,
Expand All @@ -621,7 +621,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -647,7 +647,7 @@ func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutionsByStatus(

func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
_ context.Context,
request *p.GetClosedWorkflowExecutionRequest,
request *p.InternalGetClosedWorkflowExecutionRequest,
) (*p.InternalGetClosedWorkflowExecutionResponse, error) {
execution := request.Execution
query := v.session.Query(templateGetClosedWorkflowExecution,
Expand Down Expand Up @@ -715,7 +715,7 @@ func (v *cassandraVisibilityPersistence) CountWorkflowExecutions(
return nil, p.NewOperationNotSupportErrorForVis()
}

func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*p.VisibilityWorkflowExecutionInfo, bool) {
func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*p.InternalVisibilityWorkflowExecutionInfo, bool) {
var workflowID string
var runID gocql.UUID
var typeName string
Expand All @@ -725,7 +725,7 @@ func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*p.VisibilityWorkflowExe
var encoding string
var taskList string
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &typeName, &memo, &encoding, &taskList) {
record := &p.VisibilityWorkflowExecutionInfo{
record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID.String(),
TypeName: typeName,
Expand All @@ -739,7 +739,7 @@ func readOpenWorkflowExecutionRecord(iter *gocql.Iter) (*p.VisibilityWorkflowExe
return nil, false
}

func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*p.VisibilityWorkflowExecutionInfo, bool) {
func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*p.InternalVisibilityWorkflowExecutionInfo, bool) {
var workflowID string
var runID gocql.UUID
var typeName string
Expand All @@ -752,7 +752,7 @@ func readClosedWorkflowExecutionRecord(iter *gocql.Iter) (*p.VisibilityWorkflowE
var encoding string
var taskList string
if iter.Scan(&workflowID, &runID, &startTime, &executionTime, &closeTime, &typeName, &status, &historyLength, &memo, &encoding, &taskList) {
record := &p.VisibilityWorkflowExecutionInfo{
record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: workflowID,
RunID: runID.String(),
TypeName: typeName,
Expand Down
24 changes: 12 additions & 12 deletions common/persistence/cassandra/cassandraVisibilityPersistenceV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,35 +129,35 @@ func (v *cassandraVisibilityPersistenceV2) UpsertWorkflowExecution(

func (v *cassandraVisibilityPersistenceV2) ListOpenWorkflowExecutions(
ctx context.Context,
request *p.ListWorkflowExecutionsRequest,
request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
return v.persistence.ListOpenWorkflowExecutions(ctx, request)
}

func (v *cassandraVisibilityPersistenceV2) ListOpenWorkflowExecutionsByType(
ctx context.Context,
request *p.ListWorkflowExecutionsByTypeRequest,
request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
return v.persistence.ListOpenWorkflowExecutionsByType(ctx, request)
}

func (v *cassandraVisibilityPersistenceV2) ListOpenWorkflowExecutionsByWorkflowID(
ctx context.Context,
request *p.ListWorkflowExecutionsByWorkflowIDRequest,
request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
return v.persistence.ListOpenWorkflowExecutionsByWorkflowID(ctx, request)
}

func (v *cassandraVisibilityPersistenceV2) GetClosedWorkflowExecution(
ctx context.Context,
request *p.GetClosedWorkflowExecutionRequest,
request *p.InternalGetClosedWorkflowExecutionRequest,
) (*p.InternalGetClosedWorkflowExecutionResponse, error) {
return v.persistence.GetClosedWorkflowExecution(ctx, request)
}

func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutions(
_ context.Context,
request *p.ListWorkflowExecutionsRequest,
request *p.InternalListWorkflowExecutionsRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsV2,
request.DomainUUID,
Expand All @@ -173,7 +173,7 @@ func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutions(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -199,7 +199,7 @@ func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutions(

func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByType(
_ context.Context,
request *p.ListWorkflowExecutionsByTypeRequest,
request *p.InternalListWorkflowExecutionsByTypeRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByTypeV2,
request.DomainUUID,
Expand All @@ -216,7 +216,7 @@ func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByType(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -242,7 +242,7 @@ func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByType(

func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByWorkflowID(
_ context.Context,
request *p.ListWorkflowExecutionsByWorkflowIDRequest,
request *p.InternalListWorkflowExecutionsByWorkflowIDRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByIDV2,
request.DomainUUID,
Expand All @@ -259,7 +259,7 @@ func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByWorkflo
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand All @@ -285,7 +285,7 @@ func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByWorkflo

func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByStatus(
_ context.Context,
request *p.ListClosedWorkflowExecutionsByStatusRequest,
request *p.InternalListClosedWorkflowExecutionsByStatusRequest,
) (*p.InternalListWorkflowExecutionsResponse, error) {
query := v.session.Query(templateGetClosedWorkflowExecutionsByStatusV2,
request.DomainUUID,
Expand All @@ -302,7 +302,7 @@ func (v *cassandraVisibilityPersistenceV2) ListClosedWorkflowExecutionsByStatus(
}

response := &p.InternalListWorkflowExecutionsResponse{}
response.Executions = make([]*p.VisibilityWorkflowExecutionInfo, 0)
response.Executions = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0)
wfexecution, has := readClosedWorkflowExecutionRecord(iter)
for has {
response.Executions = append(response.Executions, wfexecution)
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/elasticsearch/decodeBench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func BenchmarkJSONDecodeToType(b *testing.B) {
for i := 0; i < b.N; i++ {
var source *visibilityRecord
json.Unmarshal(*bytes, &source)
record := &p.VisibilityWorkflowExecutionInfo{
record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: source.WorkflowID,
RunID: source.RunID,
TypeName: source.WorkflowType,
Expand Down Expand Up @@ -87,7 +87,7 @@ func BenchmarkJSONDecodeToMap(b *testing.B) {
closeStatus, _ := source[definition.CloseStatus].(json.Number).Int64()
historyLen, _ := source[definition.HistoryLength].(json.Number).Int64()

record := &p.VisibilityWorkflowExecutionInfo{
record := &p.InternalVisibilityWorkflowExecutionInfo{
WorkflowID: source[definition.WorkflowID].(string),
RunID: source[definition.RunID].(string),
TypeName: source[definition.WorkflowType].(string),
Expand Down
Loading

0 comments on commit d6c60dd

Please sign in to comment.