Skip to content

Commit

Permalink
persistence: fixes for mysql (cadence-workflow#1184)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Oct 18, 2018
1 parent 36851f2 commit 4eb6818
Show file tree
Hide file tree
Showing 15 changed files with 504 additions and 170 deletions.
33 changes: 2 additions & 31 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions common/persistence/executionStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package persistence

import (
"github.com/prometheus/common/log"
"github.com/uber-common/bark"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
Expand Down Expand Up @@ -354,7 +353,7 @@ func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(infos []*Child
newInfos := make([]*InternalChildExecutionInfo, 0)
for _, v := range infos {
if v.InitiatedEvent == nil {
log.Fatalf("nil InitiatedEvent for %v", v.InitiatedID)
m.logger.Fatalf("nil InitiatedEvent for %v", v.InitiatedID)
}
initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
if err != nil {
Expand Down Expand Up @@ -382,7 +381,7 @@ func (m *executionManagerImpl) SerializeUpsertActivityInfos(infos []*ActivityInf
newInfos := make([]*InternalActivityInfo, 0)
for _, v := range infos {
if v.ScheduledEvent == nil {
log.Fatal("SerializeUpsertActivityInfos ScheduledEvent is required")
m.logger.Fatal("SerializeUpsertActivityInfos ScheduledEvent is required")
}
scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package persistencetests
import (
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"time"
Expand All @@ -35,7 +36,6 @@ import (
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
p "github.com/uber/cadence/common/persistence"
"sync"
)

type (
Expand Down Expand Up @@ -2675,7 +2675,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
s.Equal(int64(2333), ti.Version)
s.Equal("t1", ti.TimerID)
s.Equal(int64(1), ti.StartedID)
s.Equal(expiryTime.Unix(), ti.ExpiryTime.Unix())
s.EqualTimes(expiryTime, ti.ExpiryTime)
s.Equal(int64(500), ti.TaskID)

ti, ok = state1.TimerInfos["t2"]
Expand All @@ -2684,7 +2684,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
s.Equal(int64(2333), ti.Version)
s.Equal("t2", ti.TimerID)
s.Equal(int64(2), ti.StartedID)
s.Equal(expiryTime.Unix(), ti.ExpiryTime.Unix())
s.EqualTimes(expiryTime, ti.ExpiryTime)
s.Equal(int64(501), ti.TaskID)

ti, ok = state1.TimerInfos["t3"]
Expand All @@ -2693,7 +2693,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
s.Equal(int64(2333), ti.Version)
s.Equal("t3", ti.TimerID)
s.Equal(int64(3), ti.StartedID)
s.Equal(expiryTime.Unix(), ti.ExpiryTime.Unix())
s.EqualTimes(expiryTime, ti.ExpiryTime)
s.Equal(int64(502), ti.TaskID)

s.Equal(1, len(state1.ChildExecutionInfos))
Expand Down Expand Up @@ -2840,7 +2840,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
s.Equal(int64(3333), ti.Version)
s.Equal("t1_new", ti.TimerID)
s.Equal(int64(1), ti.StartedID)
s.Equal(expiryTime.Unix(), ti.ExpiryTime.Unix())
s.EqualTimes(expiryTime, ti.ExpiryTime)
s.Equal(int64(600), ti.TaskID)

ti, ok = state4.TimerInfos["t2_new"]
Expand All @@ -2849,7 +2849,7 @@ func (s *ExecutionManagerSuite) TestResetMutableStateCurrentIsSelf() {
s.Equal(int64(3333), ti.Version)
s.Equal("t2_new", ti.TimerID)
s.Equal(int64(2), ti.StartedID)
s.Equal(expiryTime.Unix(), ti.ExpiryTime.Unix())
s.EqualTimes(expiryTime, ti.ExpiryTime)
s.Equal(int64(601), ti.TaskID)

s.Equal(1, len(state4.ChildExecutionInfos))
Expand Down Expand Up @@ -3039,6 +3039,8 @@ func (s *ExecutionManagerSuite) TestCreateGetShardBackfill() {
s.True(timeComparator(shardInfo.UpdatedAt, resp.ShardInfo.UpdatedAt, TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], TimePrecision))
s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano())
resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel
resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt
resp.ShardInfo.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
s.Equal(shardInfo, resp.ShardInfo)
Expand Down Expand Up @@ -3084,6 +3086,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
s.True(timeComparator(shardInfo.UpdatedAt, resp.ShardInfo.UpdatedAt, TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], TimePrecision))
s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano())
resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel
resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt
resp.ShardInfo.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
s.Equal(shardInfo, resp.ShardInfo)
Expand Down Expand Up @@ -3125,7 +3129,9 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
s.True(timeComparator(shardInfo.UpdatedAt, resp.ShardInfo.UpdatedAt, TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], TimePrecision))
s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano())
resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt
resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel
resp.ShardInfo.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
s.Equal(shardInfo, resp.ShardInfo)
}
Expand Down
80 changes: 51 additions & 29 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
package persistencetests

import (
"github.com/stretchr/testify/suite"
"math"
"math/rand"
"sync/atomic"
"time"

"github.com/stretchr/testify/suite"

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"github.com/uber-common/bark"
Expand All @@ -48,9 +49,15 @@ type (

// TestBaseOptions options to configure workflow test base.
TestBaseOptions struct {
DBPort int
DBName string
SchemaDir string
Cassandra struct {
DBPort int
SchemaDir string
}
SQL struct {
DBPort int
SchemaDir string
}
// TODO this is used for global domain test
// when cross DC is public, remove EnableGlobalDomain
EnableGlobalDomain bool // is global domain enabled
Expand All @@ -61,21 +68,22 @@ type (
// TestBase wraps the base setup needed to create workflows over persistence layer.
TestBase struct {
suite.Suite
ShardMgr p.ShardManager
ExecutionMgrFactory pfactory.Factory
ExecutionManager p.ExecutionManager
TaskMgr p.TaskManager
HistoryMgr p.HistoryManager
MetadataManager p.MetadataManager
MetadataManagerV2 p.MetadataManager
MetadataProxy p.MetadataManager
VisibilityMgr p.VisibilityManager
ShardInfo *p.ShardInfo
TaskIDGenerator TransferTaskIDGenerator
ClusterMetadata cluster.Metadata
ReadLevel int64
ReplicationReadLevel int64
PersistenceTestCluster PersistenceTestCluster
ShardMgr p.ShardManager
ExecutionMgrFactory pfactory.Factory
ExecutionManager p.ExecutionManager
TaskMgr p.TaskManager
HistoryMgr p.HistoryManager
MetadataManager p.MetadataManager
MetadataManagerV2 p.MetadataManager
MetadataProxy p.MetadataManager
VisibilityMgr p.VisibilityManager
ShardInfo *p.ShardInfo
TaskIDGenerator TransferTaskIDGenerator
ClusterMetadata cluster.Metadata
ReadLevel int64
ReplicationReadLevel int64
DefaultTestCluster PersistenceTestCluster
VisibilityTestCluster PersistenceTestCluster
}

// PersistenceTestCluster exposes management operations on a database
Expand All @@ -101,26 +109,32 @@ func NewTestBaseWithCassandra(options *TestBaseOptions) TestBase {
if options.DBName == "" {
options.DBName = GenerateRandomDBName(10)
}
testCluster := cassandra.NewTestCluster(options.DBPort, options.DBName, options.SchemaDir)
return newTestBase(options, testCluster)
testCluster := cassandra.NewTestCluster(options.Cassandra.DBPort, options.DBName, options.Cassandra.SchemaDir)
return newTestBase(options, testCluster, testCluster)
}

// NewTestBaseWithSQL returns a new persistence test base backed by SQL
func NewTestBaseWithSQL(options *TestBaseOptions) TestBase {
if options.DBName == "" {
options.DBName = GenerateRandomDBName(10)
}
testCluster := sql.NewTestCluster(options.DBPort, options.DBName, options.SchemaDir)
return newTestBase(options, testCluster)
defaultCluster := sql.NewTestCluster(options.SQL.DBPort, options.DBName, options.SQL.SchemaDir)
visibilityCluster := cassandra.NewTestCluster(options.Cassandra.DBPort, options.DBName, options.Cassandra.SchemaDir)
return newTestBase(options, defaultCluster, visibilityCluster)
}

func newTestBase(options *TestBaseOptions, testCluster PersistenceTestCluster) TestBase {
func newTestBase(options *TestBaseOptions,
defaultCluster PersistenceTestCluster, visibilityCluster PersistenceTestCluster) TestBase {
metadata := options.ClusterMetadata
if metadata == nil {
metadata = cluster.GetTestClusterMetadata(options.EnableGlobalDomain, options.IsMasterCluster)
}
options.ClusterMetadata = metadata
return TestBase{PersistenceTestCluster: testCluster, ClusterMetadata: metadata}
return TestBase{
DefaultTestCluster: defaultCluster,
VisibilityTestCluster: visibilityCluster,
ClusterMetadata: metadata,
}
}

// Setup sets up the test base, must be called as part of SetupSuite
Expand All @@ -130,9 +144,12 @@ func (s *TestBase) Setup() {
clusterName := s.ClusterMetadata.GetCurrentClusterName()
log := bark.NewLoggerFromLogrus(log.New())

s.PersistenceTestCluster.SetupTestDatabase()
s.DefaultTestCluster.SetupTestDatabase()
if s.VisibilityTestCluster != s.DefaultTestCluster {
s.VisibilityTestCluster.SetupTestDatabase()
}

cfg := s.PersistenceTestCluster.Config()
cfg := s.DefaultTestCluster.Config()
factory := pfactory.New(&cfg, clusterName, nil, log)

s.TaskMgr, err = factory.NewTaskManager()
Expand All @@ -157,10 +174,15 @@ func (s *TestBase) Setup() {
s.ExecutionManager, err = factory.NewExecutionManager(shardID)
s.fatalOnError("NewExecutionManager", err)

visibilityFactory := factory
if s.VisibilityTestCluster != s.DefaultTestCluster {
vCfg := s.VisibilityTestCluster.Config()
visibilityFactory = pfactory.New(&vCfg, clusterName, nil, log)
}
// SQL currently doesn't have support for visibility manager
s.VisibilityMgr, err = factory.NewVisibilityManager()
s.VisibilityMgr, err = visibilityFactory.NewVisibilityManager()
if err != nil {
log.Warn("testBase.Setup: error creating visibility manager: %v", err)
s.fatalOnError("NewVisibilityManager", err)
}

s.ReadLevel = 0
Expand Down Expand Up @@ -1075,7 +1097,7 @@ func (s *TestBase) CompleteTask(domainID, taskList string, taskType int, taskID

// TearDownWorkflowStore to cleanup
func (s *TestBase) TearDownWorkflowStore() {
s.PersistenceTestCluster.TearDownTestDatabase()
s.DefaultTestCluster.TearDownTestDatabase()
}

// GetNextSequenceNumber generates a unique sequence number for can be used for transfer queue taskId
Expand Down
1 change: 1 addition & 0 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func NewDataBlob(data []byte, encodingType common.EncodingType) *DataBlob {
}
}

// FromDataBlob decodes a DataBlob into a (payload, encodingType) tuple.
func FromDataBlob(blob *DataBlob) ([]byte, string) {
if blob == nil || len(blob.Data) == 0 {
return nil, ""
Expand Down
Loading

0 comments on commit 4eb6818

Please sign in to comment.