diff --git a/common/persistence/cassandraVisibilityPersistence.go b/common/persistence/cassandraVisibilityPersistence.go new file mode 100644 index 00000000000..387b2c795bb --- /dev/null +++ b/common/persistence/cassandraVisibilityPersistence.go @@ -0,0 +1,205 @@ +package persistence + +import ( + "fmt" + "time" + + "github.com/gocql/gocql" + "github.com/uber-common/bark" + + workflow "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" +) + +// Fixed domain values for now +const ( + domainPartition = 0 +) + +const ( + templateCreateWorkflowExecutionStarted = `INSERT INTO open_executions (` + + `domain_id, domain_partition, workflow_id, run_id, start_time, workflow_type_name) ` + + `VALUES (?, ?, ?, ?, ?, ?)` + + templateDeleteWorkflowExecutionStarted = `DELETE FROM open_executions ` + + `WHERE domain_id = ? ` + + `AND domain_partition = ? ` + + `AND workflow_id = ? ` + + `AND run_id = ? ` + + templateCreateWorkflowExecutionClosed = `INSERT INTO closed_executions (` + + `domain_id, domain_partition, workflow_id, run_id, start_time, close_time, workflow_type_name) ` + + `VALUES (?, ?, ?, ?, ?, ?, ?) using TTL ?` + + templateGetOpenWorkflowExecutions = `SELECT workflow_id, run_id, start_time, workflow_type_name ` + + `FROM open_executions ` + + `WHERE domain_id = ? ` + + `AND domain_partition IN (?) ` + + templateGetClosedWorkflowExecutions = `SELECT workflow_id, run_id, start_time, close_time, workflow_type_name ` + + `FROM closed_executions ` + + `WHERE domain_id = ? ` + + `AND domain_partition IN (?) ` +) + +type ( + cassandraVisibilityPersistence struct { + session *gocql.Session + lowConslevel gocql.Consistency + logger bark.Logger + } +) + +// NewCassandraVisibilityPersistence is used to create an instance of VisibilityManager implementation +func NewCassandraVisibilityPersistence( + hosts string, dc string, keyspace string, logger bark.Logger) (VisibilityManager, error) { + cluster := common.NewCassandraCluster(hosts, dc) + cluster.Keyspace = keyspace + cluster.ProtoVersion = cassandraProtoVersion + cluster.Consistency = gocql.LocalQuorum + cluster.SerialConsistency = gocql.LocalSerial + cluster.Timeout = defaultSessionTimeout + + session, err := cluster.CreateSession() + if err != nil { + return nil, err + } + + return &cassandraVisibilityPersistence{session: session, lowConslevel: gocql.One, logger: logger}, nil +} + +func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionStarted( + request *RecordWorkflowExecutionStartedRequest) error { + query := v.session.Query(templateCreateWorkflowExecutionStarted, + request.DomainUUID, + domainPartition, + request.Execution.GetWorkflowId(), + request.Execution.GetRunId(), + request.StartTime, + request.WorkflowTypeName, + ) + err := query.Exec() + if err != nil { + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("RecordWorkflowExecutionStarted operation failed. Error: %v", err), + } + } + + return nil +} + +func (v *cassandraVisibilityPersistence) RecordWorkflowExecutionClosed( + request *RecordWorkflowExecutionClosedRequest) error { + batch := v.session.NewBatch(gocql.LoggedBatch) + + // First, remove execution from the open table + batch.Query(templateDeleteWorkflowExecutionStarted, + request.DomainUUID, + domainPartition, + request.Execution.GetWorkflowId(), + request.Execution.GetRunId(), + ) + + // Next, add a row in the closed table. This row is kepy for defaultDeleteTTLSeconds + batch.Query(templateCreateWorkflowExecutionClosed, + request.DomainUUID, + domainPartition, + request.Execution.GetWorkflowId(), + request.Execution.GetRunId(), + request.StartTime, + request.CloseTime, + request.WorkflowTypeName, + defaultDeleteTTLSeconds, + ) + err := v.session.ExecuteBatch(batch) + if err != nil { + return &workflow.InternalServiceError{ + Message: fmt.Sprintf("RecordWorkflowExecutionClosed operation failed. Error: %v", err), + } + } + return nil +} + +func (v *cassandraVisibilityPersistence) ListOpenWorkflowExecutions( + request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { + query := v.session.Query(templateGetOpenWorkflowExecutions, request.DomainUUID, domainPartition) + iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() + if iter == nil { + // TODO: should return a bad request error if the token is invalid + return nil, &workflow.InternalServiceError{ + Message: "ListOpenWorkflowExecutions operation failed. Not able to create query iterator.", + } + } + + response := &ListWorkflowExecutionsResponse{} + response.Executions = make([]*WorkflowExecutionRecord, 0) + rec := make(map[string]interface{}) + for iter.MapScan(rec) { + wfexecution := createWorkflowExecutionRecord(rec) + response.Executions = append(response.Executions, wfexecution) + } + + nextPageToken := iter.PageState() + response.NextPageToken = make([]byte, len(nextPageToken)) + copy(response.NextPageToken, nextPageToken) + if err := iter.Close(); err != nil { + return nil, &workflow.InternalServiceError{ + Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err), + } + } + + return response, nil +} + +func (v *cassandraVisibilityPersistence) ListClosedWorkflowExecutions( + request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) { + query := v.session.Query(templateGetClosedWorkflowExecutions, request.DomainUUID, domainPartition) + iter := query.PageSize(request.PageSize).PageState(request.NextPageToken).Iter() + if iter == nil { + // TODO: should return a bad request error if the token is invalid + return nil, &workflow.InternalServiceError{ + Message: "ListOpenWorkflowExecutions operation failed. Not able to create query iterator.", + } + } + + response := &ListWorkflowExecutionsResponse{} + response.Executions = make([]*WorkflowExecutionRecord, 0) + rec := make(map[string]interface{}) + for iter.MapScan(rec) { + wfexecution := createWorkflowExecutionRecord(rec) + response.Executions = append(response.Executions, wfexecution) + } + + nextPageToken := iter.PageState() + response.NextPageToken = make([]byte, len(nextPageToken)) + copy(response.NextPageToken, nextPageToken) + if err := iter.Close(); err != nil { + return nil, &workflow.InternalServiceError{ + Message: fmt.Sprintf("ListOpenWorkflowExecutions operation failed. Error: %v", err), + } + } + + return response, nil +} + +func createWorkflowExecutionRecord(result map[string]interface{}) *WorkflowExecutionRecord { + record := &WorkflowExecutionRecord{} + for k, v := range result { + switch k { + case "workflow_id": + record.Execution.WorkflowId = common.StringPtr(v.(string)) + case "run_id": + record.Execution.RunId = common.StringPtr(v.(gocql.UUID).String()) + case "workflow_type_name": + record.WorkflowTypeName = v.(string) + case "start_time": + record.StartTime = v.(time.Time) + case "close_time": + record.CloseTime = v.(time.Time) + default: + // Unknown field, could happen due to schema update + } + } + + return record +} diff --git a/common/persistence/cassandraVisibilityPersistence_test.go b/common/persistence/cassandraVisibilityPersistence_test.go new file mode 100644 index 00000000000..1f8328bd72d --- /dev/null +++ b/common/persistence/cassandraVisibilityPersistence_test.go @@ -0,0 +1,152 @@ +package persistence + +import ( + "os" + "testing" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + gen "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" +) + +type ( + visibilityPersistenceSuite struct { + suite.Suite + TestBase + // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, + // not merely log an error + *require.Assertions + } +) + +func TestVisibilityPersistenceSuite(t *testing.T) { + s := new(visibilityPersistenceSuite) + suite.Run(t, s) +} + +func (s *visibilityPersistenceSuite) SetupSuite() { + if testing.Verbose() { + log.SetOutput(os.Stdout) + } + + s.SetupWorkflowStore() +} + +func (s *visibilityPersistenceSuite) SetupTest() { + // Have to define our overridden assertions in the test setup. If we did it earlier, s.T() will return nil + s.Assertions = require.New(s.T()) +} + +func (s *visibilityPersistenceSuite) TearDownSuite() { + s.TearDownWorkflowStore() +} + +func (s *visibilityPersistenceSuite) TestBasicVisibility() { + testDomainUUID := uuid.New() + + workflowExecution := gen.WorkflowExecution{ + WorkflowId: common.StringPtr("visibility-workflow-test"), + RunId: common.StringPtr("fb15e4b5-356f-466d-8c6d-a29223e5c536"), + } + + startTime := time.Now().Add(time.Second * -5) + err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&RecordWorkflowExecutionStartedRequest{ + DomainUUID: testDomainUUID, + Execution: workflowExecution, + WorkflowTypeName: "visibility-workflow", + StartTime: startTime, + }) + s.Nil(err0) + + resp, err1 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{ + DomainUUID: testDomainUUID, + PageSize: 1, + }) + s.Nil(err1) + s.Equal(1, len(resp.Executions)) + s.Equal(workflowExecution.GetWorkflowId(), resp.Executions[0].Execution.GetWorkflowId()) + + err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&RecordWorkflowExecutionClosedRequest{ + DomainUUID: testDomainUUID, + Execution: workflowExecution, + WorkflowTypeName: "visibility-workflow", + StartTime: startTime, + CloseTime: time.Now(), + }) + s.Nil(err2) + + resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{ + DomainUUID: testDomainUUID, + PageSize: 1, + }) + s.Nil(err3) + s.Equal(0, len(resp.Executions)) + + resp, err4 := s.VisibilityMgr.ListClosedWorkflowExecutions(&ListWorkflowExecutionsRequest{ + DomainUUID: testDomainUUID, + PageSize: 1, + }) + s.Nil(err4) + s.Equal(1, len(resp.Executions)) +} + +func (s *visibilityPersistenceSuite) TestVisibilityPagination() { + testDomainUUID := uuid.New() + // Create 2 executions + workflowExecution1 := gen.WorkflowExecution{ + WorkflowId: common.StringPtr("visibility-pagination-test1"), + RunId: common.StringPtr("fb15e4b5-356f-466d-8c6d-a29223e5c536"), + } + err0 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&RecordWorkflowExecutionStartedRequest{ + DomainUUID: testDomainUUID, + Execution: workflowExecution1, + WorkflowTypeName: "visibility-workflow", + StartTime: time.Now(), + }) + s.Nil(err0) + + workflowExecution2 := gen.WorkflowExecution{ + WorkflowId: common.StringPtr("visibility-pagination-test2"), + RunId: common.StringPtr("fb15e4b5-356f-466d-8c6d-a29223e5c536"), + } + err1 := s.VisibilityMgr.RecordWorkflowExecutionStarted(&RecordWorkflowExecutionStartedRequest{ + DomainUUID: testDomainUUID, + Execution: workflowExecution2, + WorkflowTypeName: "visibility-workflow", + StartTime: time.Now(), + }) + s.Nil(err1) + + // Get the first one + resp, err2 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{ + DomainUUID: testDomainUUID, + PageSize: 1, + }) + s.Nil(err2) + s.Equal(1, len(resp.Executions)) + s.Equal(workflowExecution1.GetWorkflowId(), resp.Executions[0].Execution.GetWorkflowId()) + + // Use token to get the second one + resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{ + DomainUUID: testDomainUUID, + PageSize: 1, + NextPageToken: resp.NextPageToken, + }) + s.Nil(err3) + s.Equal(1, len(resp.Executions)) + s.Equal(workflowExecution2.GetWorkflowId(), resp.Executions[0].Execution.GetWorkflowId()) + + // Now should get empty result by using token + resp, err4 := s.VisibilityMgr.ListOpenWorkflowExecutions(&ListWorkflowExecutionsRequest{ + DomainUUID: testDomainUUID, + PageSize: 1, + NextPageToken: resp.NextPageToken, + }) + s.Nil(err4) + s.Equal(0, len(resp.Executions)) +} diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index de3452833c4..10700aecc59 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -6,6 +6,7 @@ import ( workflow "github.com/uber/cadence/.gen/go/shared" ) +// Domain status const ( DomainStatusRegistered = iota DomainStatusDeprecated @@ -14,7 +15,7 @@ const ( // Workflow execution states const ( - WorkflowStateCreated = iota + WorkflowStateCreated = iota WorkflowStateRunning WorkflowStateCompleted ) @@ -27,7 +28,7 @@ const ( // Transfer task types const ( - TransferTaskTypeDecisionTask = iota + TransferTaskTypeDecisionTask = iota TransferTaskTypeActivityTask TransferTaskTypeDeleteExecution ) diff --git a/common/persistence/persistenceTestBase.go b/common/persistence/persistenceTestBase.go index 2d8d5b5badb..716142ba50f 100644 --- a/common/persistence/persistenceTestBase.go +++ b/common/persistence/persistenceTestBase.go @@ -43,6 +43,7 @@ type ( TaskMgr TaskManager HistoryMgr HistoryManager MetadataManager MetadataManager + VisibilityMgr VisibilityManager ShardInfo *ShardInfo ShardContext *testShardContext readLevel int64 @@ -193,6 +194,12 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) { if err != nil { log.Fatal(err) } + + s.VisibilityMgr, err = NewCassandraVisibilityPersistence(options.ClusterHost, options.Datacenter, s.CassandraTestCluster.keyspace, log) + if err != nil { + log.Fatal(err) + } + // Create a shard for test s.readLevel = 0 s.ShardInfo = &ShardInfo{ @@ -604,6 +611,7 @@ func (s *CassandraTestCluster) setupTestCluster(keySpace string, dropKeySpace bo s.createCluster(testWorkflowClusterHosts, testDatacenter, gocql.Consistency(1), keySpace) s.createKeyspace(1, dropKeySpace) s.loadSchema("workflow_test.cql", schemaDir) + s.loadSchema("visibility_test.cql", schemaDir) } func (s *CassandraTestCluster) tearDownTestCluster() { diff --git a/common/persistence/visibilityInterfaces.go b/common/persistence/visibilityInterfaces.go new file mode 100644 index 00000000000..66e7850324d --- /dev/null +++ b/common/persistence/visibilityInterfaces.go @@ -0,0 +1,68 @@ +package persistence + +import ( + "time" + + s "github.com/uber/cadence/.gen/go/shared" +) + +// Interfaces for the Visibility Store. +// This is a secondary store that is eventually consistent with the main +// executions store, and stores workflow execution records for visibility +// purposes. + +type ( + + // WorkflowExecutionRecord contains info about workflow execution + WorkflowExecutionRecord struct { + Execution s.WorkflowExecution + WorkflowTypeName string + StartTime time.Time + CloseTime time.Time + } + + // RecordWorkflowExecutionStartedRequest is used to add a record of a newly + // started execution + RecordWorkflowExecutionStartedRequest struct { + DomainUUID string + Execution s.WorkflowExecution + WorkflowTypeName string + StartTime time.Time + } + + // RecordWorkflowExecutionClosedRequest is used to add a record of a newly + // closed execution + RecordWorkflowExecutionClosedRequest struct { + DomainUUID string + Execution s.WorkflowExecution + WorkflowTypeName string + StartTime time.Time + CloseTime time.Time + } + + // ListWorkflowExecutionsRequest is used to list executions in a domain + ListWorkflowExecutionsRequest struct { + DomainUUID string + // Maximum number of workflow executions per page + PageSize int + // Token to continue reading next page of workflow executions. + // Pass in empty slice for first page. + NextPageToken []byte + } + + // ListWorkflowExecutionsResponse is the response to ListWorkflowExecutionsRequest + ListWorkflowExecutionsResponse struct { + Executions []*WorkflowExecutionRecord + // Token to read next page if there are more workflow executions beyond page size. + // Use this to set NextPageToken on ListWorkflowExecutionsRequest to read the next page. + NextPageToken []byte + } + + // VisibilityManager is used to manage the visibility store + VisibilityManager interface { + RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error + RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error + ListOpenWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) + ListClosedWorkflowExecutions(request *ListWorkflowExecutionsRequest) (*ListWorkflowExecutionsResponse, error) + } +) diff --git a/schema/visibility_test.cql b/schema/visibility_test.cql new file mode 100644 index 00000000000..b93fae8389d --- /dev/null +++ b/schema/visibility_test.cql @@ -0,0 +1,27 @@ +CREATE TABLE open_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), workflow_id, run_id) +); + +CREATE INDEX open_by_start_time ON open_executions (start_time); +CREATE INDEX open_by_type ON open_executions (workflow_type_name); + +CREATE TABLE closed_executions ( + domain_id uuid, + domain_partition int, + workflow_id text, + run_id uuid, + start_time timestamp, + close_time timestamp, + workflow_type_name text, + PRIMARY KEY ((domain_id, domain_partition), workflow_id, run_id) +); + +CREATE INDEX closed_by_start_time ON closed_executions (start_time); +CREATE INDEX closed_by_close_time ON closed_executions (close_time); +CREATE INDEX closed_by_type ON closed_executions (workflow_type_name); \ No newline at end of file