Skip to content

Commit

Permalink
Introduce QueueManager to make seperation between store and manager (c…
Browse files Browse the repository at this point in the history
…adence-workflow#3605)

* Introduce QueueManager to make seperation between store and manager
  • Loading branch information
anish531213 authored Oct 8, 2020
1 parent 7367ab6 commit a90f6ee
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 26 deletions.
12 changes: 6 additions & 6 deletions common/persistence/cassandra/cassandraQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ func (q *nosqlQueue) ReadMessages(
ctx context.Context,
lastMessageID int64,
maxCount int,
) ([]*persistence.QueueMessage, error) {
) ([]*persistence.InternalQueueMessage, error) {
messages, err := q.db.SelectMessagesFrom(ctx, q.queueType, lastMessageID, maxCount)
if err != nil {
return nil, err
}
var result []*persistence.QueueMessage
var result []*persistence.InternalQueueMessage
for _, msg := range messages {
result = append(result, &persistence.QueueMessage{
result = append(result, &persistence.InternalQueueMessage{
ID: msg.ID,
QueueType: q.queueType,
Payload: msg.Payload,
Expand All @@ -200,7 +200,7 @@ func (q *nosqlQueue) ReadMessagesFromDLQ(
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]*persistence.QueueMessage, []byte, error) {
) ([]*persistence.InternalQueueMessage, []byte, error) {
response, err := q.db.SelectMessagesBetween(ctx, nosqlplugin.SelectMessagesBetweenRequest{
QueueType: q.getDLQTypeFromQueueType(),
ExclusiveBeginMessageID: firstMessageID,
Expand All @@ -211,9 +211,9 @@ func (q *nosqlQueue) ReadMessagesFromDLQ(
if err != nil {
return nil, nil, err
}
var result []*persistence.QueueMessage
var result []*persistence.InternalQueueMessage
for _, msg := range response.Rows {
result = append(result, &persistence.QueueMessage{
result = append(result, &persistence.InternalQueueMessage{
ID: msg.ID,
QueueType: msg.QueueType,
Payload: msg.Payload,
Expand Down
4 changes: 3 additions & 1 deletion common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,12 @@ func (f *factoryImpl) NewVisibilityManager() (p.VisibilityManager, error) {

func (f *factoryImpl) NewDomainReplicationQueue() (p.DomainReplicationQueue, error) {
ds := f.datastores[storeTypeQueue]
result, err := ds.factory.NewQueue(p.DomainReplicationQueueType)
store, err := ds.factory.NewQueue(p.DomainReplicationQueueType)
if err != nil {
return nil, err
}

result := p.NewQueueManager(store)
if ds.ratelimit != nil {
result = p.NewQueuePersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
Expand Down
23 changes: 23 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,6 +1623,29 @@ type (
ListDomains(ctx context.Context, request *ListDomainsRequest) (*ListDomainsResponse, error)
GetMetadata(ctx context.Context) (*GetMetadataResponse, error)
}

// QueueManager is used to manage queue store
QueueManager interface {
Closeable
EnqueueMessage(ctx context.Context, messagePayload []byte) error
ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*QueueMessage, error)
DeleteMessagesBefore(ctx context.Context, messageID int64) error
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error
GetAckLevels(ctx context.Context) (map[string]int64, error)
EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) (int64, error)
ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*QueueMessage, []byte, error)
DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error
GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
}

// QueueMessage is the message that stores in the queue
QueueMessage struct {
ID int64 `json:"message_id"`
QueueType QueueType `json:"queue_type"`
Payload []byte `json:"message_payload"`
}
)

func (e *InvalidPersistenceRequestError) Error() string {
Expand Down
4 changes: 2 additions & 2 deletions common/persistence/domainReplicationQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ DomainReplicationQueue = (*domainReplicationQueueImpl)(nil)

// NewDomainReplicationQueue creates a new DomainReplicationQueue instance
func NewDomainReplicationQueue(
queue Queue,
queue QueueManager,
clusterName string,
metricsClient metrics.Client,
logger log.Logger,
Expand All @@ -67,7 +67,7 @@ func NewDomainReplicationQueue(

type (
domainReplicationQueueImpl struct {
queue Queue
queue QueueManager
clusterName string
metricsClient metrics.Client
logger log.Logger
Expand Down
6 changes: 3 additions & 3 deletions common/persistence/persistenceInterface.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,20 +173,20 @@ type (
Queue interface {
Closeable
EnqueueMessage(ctx context.Context, messagePayload []byte) error
ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*QueueMessage, error)
ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*InternalQueueMessage, error)
DeleteMessagesBefore(ctx context.Context, messageID int64) error
UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error
GetAckLevels(ctx context.Context) (map[string]int64, error)
EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) (int64, error)
ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*QueueMessage, []byte, error)
ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*InternalQueueMessage, []byte, error)
DeleteMessageFromDLQ(ctx context.Context, messageID int64) error
RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error
UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error
GetDLQAckLevels(ctx context.Context) (map[string]int64, error)
}

// QueueMessage is the message that stores in the queue
QueueMessage struct {
InternalQueueMessage struct {
ID int64 `json:"message_id"`
QueueType QueueType `json:"queue_type"`
Payload []byte `json:"message_payload"`
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/persistenceMetricClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type (

queuePersistenceClient struct {
metricClient metrics.Client
persistence Queue
persistence QueueManager
logger log.Logger
}
)
Expand All @@ -79,7 +79,7 @@ var _ TaskManager = (*taskPersistenceClient)(nil)
var _ HistoryManager = (*historyV2PersistenceClient)(nil)
var _ MetadataManager = (*metadataPersistenceClient)(nil)
var _ VisibilityManager = (*visibilityPersistenceClient)(nil)
var _ Queue = (*queuePersistenceClient)(nil)
var _ QueueManager = (*queuePersistenceClient)(nil)

// NewShardPersistenceMetricsClient creates a client to manage shards
func NewShardPersistenceMetricsClient(
Expand Down Expand Up @@ -161,10 +161,10 @@ func NewVisibilityPersistenceMetricsClient(

// NewQueuePersistenceMetricsClient creates a client to manage queue
func NewQueuePersistenceMetricsClient(
persistence Queue,
persistence QueueManager,
metricClient metrics.Client,
logger log.Logger,
) Queue {
) QueueManager {
return &queuePersistenceClient{
persistence: persistence,
metricClient: metricClient,
Expand Down
8 changes: 4 additions & 4 deletions common/persistence/persistenceRateLimitedClients.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type (

queueRateLimitedPersistenceClient struct {
rateLimiter quotas.Limiter
persistence Queue
persistence QueueManager
logger log.Logger
}
)
Expand All @@ -85,7 +85,7 @@ var _ TaskManager = (*taskRateLimitedPersistenceClient)(nil)
var _ HistoryManager = (*historyV2RateLimitedPersistenceClient)(nil)
var _ MetadataManager = (*metadataRateLimitedPersistenceClient)(nil)
var _ VisibilityManager = (*visibilityRateLimitedPersistenceClient)(nil)
var _ Queue = (*queueRateLimitedPersistenceClient)(nil)
var _ QueueManager = (*queueRateLimitedPersistenceClient)(nil)

// NewShardPersistenceRateLimitedClient creates a client to manage shards
func NewShardPersistenceRateLimitedClient(
Expand Down Expand Up @@ -167,10 +167,10 @@ func NewVisibilityPersistenceRateLimitedClient(

// NewQueuePersistenceRateLimitedClient creates a client to manage queue
func NewQueuePersistenceRateLimitedClient(
persistence Queue,
persistence QueueManager,
rateLimiter quotas.Limiter,
logger log.Logger,
) Queue {
) QueueManager {
return &queueRateLimitedPersistenceClient{
persistence: persistence,
rateLimiter: rateLimiter,
Expand Down
116 changes: 116 additions & 0 deletions common/persistence/queueManager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// The MIT License (MIT)
//
// Copyright (c) 2017-2020 Uber Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package persistence

import (
"context"
)

type (
queueManager struct {
persistence Queue
}
)

var _ QueueManager = (*queueManager)(nil)

// NewQueueManager returns a new QueueManager
func NewQueueManager(
persistence Queue,
) QueueManager {
return &queueManager{
persistence: persistence,
}
}

func (q *queueManager) Close() {
q.persistence.Close()
}

func (q *queueManager) EnqueueMessage(ctx context.Context, messagePayload []byte) error {
return q.persistence.EnqueueMessage(ctx, messagePayload)
}

func (q *queueManager) ReadMessages(ctx context.Context, lastMessageID int64, maxCount int) ([]*QueueMessage, error) {
resp, err := q.persistence.ReadMessages(ctx, lastMessageID, maxCount)
if err != nil {
return nil, err
}
var output []*QueueMessage
for _, message := range resp {
output = append(output, q.fromInternalQueueMessage(message))
}
return output, nil
}

func (q *queueManager) DeleteMessagesBefore(ctx context.Context, messageID int64) error {
return q.persistence.DeleteMessagesBefore(ctx, messageID)
}

func (q *queueManager) UpdateAckLevel(ctx context.Context, messageID int64, clusterName string) error {
return q.persistence.UpdateAckLevel(ctx, messageID, clusterName)
}

func (q *queueManager) GetAckLevels(ctx context.Context) (map[string]int64, error) {
return q.persistence.GetAckLevels(ctx)
}

func (q *queueManager) EnqueueMessageToDLQ(ctx context.Context, messagePayload []byte) (int64, error) {
return q.persistence.EnqueueMessageToDLQ(ctx, messagePayload)
}

func (q *queueManager) ReadMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte) ([]*QueueMessage, []byte, error) {
resp, data, err := q.persistence.ReadMessagesFromDLQ(ctx, firstMessageID, lastMessageID, pageSize, pageToken)
if resp == nil {
return nil, data, err
}
var output []*QueueMessage
for _, message := range resp {
output = append(output, q.fromInternalQueueMessage(message))
}
return output, data, err
}

func (q *queueManager) DeleteMessageFromDLQ(ctx context.Context, messageID int64) error {
return q.persistence.DeleteMessageFromDLQ(ctx, messageID)
}

func (q *queueManager) RangeDeleteMessagesFromDLQ(ctx context.Context, firstMessageID int64, lastMessageID int64) error {
return q.persistence.RangeDeleteMessagesFromDLQ(ctx, firstMessageID, lastMessageID)
}

func (q *queueManager) UpdateDLQAckLevel(ctx context.Context, messageID int64, clusterName string) error {
return q.persistence.UpdateDLQAckLevel(ctx, messageID, clusterName)
}

func (q *queueManager) GetDLQAckLevels(ctx context.Context) (map[string]int64, error) {
return q.persistence.GetDLQAckLevels(ctx)
}

func (q *queueManager) fromInternalQueueMessage(message *InternalQueueMessage) *QueueMessage {
return &QueueMessage{
ID: message.ID,
QueueType: message.QueueType,
Payload: message.Payload,
}
}
12 changes: 6 additions & 6 deletions common/persistence/sql/sqlQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,16 @@ func (q *sqlQueue) ReadMessages(
ctx context.Context,
lastMessageID int64,
maxCount int,
) ([]*persistence.QueueMessage, error) {
) ([]*persistence.InternalQueueMessage, error) {

rows, err := q.db.GetMessagesFromQueue(ctx, q.queueType, lastMessageID, maxCount)
if err != nil {
return nil, err
}

var messages []*persistence.QueueMessage
var messages []*persistence.InternalQueueMessage
for _, row := range rows {
messages = append(messages, &persistence.QueueMessage{ID: row.MessageID, Payload: row.MessagePayload})
messages = append(messages, &persistence.InternalQueueMessage{ID: row.MessageID, Payload: row.MessagePayload})
}
return messages, nil
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func (q *sqlQueue) ReadMessagesFromDLQ(
lastMessageID int64,
pageSize int,
pageToken []byte,
) ([]*persistence.QueueMessage, []byte, error) {
) ([]*persistence.InternalQueueMessage, []byte, error) {

if pageToken != nil && len(pageToken) != 0 {
lastReadMessageID, err := deserializePageToken(pageToken)
Expand All @@ -224,9 +224,9 @@ func (q *sqlQueue) ReadMessagesFromDLQ(
}
}

var messages []*persistence.QueueMessage
var messages []*persistence.InternalQueueMessage
for _, row := range rows {
messages = append(messages, &persistence.QueueMessage{ID: row.MessageID, Payload: row.MessagePayload})
messages = append(messages, &persistence.InternalQueueMessage{ID: row.MessageID, Payload: row.MessagePayload})
}

var newPagingToken []byte
Expand Down

0 comments on commit a90f6ee

Please sign in to comment.