Add persistence layer for multi-cursor queue processing (updated) (cadence-workflow#3468)

* Add persistence layer for multi-cursor queue processing

* Addressing PR comments and fixing sqlShardManager issue

* Gofmt sqlShardManager.go

* Update the idls to the latest
emrahs authored Aug 21, 2020
1 parent 4f49f61 commit acb6b62
Showing 15 changed files with 1,441 additions and 91 deletions.
857 changes: 845 additions & 12 deletions .gen/go/history/history.go

Large diffs are not rendered by default.
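
The generated history package gains the processing-queue-state types used throughout this commit. A sketch of their shape, inferred from how the test code below constructs them (pointer fields implied by the common.Int32Ptr/Int64Ptr/BoolPtr wrappers); the authoritative definitions are the generated ones in .gen/go/history, which this page does not render.

// Sketch only — shapes inferred from usage below, not the generated code itself.
type DomainFilter struct {
	DomainIDs    []string
	ReverseMatch *bool
}

type ProcessingQueueState struct {
	Level        *int32
	AckLevel     *int64
	MaxLevel     *int64
	DomainFilter *DomainFilter
}

type ProcessingQueueStates struct {
	StatesByCluster map[string][]*ProcessingQueueState
}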

206 changes: 189 additions & 17 deletions .gen/go/sqlblobs/sqlblobs.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions common/persistence/cassandra/cassandraPersistence.go
@@ -109,6 +109,10 @@ const (
`timer_ack_level: ?, ` +
`cluster_transfer_ack_level: ?, ` +
`cluster_timer_ack_level: ?, ` +
`transfer_processing_queue_states: ?, ` +
`transfer_processing_queue_states_encoding: ?, ` +
`timer_processing_queue_states: ?, ` +
`timer_processing_queue_states_encoding: ?, ` +
`domain_notification_version: ?, ` +
`cluster_replication_level: ?, ` +
`replication_dlq_ack_level: ?, ` +
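
These four new bind markers need matching fields on the stored shard record. A hypothetical migration sketch follows — the real schema change ships in this commit's schema files, which are not rendered here; the blob/text types and the ALTER TYPE shard target are assumptions by analogy with the pending_failover_markers pair and the UDT-style `field: ?` syntax above.

// Hypothetical CQL, kept as a Go constant in the style of this file's query templates.
const alterShardTypeSketch = `
ALTER TYPE shard ADD transfer_processing_queue_states blob;
ALTER TYPE shard ADD transfer_processing_queue_states_encoding text;
ALTER TYPE shard ADD timer_processing_queue_states blob;
ALTER TYPE shard ADD timer_processing_queue_states_encoding text;
`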
@@ -986,6 +990,8 @@ func (d *cassandraPersistence) CreateShard(request *p.CreateShardRequest) error
cqlNowTimestamp := p.UnixNanoToDBTimestamp(time.Now().UnixNano())
shardInfo := request.ShardInfo
markerData, markerEncoding := p.FromDataBlob(shardInfo.PendingFailoverMarkers)
transferPQS, transferPQSEncoding := p.FromDataBlob(shardInfo.TransferProcessingQueueStates)
timerPQS, timerPQSEncoding := p.FromDataBlob(shardInfo.TimerProcessingQueueStates)
query := d.session.Query(templateCreateShardQuery,
shardInfo.ShardID,
rowTypeShard,
@@ -1004,6 +1010,10 @@ func (d *cassandraPersistence) CreateShard(request *p.CreateShardRequest) error
shardInfo.TimerAckLevel,
shardInfo.ClusterTransferAckLevel,
shardInfo.ClusterTimerAckLevel,
transferPQS,
transferPQSEncoding,
timerPQS,
timerPQSEncoding,
shardInfo.DomainNotificationVersion,
shardInfo.ClusterReplicationLevel,
shardInfo.ReplicationDLQAckLevel,
@@ -1072,6 +1082,8 @@ func (d *cassandraPersistence) UpdateShard(request *p.UpdateShardRequest) error
cqlNowTimestamp := p.UnixNanoToDBTimestamp(time.Now().UnixNano())
shardInfo := request.ShardInfo
markerData, markerEncoding := p.FromDataBlob(shardInfo.PendingFailoverMarkers)
transferPQS, transferPQSEncoding := p.FromDataBlob(shardInfo.TransferProcessingQueueStates)
timerPQS, timerPQSEncoding := p.FromDataBlob(shardInfo.TimerProcessingQueueStates)

query := d.session.Query(templateUpdateShardQuery,
shardInfo.ShardID,
@@ -1084,6 +1096,10 @@ func (d *cassandraPersistence) UpdateShard(request *p.UpdateShardRequest) error
shardInfo.TimerAckLevel,
shardInfo.ClusterTransferAckLevel,
shardInfo.ClusterTimerAckLevel,
transferPQS,
transferPQSEncoding,
timerPQS,
timerPQSEncoding,
shardInfo.DomainNotificationVersion,
shardInfo.ClusterReplicationLevel,
shardInfo.ReplicationDLQAckLevel,
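Both CreateShard and UpdateShard persist each queue-state blob as a (bytes, encoding) column pair via p.FromDataBlob. A minimal self-contained sketch of that convention; the nil-handling noted in the comments is an assumption about the helper, not something this diff shows.

package main

import (
	"fmt"

	"github.com/uber/cadence/common"
	p "github.com/uber/cadence/common/persistence"
)

func main() {
	// A serialized queue-states payload travels as two columns: raw bytes
	// plus the name of the encoding that produced them.
	blob := p.NewDataBlob([]byte("thrift-encoded-states"), common.EncodingTypeThriftRW)
	data, encoding := p.FromDataBlob(blob)
	fmt.Printf("bytes=%d encoding=%s\n", len(data), encoding)

	// Assumed: a nil blob splits into (nil, ""), so shards without queue
	// states write null columns rather than failing.
	nilData, nilEncoding := p.FromDataBlob(nil)
	fmt.Println(nilData == nil, nilEncoding == "")
}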
20 changes: 20 additions & 0 deletions common/persistence/cassandra/cassandraPersistenceUtil.go
@@ -1775,6 +1775,10 @@ func createShardInfo(

var pendingFailoverMarkersRawData []byte
var pendingFailoverMarkersEncoding string
var transferProcessingQueueStatesRawData []byte
var transferProcessingQueueStatesEncoding string
var timerProcessingQueueStatesRawData []byte
var timerProcessingQueueStatesEncoding string
info := &p.ShardInfo{}
for k, v := range result {
switch k {
@@ -1798,6 +1802,14 @@
info.ClusterTransferAckLevel = v.(map[string]int64)
case "cluster_timer_ack_level":
info.ClusterTimerAckLevel = v.(map[string]time.Time)
case "transfer_processing_queue_states":
transferProcessingQueueStatesRawData = v.([]byte)
case "transfer_processing_queue_states_encoding":
transferProcessingQueueStatesEncoding = v.(string)
case "timer_processing_queue_states":
timerProcessingQueueStatesRawData = v.([]byte)
case "timer_processing_queue_states_encoding":
timerProcessingQueueStatesEncoding = v.(string)
case "domain_notification_version":
info.DomainNotificationVersion = v.(int64)
case "cluster_replication_level":
@@ -1831,6 +1843,14 @@
pendingFailoverMarkersRawData,
common.EncodingType(pendingFailoverMarkersEncoding),
)
info.TransferProcessingQueueStates = p.NewDataBlob(
transferProcessingQueueStatesRawData,
common.EncodingType(transferProcessingQueueStatesEncoding),
)
info.TimerProcessingQueueStates = p.NewDataBlob(
timerProcessingQueueStatesRawData,
common.EncodingType(timerProcessingQueueStatesEncoding),
)

return info
}
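On the read side, createShardInfo rebuilds both blobs with p.NewDataBlob even when the columns were never written. A sketch of the backfill consequence, assuming NewDataBlob returns nil for empty input — the behavior TestCreateGetShardBackfill below relies on when it asserts nil queue states:

package main

import (
	"fmt"

	"github.com/uber/cadence/common"
	p "github.com/uber/cadence/common/persistence"
)

func main() {
	// A shard written before this commit has no queue-state columns, so the
	// row scan leaves the raw bytes and encoding at their zero values.
	var raw []byte
	var encoding string

	// Assumed: NewDataBlob treats empty data as "no blob" and returns nil,
	// so pre-existing shards read back with nil processing-queue states.
	states := p.NewDataBlob(raw, common.EncodingType(encoding))
	fmt.Println(states == nil) // expected: true
}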
34 changes: 18 additions & 16 deletions common/persistence/dataInterfaces.go
@@ -242,22 +242,24 @@ type (

// ShardInfo describes a shard
ShardInfo struct {
-		ShardID                   int                              `json:"shard_id"`
-		Owner                     string                           `json:"owner"`
-		RangeID                   int64                            `json:"range_id"`
-		StolenSinceRenew          int                              `json:"stolen_since_renew"`
-		UpdatedAt                 time.Time                        `json:"updated_at"`
-		ReplicationAckLevel       int64                            `json:"replication_ack_level"`
-		ReplicationDLQAckLevel    map[string]int64                 `json:"replication_dlq_ack_level"`
-		TransferAckLevel          int64                            `json:"transfer_ack_level"`
-		TimerAckLevel             time.Time                        `json:"timer_ack_level"`
-		ClusterTransferAckLevel   map[string]int64                 `json:"cluster_transfer_ack_level"`
-		ClusterTimerAckLevel      map[string]time.Time             `json:"cluster_timer_ack_level"`
-		TransferFailoverLevels    map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
-		TimerFailoverLevels       map[string]TimerFailoverLevel    // uuid -> TimerFailoverLevel
-		ClusterReplicationLevel   map[string]int64                 `json:"cluster_replication_level"`
-		DomainNotificationVersion int64                            `json:"domain_notification_version"`
-		PendingFailoverMarkers    *DataBlob                        `json:"pending_failover_markers"`
+		ShardID                       int                              `json:"shard_id"`
+		Owner                         string                           `json:"owner"`
+		RangeID                       int64                            `json:"range_id"`
+		StolenSinceRenew              int                              `json:"stolen_since_renew"`
+		UpdatedAt                     time.Time                        `json:"updated_at"`
+		ReplicationAckLevel           int64                            `json:"replication_ack_level"`
+		ReplicationDLQAckLevel        map[string]int64                 `json:"replication_dlq_ack_level"`
+		TransferAckLevel              int64                            `json:"transfer_ack_level"`
+		TimerAckLevel                 time.Time                        `json:"timer_ack_level"`
+		ClusterTransferAckLevel       map[string]int64                 `json:"cluster_transfer_ack_level"`
+		ClusterTimerAckLevel          map[string]time.Time             `json:"cluster_timer_ack_level"`
+		TransferProcessingQueueStates *DataBlob                        `json:"transfer_processing_queue_states"`
+		TimerProcessingQueueStates    *DataBlob                        `json:"timer_processing_queue_states"`
+		TransferFailoverLevels        map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
+		TimerFailoverLevels           map[string]TimerFailoverLevel    // uuid -> TimerFailoverLevel
+		ClusterReplicationLevel       map[string]int64                 `json:"cluster_replication_level"`
+		DomainNotificationVersion     int64                            `json:"domain_notification_version"`
+		PendingFailoverMarkers        *DataBlob                        `json:"pending_failover_markers"`
}

// TransferFailoverLevel contains corresponding start / end level
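The two new fields carry thrift-serialized history.ProcessingQueueStates. A hedged sketch of populating them, following the serializer calls the tests below use; the helper name, the error handling, and the p.NewPayloadSerializer constructor are illustrative assumptions.

package main

import (
	"github.com/uber/cadence/.gen/go/history"
	"github.com/uber/cadence/common"
	p "github.com/uber/cadence/common/persistence"
)

// attachQueueStates shows one plausible way a caller wires up the new fields.
func attachQueueStates(
	serializer p.PayloadSerializer,
	info *p.ShardInfo,
	transfer, timer *history.ProcessingQueueStates,
) error {
	transferBlob, err := serializer.SerializeProcessingQueueStates(transfer, common.EncodingTypeThriftRW)
	if err != nil {
		return err
	}
	timerBlob, err := serializer.SerializeProcessingQueueStates(timer, common.EncodingTypeThriftRW)
	if err != nil {
		return err
	}
	info.TransferProcessingQueueStates = transferBlob
	info.TimerProcessingQueueStates = timerBlob
	return nil
}

func main() {
	states := &history.ProcessingQueueStates{}
	if err := attachQueueStates(p.NewPayloadSerializer(), &p.ShardInfo{}, states, states); err != nil {
		panic(err)
	}
}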
121 changes: 113 additions & 8 deletions common/persistence/persistence-tests/executionManagerTest.go
@@ -31,14 +31,14 @@ import (
"testing"
"time"

"github.com/uber/cadence/common/checksum"

"github.com/pborman/uuid"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"

"github.com/uber/cadence/.gen/go/history"
gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/checksum"
"github.com/uber/cadence/common/cluster"
p "github.com/uber/cadence/common/persistence"
)
@@ -5134,6 +5134,9 @@ func (s *ExecutionManagerSuite) TestCreateGetShardBackfill() {
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], TimePrecision))
s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano())
s.Nil(resp.ShardInfo.TransferProcessingQueueStates)
s.Nil(resp.ShardInfo.TimerProcessingQueueStates)

resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel
resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt
resp.ShardInfo.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
@@ -5152,6 +5155,18 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
currentClusterTimerAck := timestampConvertor(time.Now().Add(-10 * time.Second))
alternativeClusterTimerAck := timestampConvertor(time.Now().Add(-20 * time.Second))
domainNotificationVersion := int64(8192)
transferPQS := createTransferPQS(cluster.TestCurrentClusterName, 0, currentClusterTransferAck,
cluster.TestAlternativeClusterName, 1, alternativeClusterTransferAck)
transferPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates(
&transferPQS,
common.EncodingTypeThriftRW,
)
timerPQS := createTimerPQS(cluster.TestCurrentClusterName, 0, currentClusterTimerAck,
cluster.TestAlternativeClusterName, 1, alternativeClusterTimerAck)
timerPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates(
&timerPQS,
common.EncodingTypeThriftRW,
)
shardInfo := &p.ShardInfo{
ShardID: shardID,
Owner: "some random owner",
@@ -5169,9 +5184,11 @@
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
-		DomainNotificationVersion: domainNotificationVersion,
-		ClusterReplicationLevel:   map[string]int64{},
-		ReplicationDLQAckLevel:    map[string]int64{},
+		TransferProcessingQueueStates: transferPQSBlob,
+		TimerProcessingQueueStates:    timerPQSBlob,
+		DomainNotificationVersion:     domainNotificationVersion,
+		ClusterReplicationLevel:       map[string]int64{},
+		ReplicationDLQAckLevel:        map[string]int64{},
}
createRequest := &p.CreateShardRequest{
ShardInfo: shardInfo,
@@ -5183,9 +5200,20 @@
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], TimePrecision))
s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano())
s.Equal(shardInfo.TransferProcessingQueueStates, resp.ShardInfo.TransferProcessingQueueStates)
s.Equal(shardInfo.TimerProcessingQueueStates, resp.ShardInfo.TimerProcessingQueueStates)
deserializedTransferPQS, err := s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TransferProcessingQueueStates)
s.Nil(err)
s.Equal(&transferPQS, deserializedTransferPQS)
deserializedTimerPQS, err := s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TimerProcessingQueueStates)
s.Nil(err)
s.Equal(&timerPQS, deserializedTimerPQS)

resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel
resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt
resp.ShardInfo.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
resp.ShardInfo.TransferProcessingQueueStates = shardInfo.TransferProcessingQueueStates
resp.ShardInfo.TimerProcessingQueueStates = shardInfo.TimerProcessingQueueStates
s.Equal(shardInfo, resp.ShardInfo)

// test update && get
@@ -5195,6 +5223,18 @@
currentClusterTimerAck = timestampConvertor(time.Now().Add(-100 * time.Second))
alternativeClusterTimerAck = timestampConvertor(time.Now().Add(-200 * time.Second))
domainNotificationVersion = int64(16384)
transferPQS = createTransferPQS(cluster.TestCurrentClusterName, 0, currentClusterTransferAck,
cluster.TestAlternativeClusterName, 1, alternativeClusterTransferAck)
transferPQSBlob, _ = s.PayloadSerializer.SerializeProcessingQueueStates(
&transferPQS,
common.EncodingTypeThriftRW,
)
timerPQS = createTimerPQS(cluster.TestCurrentClusterName, 0, currentClusterTimerAck,
cluster.TestAlternativeClusterName, 1, alternativeClusterTimerAck)
timerPQSBlob, _ = s.PayloadSerializer.SerializeProcessingQueueStates(
&timerPQS,
common.EncodingTypeThriftRW,
)
shardInfo = &p.ShardInfo{
ShardID: shardID,
Owner: "some random owner",
@@ -5212,9 +5252,11 @@
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
-		DomainNotificationVersion: domainNotificationVersion,
-		ClusterReplicationLevel:   map[string]int64{cluster.TestAlternativeClusterName: 12345},
-		ReplicationDLQAckLevel:    map[string]int64{},
+		TransferProcessingQueueStates: transferPQSBlob,
+		TimerProcessingQueueStates:    timerPQSBlob,
+		DomainNotificationVersion:     domainNotificationVersion,
+		ClusterReplicationLevel:       map[string]int64{cluster.TestAlternativeClusterName: 12345},
+		ReplicationDLQAckLevel:        map[string]int64{},
}
updateRequest := &p.UpdateShardRequest{
ShardInfo: shardInfo,
@@ -5228,9 +5270,20 @@
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestCurrentClusterName], TimePrecision))
s.True(timeComparator(shardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], resp.ShardInfo.ClusterTimerAckLevel[cluster.TestAlternativeClusterName], TimePrecision))
s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano())
s.Equal(shardInfo.TransferProcessingQueueStates, resp.ShardInfo.TransferProcessingQueueStates)
s.Equal(shardInfo.TimerProcessingQueueStates, resp.ShardInfo.TimerProcessingQueueStates)
deserializedTransferPQS, err = s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TransferProcessingQueueStates)
s.Nil(err)
s.Equal(&transferPQS, deserializedTransferPQS)
deserializedTimerPQS, err = s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TimerProcessingQueueStates)
s.Nil(err)
s.Equal(&timerPQS, deserializedTimerPQS)

resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt
resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel
resp.ShardInfo.ClusterTimerAckLevel = shardInfo.ClusterTimerAckLevel
resp.ShardInfo.TransferProcessingQueueStates = shardInfo.TransferProcessingQueueStates
resp.ShardInfo.TimerProcessingQueueStates = shardInfo.TimerProcessingQueueStates
s.Equal(shardInfo, resp.ShardInfo)
}
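
Taken together, the queue-state assertions above amount to a serialize/deserialize round trip. A compact hypothetical condensation in the same suite style, using the createTransferPQS helper defined at the end of this file (not part of the commit):

// roundTripTransferPQS condenses the queue-state assertions above; hypothetical.
func (s *ExecutionManagerSuite) roundTripTransferPQS() {
	states := createTransferPQS(cluster.TestCurrentClusterName, 0, 10,
		cluster.TestAlternativeClusterName, 1, 20)
	blob, err := s.PayloadSerializer.SerializeProcessingQueueStates(&states, common.EncodingTypeThriftRW)
	s.Nil(err)
	decoded, err := s.PayloadSerializer.DeserializeProcessingQueueStates(blob)
	s.Nil(err)
	s.Equal(&states, decoded)
}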

@@ -5388,3 +5441,55 @@ func copyReplicationInfo(sourceInfo *p.ReplicationInfo) *p.ReplicationInfo {
LastEventID: sourceInfo.LastEventID,
}
}

func createTransferPQS(cluster1 string, level1 int32, ackLevel1 int64, cluster2 string, level2 int32, ackLevel2 int64) history.ProcessingQueueStates {
domainFilter := &history.DomainFilter{
DomainIDs: nil,
ReverseMatch: common.BoolPtr(true),
}
processingQueueStateMap := map[string][]*history.ProcessingQueueState{
cluster1: {
&history.ProcessingQueueState{
Level: common.Int32Ptr(level1),
AckLevel: common.Int64Ptr(ackLevel1),
MaxLevel: common.Int64Ptr(ackLevel1),
DomainFilter: domainFilter,
},
},
cluster2: {
&history.ProcessingQueueState{
Level: common.Int32Ptr(level2),
AckLevel: common.Int64Ptr(ackLevel2),
MaxLevel: common.Int64Ptr(ackLevel2),
DomainFilter: domainFilter,
},
},
}
return history.ProcessingQueueStates{StatesByCluster: processingQueueStateMap}
}

func createTimerPQS(cluster1 string, level1 int32, ackLevel1 time.Time, cluster2 string, level2 int32, ackLevel2 time.Time) history.ProcessingQueueStates {
domainFilter := &history.DomainFilter{
DomainIDs: []string{},
ReverseMatch: common.BoolPtr(true),
}
processingQueueStateMap := map[string][]*history.ProcessingQueueState{
cluster1: {
&history.ProcessingQueueState{
Level: common.Int32Ptr(level1),
AckLevel: common.Int64Ptr(ackLevel1.UnixNano()),
MaxLevel: common.Int64Ptr(ackLevel1.UnixNano()),
DomainFilter: domainFilter,
},
},
cluster2: {
&history.ProcessingQueueState{
Level: common.Int32Ptr(level2),
AckLevel: common.Int64Ptr(ackLevel2.UnixNano()),
MaxLevel: common.Int64Ptr(ackLevel2.UnixNano()),
DomainFilter: domainFilter,
},
},
}
return history.ProcessingQueueStates{StatesByCluster: processingQueueStateMap}
}
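
Both helpers pass a DomainFilter with no DomainIDs and ReverseMatch set to true, which we read as "exclude nothing", i.e. the queue state applies to every domain — an interpretation, not something the diff states. A filter scoped to specific domains would then plausibly look like this (hypothetical helper):

// scopedDomainFilter sketches the opposite case: match only the listed domains.
func scopedDomainFilter(domainIDs []string) *history.DomainFilter {
	return &history.DomainFilter{
		DomainIDs:    domainIDs,
		ReverseMatch: common.BoolPtr(false),
	}
}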
(The remaining 9 changed files are not rendered here.)
