Skip to content

Commit

Permalink
db & schema change for timer / transfer queue cross DC support (caden…
Browse files Browse the repository at this point in the history
  • Loading branch information
wxing1292 authored Mar 23, 2018
1 parent 8200f58 commit bdeb6d4
Show file tree
Hide file tree
Showing 15 changed files with 253 additions and 94 deletions.
5 changes: 3 additions & 2 deletions common/persistence/cassandraMetadataPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/stretchr/testify/suite"

gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/cluster"
)

type (
Expand Down Expand Up @@ -110,12 +111,12 @@ func (m *metadataPersistenceSuite) TestCreateDomain() {
m.Equal(owner, resp1.Info.OwnerEmail)
m.Equal(retention, resp1.Config.Retention)
m.Equal(emitMetric, resp1.Config.EmitMetric)
m.Equal(testCurrentClusterName, resp1.ReplicationConfig.ActiveClusterName)
m.Equal(cluster.TestCurrentClusterName, resp1.ReplicationConfig.ActiveClusterName)
m.Equal(1, len(resp1.ReplicationConfig.Clusters))
m.Equal(isGlobalDomain, resp1.IsGlobalDomain)
m.Equal(configVersion, resp1.ConfigVersion)
m.Equal(failoverVersion, resp1.FailoverVersion)
m.True(resp1.ReplicationConfig.Clusters[0].ClusterName == testCurrentClusterName)
m.True(resp1.ReplicationConfig.Clusters[0].ClusterName == cluster.TestCurrentClusterName)
m.Equal(int64(0), resp1.DBVersion)

resp2, err2 := m.CreateDomain(
Expand Down
51 changes: 38 additions & 13 deletions common/persistence/cassandraPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,11 @@ const (
`range_id: ?, ` +
`stolen_since_renew: ?, ` +
`updated_at: ?, ` +
`transfer_ack_level: ?, ` +
`replication_ack_level: ?, ` +
`timer_ack_level: ?` +
`transfer_ack_level: ?, ` +
`timer_ack_level: ?, ` +
`cluster_transfer_ack_level: ?, ` +
`cluster_timer_ack_level: ?` +
`}`

templateWorkflowExecutionType = `{` +
Expand Down Expand Up @@ -663,15 +665,16 @@ var (

type (
cassandraPersistence struct {
session *gocql.Session
shardID int
logger bark.Logger
session *gocql.Session
shardID int
currentClusterName string
logger bark.Logger
}
)

// NewCassandraShardPersistence is used to create an instance of ShardManager implementation
func NewCassandraShardPersistence(hosts string, port int, user, password, dc string, keyspace string,
logger bark.Logger) (ShardManager, error) {
currentClusterName string, logger bark.Logger) (ShardManager, error) {
cluster := common.NewCassandraCluster(hosts, port, user, password, dc)
cluster.Keyspace = keyspace
cluster.ProtoVersion = cassandraProtoVersion
Expand All @@ -684,7 +687,7 @@ func NewCassandraShardPersistence(hosts string, port int, user, password, dc str
return nil, err
}

return &cassandraPersistence{shardID: -1, session: session, logger: logger}, nil
return &cassandraPersistence{shardID: -1, session: session, currentClusterName: currentClusterName, logger: logger}, nil
}

// NewCassandraWorkflowExecutionPersistence is used to create an instance of workflowExecutionManager implementation
Expand Down Expand Up @@ -733,9 +736,11 @@ func (d *cassandraPersistence) CreateShard(request *CreateShardRequest) error {
shardInfo.RangeID,
shardInfo.StolenSinceRenew,
cqlNowTimestamp,
shardInfo.TransferAckLevel,
shardInfo.ReplicationAckLevel,
shardInfo.TransferAckLevel,
shardInfo.TimerAckLevel,
shardInfo.ClusterTransferAckLevel,
shardInfo.ClusterTimerAckLevel,
shardInfo.RangeID)

previous := make(map[string]interface{})
Expand Down Expand Up @@ -790,7 +795,7 @@ func (d *cassandraPersistence) GetShard(request *GetShardRequest) (*GetShardResp
}
}

info := createShardInfo(result["shard"].(map[string]interface{}))
info := createShardInfo(d.currentClusterName, result["shard"].(map[string]interface{}))

return &GetShardResponse{ShardInfo: info}, nil
}
Expand All @@ -805,9 +810,11 @@ func (d *cassandraPersistence) UpdateShard(request *UpdateShardRequest) error {
shardInfo.RangeID,
shardInfo.StolenSinceRenew,
cqlNowTimestamp,
shardInfo.TransferAckLevel,
shardInfo.ReplicationAckLevel,
shardInfo.TransferAckLevel,
shardInfo.TimerAckLevel,
shardInfo.ClusterTransferAckLevel,
shardInfo.ClusterTimerAckLevel,
shardInfo.RangeID,
shardInfo.ShardID,
rowTypeShard,
Expand Down Expand Up @@ -2030,6 +2037,9 @@ func (d *cassandraPersistence) GetTimerIndexTasks(request *GetTimerIndexTasksReq

response.Timers = append(response.Timers, t)
}
nextPageToken := iter.PageState()
response.NextPageToken = make([]byte, len(nextPageToken))
copy(response.NextPageToken, nextPageToken)

if err := iter.Close(); err != nil {
if isThrottlingError(err) {
Expand Down Expand Up @@ -2468,7 +2478,7 @@ func (d *cassandraPersistence) updateBufferedEvents(batch *gocql.Batch, newBuffe
}
}

func createShardInfo(result map[string]interface{}) *ShardInfo {
func createShardInfo(currentCluster string, result map[string]interface{}) *ShardInfo {
info := &ShardInfo{}
for k, v := range result {
switch k {
Expand All @@ -2482,12 +2492,27 @@ func createShardInfo(result map[string]interface{}) *ShardInfo {
info.StolenSinceRenew = v.(int)
case "updated_at":
info.UpdatedAt = v.(time.Time)
case "transfer_ack_level":
info.TransferAckLevel = v.(int64)
case "replication_ack_level":
info.ReplicationAckLevel = v.(int64)
case "transfer_ack_level":
info.TransferAckLevel = v.(int64)
case "timer_ack_level":
info.TimerAckLevel = v.(time.Time)
case "cluster_transfer_ack_level":
info.ClusterTransferAckLevel = v.(map[string]int64)
case "cluster_timer_ack_level":
info.ClusterTimerAckLevel = v.(map[string]time.Time)
}
}

if info.ClusterTransferAckLevel == nil {
info.ClusterTransferAckLevel = map[string]int64{
currentCluster: info.TransferAckLevel,
}
}
if info.ClusterTimerAckLevel == nil {
info.ClusterTimerAckLevel = map[string]time.Time{
currentCluster: info.TimerAckLevel,
}
}

Expand Down
123 changes: 121 additions & 2 deletions common/persistence/cassandraPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
)

type (
Expand Down Expand Up @@ -872,9 +873,10 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 := s.UpdateWorkflowExecution(updatedInfo, []int64{int64(4)}, nil, int64(3), tasks, nil, nil, nil, nil, nil)
s.Nil(err2, "No error expected.")

timerTasks, err1 := s.GetTimerIndexTasks()
timerTasks, nextToken, err1 := s.GetTimerIndexTasks()
s.Nil(err1, "No error expected.")
s.NotNil(timerTasks, "expected valid list of tasks.")
s.Equal(0, len(nextToken))
s.Equal(3, len(timerTasks))
s.Equal(TaskTypeWorkflowTimeout, timerTasks[1].TaskType)
s.Equal(TaskTypeDeleteHistoryEvent, timerTasks[2].TaskType)
Expand All @@ -889,9 +891,10 @@ func (s *cassandraPersistenceSuite) TestTimerTasks() {
err2 = s.CompleteTimerTask(timerTasks[2].VisibilityTimestamp, timerTasks[2].TaskID)
s.Nil(err2, "No error expected.")

timerTasks2, err2 := s.GetTimerIndexTasks()
timerTasks2, nextToken, err2 := s.GetTimerIndexTasks()
s.Nil(err2, "No error expected.")
s.Empty(timerTasks2, "expected empty task list.")
s.Equal(0, len(nextToken))
}

func (s *cassandraPersistenceSuite) TestWorkflowMutableState_Activities() {
Expand Down Expand Up @@ -1587,6 +1590,122 @@ func copyWorkflowExecutionInfo(sourceInfo *WorkflowExecutionInfo) *WorkflowExecu
}
}

func (s *cassandraPersistenceSuite) TestCreateGetShard_Backfill() {
shardID := 4
rangeID := int64(59)

// test create && get
currentReplicationAck := int64(27)
currentClusterTransferAck := int64(21)
currentClusterTimerAck := timestampConvertor(time.Now().Add(-10 * time.Second))
shardInfo := &ShardInfo{
ShardID: shardID,
Owner: "some random owner",
RangeID: rangeID,
StolenSinceRenew: 12,
UpdatedAt: timestampConvertor(time.Now()),
ReplicationAckLevel: currentReplicationAck,
TransferAckLevel: currentClusterTransferAck,
TimerAckLevel: currentClusterTimerAck,
}
createRequest := &CreateShardRequest{
ShardInfo: shardInfo,
}
s.Nil(s.ShardMgr.CreateShard(createRequest))

shardInfo.ClusterTransferAckLevel = map[string]int64{
s.ClusterMetadata.GetCurrentClusterName(): currentClusterTransferAck,
}
shardInfo.ClusterTimerAckLevel = map[string]time.Time{
s.ClusterMetadata.GetCurrentClusterName(): currentClusterTimerAck,
}
resp, err := s.ShardMgr.GetShard(&GetShardRequest{ShardID: shardID})
s.Nil(err)
s.Equal(shardInfo, resp.ShardInfo)
}

func (s *cassandraPersistenceSuite) TestCreateGetUpdateGetShard() {
shardID := 8
rangeID := int64(59)

// test create && get
currentReplicationAck := int64(27)
currentClusterTransferAck := int64(21)
alternativeClusterTransferAck := int64(32)
currentClusterTimerAck := timestampConvertor(time.Now().Add(-10 * time.Second))
alternativeClusterTimerAck := timestampConvertor(time.Now().Add(-20 * time.Second))
shardInfo := &ShardInfo{
ShardID: shardID,
Owner: "some random owner",
RangeID: rangeID,
StolenSinceRenew: 12,
UpdatedAt: timestampConvertor(time.Now()),
ReplicationAckLevel: currentReplicationAck,
TransferAckLevel: currentClusterTransferAck,
TimerAckLevel: currentClusterTimerAck,
ClusterTransferAckLevel: map[string]int64{
cluster.TestCurrentClusterName: currentClusterTransferAck,
cluster.TestAlternativeClusterName: alternativeClusterTransferAck,
},
ClusterTimerAckLevel: map[string]time.Time{
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
}
createRequest := &CreateShardRequest{
ShardInfo: shardInfo,
}
s.Nil(s.ShardMgr.CreateShard(createRequest))

resp, err := s.ShardMgr.GetShard(&GetShardRequest{ShardID: shardID})
s.Nil(err)
s.Equal(shardInfo, resp.ShardInfo)

// test update && get
currentReplicationAck = int64(270)
currentClusterTransferAck = int64(210)
alternativeClusterTransferAck = int64(320)
currentClusterTimerAck = timestampConvertor(time.Now().Add(-100 * time.Second))
alternativeClusterTimerAck = timestampConvertor(time.Now().Add(-200 * time.Second))
shardInfo = &ShardInfo{
ShardID: shardID,
Owner: "some random owner",
RangeID: int64(28),
StolenSinceRenew: 4,
UpdatedAt: timestampConvertor(time.Now()),
ReplicationAckLevel: currentReplicationAck,
TransferAckLevel: currentClusterTransferAck,
TimerAckLevel: currentClusterTimerAck,
ClusterTransferAckLevel: map[string]int64{
cluster.TestCurrentClusterName: currentClusterTransferAck,
cluster.TestAlternativeClusterName: alternativeClusterTransferAck,
},
ClusterTimerAckLevel: map[string]time.Time{
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
}
updateRequest := &UpdateShardRequest{
ShardInfo: shardInfo,
PreviousRangeID: rangeID,
}
s.Nil(s.ShardMgr.UpdateShard(updateRequest))

resp, err = s.ShardMgr.GetShard(&GetShardRequest{ShardID: shardID})
s.Nil(err)
s.Equal(shardInfo, resp.ShardInfo)
}

// Note: cassandra only provide milisecond precision timestamp
// ref: https://docs.datastax.com/en/cql/3.3/cql/cql_reference/timestamp_type_r.html
// so to use equal function, we need to do conversion, getting rid of sub miliseconds
func timestampConvertor(t time.Time) time.Time {
return time.Unix(
0,
common.CQLTimestampToUnixNano(common.UnixNanoToCQLTimestamp(t.UnixNano())),
).UTC()
}

func copyReplicationState(sourceState *ReplicationState) *ReplicationState {
state := &ReplicationState{
CurrentVersion: sourceState.CurrentVersion,
Expand Down
21 changes: 12 additions & 9 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ type (

// ShardInfo describes a shard
ShardInfo struct {
ShardID int
Owner string
RangeID int64
StolenSinceRenew int
UpdatedAt time.Time
TransferAckLevel int64
ReplicationAckLevel int64
TimerAckLevel time.Time
ShardID int
Owner string
RangeID int64
StolenSinceRenew int
UpdatedAt time.Time
ReplicationAckLevel int64
TransferAckLevel int64 // TO BE DEPRECATED IN FAVOR OF ClusterTransferAckLevel
TimerAckLevel time.Time // TO BE DEPRECATED IN FAVOR OF ClusteerTimerAckLevel
ClusterTransferAckLevel map[string]int64
ClusterTimerAckLevel map[string]time.Time
}

// WorkflowExecutionInfo describes a workflow execution
Expand Down Expand Up @@ -642,7 +644,8 @@ type (

// GetTimerIndexTasksResponse is the response for GetTimerIndexTasks
GetTimerIndexTasksResponse struct {
Timers []*TimerTaskInfo
Timers []*TimerTaskInfo
NextPageToken []byte
}

// SerializedHistoryEventBatch represents a serialized batch of history events
Expand Down
28 changes: 10 additions & 18 deletions common/persistence/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,12 @@ import (
)

const (
testWorkflowClusterHosts = "127.0.0.1"
testPort = 0
testUser = ""
testPassword = ""
testDatacenter = ""
testSchemaDir = "../.."
testInitialFailoverVersion = int64(0)
testFailoverVersionIncrement = int64(10)
testCurrentClusterName = "current-cluster"
testAlternativeClusterName = "alternative-cluster"
)

var (
testAllClusterNames = []string{testCurrentClusterName, testAlternativeClusterName}
testWorkflowClusterHosts = "127.0.0.1"
testPort = 0
testUser = ""
testPassword = ""
testDatacenter = ""
testSchemaDir = "../.."
)

type (
Expand Down Expand Up @@ -131,7 +123,7 @@ func (s *TestBase) SetupWorkflowStoreWithOptions(options TestBaseOptions) {
shardID := 0
var err error
s.ShardMgr, err = NewCassandraShardPersistence(options.ClusterHost, options.ClusterPort, options.ClusterUser,
options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, log)
options.ClusterPassword, options.Datacenter, s.CassandraTestCluster.keyspace, s.ClusterMetadata.GetCurrentClusterName(), log)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -753,17 +745,17 @@ func (s *TestBase) CompleteReplicationTask(taskID int64) error {
}

// GetTimerIndexTasks is a utility method to get tasks from transfer task queue
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, error) {
func (s *TestBase) GetTimerIndexTasks() ([]*TimerTaskInfo, []byte, error) {
response, err := s.WorkflowMgr.GetTimerIndexTasks(&GetTimerIndexTasksRequest{
MinTimestamp: time.Time{},
MaxTimestamp: time.Unix(0, math.MaxInt64),
BatchSize: 10})

if err != nil {
return nil, err
return nil, nil, err
}

return response.Timers, nil
return response.Timers, response.NextPageToken, nil
}

// CompleteTimerTask is a utility method to complete a timer task
Expand Down
Loading

0 comments on commit bdeb6d4

Please sign in to comment.