Skip to content

Commit

Permalink
Added sql visibility persistence (cadence-workflow#1196)
Browse files Browse the repository at this point in the history
  • Loading branch information
mfateev authored Oct 31, 2018
1 parent dee4c74 commit 73b753a
Show file tree
Hide file tree
Showing 23 changed files with 538 additions and 149 deletions.
5 changes: 4 additions & 1 deletion common/cache/domainCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package cache

import (
"fmt"
"hash/fnv"
"sort"
"strconv"
Expand Down Expand Up @@ -350,7 +351,9 @@ func (c *domainCache) loadDomain(name string, id string) (*persistence.GetDomain
// if this actually happen, just discard the result
// since we need to guarantee that domainNotificationVersion > all notification versions
// inside the cache
return nil, &workflow.EntityNotExistsError{}
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Domain: %v", name),
}
}
}
return resp, err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ func (m *cassandraMetadataPersistence) GetDomain(request *p.GetDomainRequest) (*
ConfigVersion: configVersion,
FailoverVersion: failoverVersion,
NotificationVersion: dbVersion,
TableVersion: p.DomainTableVersionV1,
}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,7 @@ func (m *cassandraMetadataPersistenceV2) GetDomain(request *p.GetDomainRequest)
FailoverVersion: failoverVersion,
FailoverNotificationVersion: failoverNotificationVersion,
NotificationVersion: notificationVersion,
TableVersion: p.DomainTableVersionV2,
}, nil
}

Expand All @@ -349,6 +350,7 @@ func (m *cassandraMetadataPersistenceV2) ListDomains(request *p.ListDomainsReque
Info: &p.DomainInfo{},
Config: &p.DomainConfig{},
ReplicationConfig: &p.DomainReplicationConfig{},
TableVersion: p.DomainTableVersionV2,
}
var replicationClusters []map[string]interface{}
response := &p.ListDomainsResponse{}
Expand All @@ -371,6 +373,7 @@ func (m *cassandraMetadataPersistenceV2) ListDomains(request *p.ListDomainsReque
Info: &p.DomainInfo{},
Config: &p.DomainConfig{},
ReplicationConfig: &p.DomainReplicationConfig{},
TableVersion: p.DomainTableVersionV2,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,8 @@ func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
query := v.session.Query(templateGetClosedWorkflowExecution,
request.DomainUUID,
domainPartition,
*execution.WorkflowId,
*execution.RunId)
execution.GetWorkflowId(),
execution.GetRunId())

iter := query.Iter()
if iter == nil {
Expand All @@ -567,7 +567,7 @@ func (v *cassandraVisibilityPersistence) GetClosedWorkflowExecution(
if !has {
return nil, &workflow.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v, RunId: %v",
*execution.WorkflowId, *execution.RunId),
execution.GetWorkflowId(), execution.GetRunId()),
}
}

Expand Down
32 changes: 19 additions & 13 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,12 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionBrandNew() {

_, err = s.ExecutionManager.CreateWorkflowExecution(req)
s.NotNil(err)
s.IsType(&p.WorkflowExecutionAlreadyStartedError{}, err)
alreadyStartedErr, ok := err.(*p.WorkflowExecutionAlreadyStartedError)
s.True(ok, "err is not WorkflowExecutionAlreadyStartedError")
s.Equal(req.RequestID, alreadyStartedErr.StartRequestID)
s.Equal(workflowExecution.GetRunId(), alreadyStartedErr.RunID)
s.Equal(0, alreadyStartedErr.CloseStatus)
s.Equal(p.WorkflowStateRunning, alreadyStartedErr.State)
}

// TestCreateWorkflowExecutionRunIDReuseWithReplication test
Expand Down Expand Up @@ -244,18 +249,19 @@ func (s *ExecutionManagerSuite) TestCreateWorkflowExecutionRunIDReuseWithoutRepl
// this create should work since we are relying the business logic in history engine
// to check whether the existing running workflow has finished
_, err3 := s.ExecutionManager.CreateWorkflowExecution(&p.CreateWorkflowExecutionRequest{
RequestID: uuid.New(),
DomainID: domainID,
Execution: newExecution,
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
DecisionTimeoutValue: decisionTimeout,
NextEventID: nextEventID,
LastProcessedEvent: lastProcessedEventID,
RangeID: s.ShardInfo.RangeID,
CreateWorkflowMode: p.CreateWorkflowModeWorkflowIDReuse,
PreviousRunID: workflowExecution.GetRunId(),
RequestID: uuid.New(),
DomainID: domainID,
Execution: newExecution,
TaskList: tasklist,
WorkflowTypeName: workflowType,
WorkflowTimeout: workflowTimeout,
DecisionTimeoutValue: decisionTimeout,
NextEventID: nextEventID,
LastProcessedEvent: lastProcessedEventID,
RangeID: s.ShardInfo.RangeID,
CreateWorkflowMode: p.CreateWorkflowModeWorkflowIDReuse,
PreviousRunID: workflowExecution.GetRunId(),
PreviousLastWriteVersion: common.EmptyVersion,
})
s.NoError(err3)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ func (m *MetadataPersistenceSuiteV2) TestListDomains() {
IsGlobalDomain: true,
ConfigVersion: 133,
FailoverVersion: 266,
TableVersion: p.DomainTableVersionV2,
},
{
Info: &p.DomainInfo{
Expand All @@ -797,6 +798,7 @@ func (m *MetadataPersistenceSuiteV2) TestListDomains() {
IsGlobalDomain: false,
ConfigVersion: 400,
FailoverVersion: 667,
TableVersion: p.DomainTableVersionV2,
},
}
for _, domain := range inputDomains {
Expand Down
14 changes: 6 additions & 8 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewTestBaseWithCassandra(options *TestBaseOptions) TestBase {
options.DBName = "test_" + GenerateRandomDBName(10)
}
testCluster := cassandra.NewTestCluster(options.Cassandra.DBPort, options.DBName, options.Cassandra.SchemaDir)
return newTestBase(options, testCluster, testCluster)
return newTestBase(options, testCluster)
}

// NewTestBaseWithSQL returns a new persistence test base backed by SQL
Expand All @@ -121,21 +121,19 @@ func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
options.DBName = GenerateRandomDBName(10)
}
sqlOpts := options.SQL
defaultCluster := sql.NewTestCluster(sqlOpts.DBPort, options.DBName, sqlOpts.SchemaDir, sqlOpts.DriverName)
visibilityCluster := cassandra.NewTestCluster(options.Cassandra.DBPort, options.DBName, options.Cassandra.SchemaDir)
return newTestBase(options, defaultCluster, visibilityCluster)
testCluster := sql.NewTestCluster(sqlOpts.DBPort, options.DBName, sqlOpts.SchemaDir, sqlOpts.DriverName)
return newTestBase(options, testCluster)
}

func newTestBase(options *TestBaseOptions,
defaultCluster PersistenceTestCluster, visibilityCluster PersistenceTestCluster) TestBase {
func newTestBase(options *TestBaseOptions, testCluster PersistenceTestCluster) TestBase {
metadata := options.ClusterMetadata
if metadata == nil {
metadata = cluster.GetTestClusterMetadata(options.EnableGlobalDomain, options.IsMasterCluster)
}
options.ClusterMetadata = metadata
return TestBase{
DefaultTestCluster: defaultCluster,
VisibilityTestCluster: visibilityCluster,
DefaultTestCluster: testCluster,
VisibilityTestCluster: testCluster,
ClusterMetadata: metadata,
}
}
Expand Down
7 changes: 7 additions & 0 deletions common/persistence/persistence-tests/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,10 @@ func TestSQLExecutionManagerSuite(t *testing.T) {
s.TestBase.Setup()
suite.Run(t, s)
}

func TestSQLVisibilityPersistenceSuite(t *testing.T) {
s := new(VisibilityPersistenceSuite)
s.TestBase = NewTestBaseWithSQL(&TestBaseOptions{})
s.TestBase.Setup()
suite.Run(t, s)
}
36 changes: 22 additions & 14 deletions common/persistence/persistence-tests/visibilityPersistenceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
})
s.Nil(err2)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution2.WorkflowId, resp.Executions[0].Execution.WorkflowId)
s.Equal(workflowExecution2.GetWorkflowId(), resp.Executions[0].GetExecution().GetWorkflowId())

// Use token to get the second one
resp, err3 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
Expand All @@ -169,18 +169,23 @@ func (s *VisibilityPersistenceSuite) TestVisibilityPagination() {
})
s.Nil(err3)
s.Equal(1, len(resp.Executions))
s.Equal(workflowExecution1.WorkflowId, resp.Executions[0].Execution.WorkflowId)
s.Equal(workflowExecution1.GetWorkflowId(), resp.Executions[0].GetExecution().GetWorkflowId())

// Now should get empty result by using token
resp, err4 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
EarliestStartTime: startTime1.UnixNano(),
LatestStartTime: startTime2.UnixNano(),
NextPageToken: resp.NextPageToken,
})
s.Nil(err4)
s.Equal(0, len(resp.Executions))
// TODO: See if it is possible in Cassandra to not return non empty token which is going to return empty result
if s.ExecutionManager.GetName() == "cassandra" {
// Now should get empty result by using token
resp, err4 := s.VisibilityMgr.ListOpenWorkflowExecutions(&p.ListWorkflowExecutionsRequest{
DomainUUID: testDomainUUID,
PageSize: 1,
EarliestStartTime: startTime1.UnixNano(),
LatestStartTime: startTime2.UnixNano(),
NextPageToken: resp.NextPageToken,
})
s.Nil(err4)
s.Equal(0, len(resp.Executions))
} else {
s.Equal(0, len(resp.NextPageToken))
}
}

// TestFilteringByType test
Expand Down Expand Up @@ -423,11 +428,14 @@ func (s *VisibilityPersistenceSuite) TestGetClosedExecution() {
})
s.Nil(err0)

_, err1 := s.VisibilityMgr.GetClosedWorkflowExecution(&p.GetClosedWorkflowExecutionRequest{
closedResp, err1 := s.VisibilityMgr.GetClosedWorkflowExecution(&p.GetClosedWorkflowExecutionRequest{
DomainUUID: testDomainUUID,
Execution: workflowExecution,
})
s.NotNil(err1)
s.Error(err1)
_, ok := err1.(*gen.EntityNotExistsError)
s.True(ok, "EntityNotExistsError")
s.Nil(closedResp)

err2 := s.VisibilityMgr.RecordWorkflowExecutionClosed(&p.RecordWorkflowExecutionClosedRequest{
DomainUUID: testDomainUUID,
Expand Down
6 changes: 6 additions & 0 deletions common/persistence/sql/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func (m *sqlStore) GetName() string {
return m.db.DriverName()
}

func (m *sqlStore) Close() {
if m.db != nil {
m.db.Close()
}
}

func (m *sqlStore) txExecute(operation string, f func(tx *sqlx.Tx) error) error {
tx, err := m.db.Beginx()
if err != nil {
Expand Down
8 changes: 2 additions & 6 deletions common/persistence/sql/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
package sql

import (
"errors"

"fmt"
"sync"

Expand All @@ -47,8 +45,6 @@ type (
}
)

var errNoVisibility = errors.New("visibility not supported by SQL datastore")

// NewFactory returns an instance of a factory object which can be used to create
// datastores backed by any kind of SQL store
func NewFactory(cfg config.SQL, clusterName string, logger bark.Logger) *Factory {
Expand Down Expand Up @@ -101,7 +97,7 @@ func (f *Factory) NewExecutionStore(shardID int) (p.ExecutionStore, error) {

// NewVisibilityStore returns a visibility store
func (f *Factory) NewVisibilityStore() (p.VisibilityStore, error) {
return nil, errNoVisibility
return NewSQLVisibilityStore(f.cfg, f.logger)
}

// Close closes the factory
Expand Down Expand Up @@ -143,7 +139,7 @@ func newExecutionStoreFactory(cfg config.SQL, logger bark.Logger) (*executionSto
}

func (f *executionStoreFactory) new(shardID int) (p.ExecutionStore, error) {
return NewSQLMatchingPersistence(f.db, f.logger, shardID)
return NewSQLExecutionStore(f.db, f.logger, shardID)
}

// close closes the factory
Expand Down
Loading

0 comments on commit 73b753a

Please sign in to comment.