Skip to content

Commit

Permalink
Minor changes to improve readability of history's queue folder (caden…
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir authored Dec 21, 2023
1 parent 3986fcd commit feb54ff
Show file tree
Hide file tree
Showing 16 changed files with 607 additions and 525 deletions.
25 changes: 25 additions & 0 deletions service/history/queue/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:

// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.

// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package queue

const (
defaultProcessingQueueLevel = 0
)
113 changes: 7 additions & 106 deletions service/history/queue/processing_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,6 @@ var (
)

type (
processingQueueStateImpl struct {
level int
ackLevel task.Key
readLevel task.Key
maxLevel task.Key
domainFilter DomainFilter
}

processingQueueImpl struct {
state *processingQueueStateImpl
outstandingTasks map[task.Key]task.Task
Expand All @@ -54,39 +46,6 @@ type (
}
)

// NewProcessingQueueState creates a new state instance for processing queue
// readLevel will be set to the same value as ackLevel
func NewProcessingQueueState(
level int,
ackLevel task.Key,
maxLevel task.Key,
domainFilter DomainFilter,
) ProcessingQueueState {
return newProcessingQueueState(
level,
ackLevel,
ackLevel,
maxLevel,
domainFilter,
)
}

func newProcessingQueueState(
level int,
ackLevel task.Key,
readLevel task.Key,
maxLevel task.Key,
domainFilter DomainFilter,
) *processingQueueStateImpl {
return &processingQueueStateImpl{
level: level,
ackLevel: ackLevel,
readLevel: readLevel,
maxLevel: maxLevel,
domainFilter: domainFilter,
}
}

// NewProcessingQueue creates a new processing queue based on its state
func NewProcessingQueue(
state ProcessingQueueState,
Expand Down Expand Up @@ -134,39 +93,11 @@ func newProcessingQueue(
return queue
}

func (s *processingQueueStateImpl) Level() int {
return s.level
}

func (s *processingQueueStateImpl) MaxLevel() task.Key {
return s.maxLevel
}

func (s *processingQueueStateImpl) AckLevel() task.Key {
return s.ackLevel
}

func (s *processingQueueStateImpl) ReadLevel() task.Key {
return s.readLevel
}

func (s *processingQueueStateImpl) DomainFilter() DomainFilter {
return s.domainFilter
}

func (s *processingQueueStateImpl) String() string {
return fmt.Sprintf("&{level: %+v, ackLevel: %+v, readLevel: %+v, maxLevel: %+v, domainFilter: %+v}",
s.level, s.ackLevel, s.readLevel, s.maxLevel, s.domainFilter,
)
}

func (q *processingQueueImpl) State() ProcessingQueueState {
return q.state
}

func (q *processingQueueImpl) Split(
policy ProcessingQueueSplitPolicy,
) []ProcessingQueue {
func (q *processingQueueImpl) Split(policy ProcessingQueueSplitPolicy) []ProcessingQueue {
newQueueStates := policy.Evaluate(q)
if len(newQueueStates) == 0 {
// no need to split, return self
Expand All @@ -176,9 +107,7 @@ func (q *processingQueueImpl) Split(
return splitProcessingQueue([]*processingQueueImpl{q}, newQueueStates, q.logger, q.metricsClient)
}

func (q *processingQueueImpl) Merge(
queue ProcessingQueue,
) []ProcessingQueue {
func (q *processingQueueImpl) Merge(queue ProcessingQueue) []ProcessingQueue {
q1, q2 := q, queue.(*processingQueueImpl)

if q1.State().Level() != q2.State().Level() {
Expand Down Expand Up @@ -246,10 +175,7 @@ func (q *processingQueueImpl) Merge(
return splitProcessingQueue([]*processingQueueImpl{q1, q2}, newQueueStates, q.logger, q.metricsClient)
}

func (q *processingQueueImpl) AddTasks(
tasks map[task.Key]task.Task,
newReadLevel task.Key,
) {
func (q *processingQueueImpl) AddTasks(tasks map[task.Key]task.Task, newReadLevel task.Key) {
if newReadLevel.Less(q.state.readLevel) {
q.logger.Fatal("processing queue read level moved backward", tag.Error(
fmt.Errorf("current read level: %v, new read level: %v", q.state.readLevel, newReadLevel),
Expand Down Expand Up @@ -411,51 +337,26 @@ func splitProcessingQueue(
return newQueues
}

func taskBelongsToProcessQueue(
state ProcessingQueueState,
key task.Key,
task task.Task,
) bool {
func taskBelongsToProcessQueue(state ProcessingQueueState, key task.Key, task task.Task) bool {
return state.DomainFilter().Filter(task.GetDomainID()) &&
state.AckLevel().Less(key) &&
!state.MaxLevel().Less(key)
}

func taskKeyEquals(
key1 task.Key,
key2 task.Key,
) bool {
func taskKeyEquals(key1 task.Key, key2 task.Key) bool {
return !key1.Less(key2) && !key2.Less(key1)
}

func minTaskKey(
key1 task.Key,
key2 task.Key,
) task.Key {
func minTaskKey(key1 task.Key, key2 task.Key) task.Key {
if key1.Less(key2) {
return key1
}
return key2
}

func maxTaskKey(
key1 task.Key,
key2 task.Key,
) task.Key {
func maxTaskKey(key1 task.Key, key2 task.Key) task.Key {
if key1.Less(key2) {
return key2
}
return key1
}

func copyQueueState(
state ProcessingQueueState,
) *processingQueueStateImpl {
return newProcessingQueueState(
state.Level(),
state.AckLevel(),
state.ReadLevel(),
state.MaxLevel(),
state.DomainFilter(),
)
}
47 changes: 33 additions & 14 deletions service/history/queue/processing_queue_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,16 @@ import (
"fmt"
"sort"

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/service/history/task"
)

type (
processingQueueCollection struct {
level int
queues []ProcessingQueue
activeQueue ProcessingQueue
}
)
type processingQueueCollection struct {
level int
queues []ProcessingQueue
activeQueue ProcessingQueue
}

// NewProcessingQueueCollection creates a new collection for non-overlapping queues
func NewProcessingQueueCollection(level int, queues []ProcessingQueue) ProcessingQueueCollection {
Expand All @@ -43,10 +43,33 @@ func NewProcessingQueueCollection(level int, queues []ProcessingQueue) Processin
queues: queues,
}
queueCollection.resetActiveQueue()

return queueCollection
}

func newProcessingQueueCollections(
processingQueueStates []ProcessingQueueState,
logger log.Logger,
metricsClient metrics.Client,
) []ProcessingQueueCollection {
processingQueuesMap := make(map[int][]ProcessingQueue) // level -> state
for _, queueState := range processingQueueStates {
processingQueuesMap[queueState.Level()] = append(processingQueuesMap[queueState.Level()], NewProcessingQueue(
queueState,
logger,
metricsClient,
))
}
processingQueueCollections := make([]ProcessingQueueCollection, 0, len(processingQueuesMap))
for level, queues := range processingQueuesMap {
processingQueueCollections = append(processingQueueCollections, NewProcessingQueueCollection(
level,
queues,
))
}

return processingQueueCollections
}

func (c *processingQueueCollection) Level() int {
return c.level
}
Expand Down Expand Up @@ -113,9 +136,7 @@ func (c *processingQueueCollection) UpdateAckLevels() (task.Key, int) {
return minAckLevel, totalPendingTasks
}

func (c *processingQueueCollection) Split(
policy ProcessingQueueSplitPolicy,
) []ProcessingQueue {
func (c *processingQueueCollection) Split(policy ProcessingQueueSplitPolicy) []ProcessingQueue {
if len(c.queues) == 0 {
return nil
}
Expand All @@ -142,9 +163,7 @@ func (c *processingQueueCollection) Split(
return nextLevelQueues
}

func (c *processingQueueCollection) Merge(
incomingQueues []ProcessingQueue,
) {
func (c *processingQueueCollection) Merge(incomingQueues []ProcessingQueue) {
sortProcessingQueue(incomingQueues)

newQueues := make([]ProcessingQueue, 0, len(c.queues)+len(incomingQueues))
Expand Down
104 changes: 104 additions & 0 deletions service/history/queue/processing_queue_state.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package queue

import (
"fmt"

"github.com/uber/cadence/service/history/task"
)

type processingQueueStateImpl struct {
level int
ackLevel task.Key
readLevel task.Key
maxLevel task.Key
domainFilter DomainFilter
}

// NewProcessingQueueState creates a new state instance for processing queue
// readLevel will be set to the same value as ackLevel
func NewProcessingQueueState(
level int,
ackLevel task.Key,
maxLevel task.Key,
domainFilter DomainFilter,
) ProcessingQueueState {
return newProcessingQueueState(
level,
ackLevel,
ackLevel,
maxLevel,
domainFilter,
)
}

func newProcessingQueueState(
level int,
ackLevel task.Key,
readLevel task.Key,
maxLevel task.Key,
domainFilter DomainFilter,
) *processingQueueStateImpl {
return &processingQueueStateImpl{
level: level,
ackLevel: ackLevel,
readLevel: readLevel,
maxLevel: maxLevel,
domainFilter: domainFilter,
}
}

func (s *processingQueueStateImpl) Level() int {
return s.level
}

func (s *processingQueueStateImpl) MaxLevel() task.Key {
return s.maxLevel
}

func (s *processingQueueStateImpl) AckLevel() task.Key {
return s.ackLevel
}

func (s *processingQueueStateImpl) ReadLevel() task.Key {
return s.readLevel
}

func (s *processingQueueStateImpl) DomainFilter() DomainFilter {
return s.domainFilter
}

func (s *processingQueueStateImpl) String() string {
return fmt.Sprintf("&{level: %+v, ackLevel: %+v, readLevel: %+v, maxLevel: %+v, domainFilter: %+v}",
s.level, s.ackLevel, s.readLevel, s.maxLevel, s.domainFilter,
)
}

func copyQueueState(state ProcessingQueueState) *processingQueueStateImpl {
return newProcessingQueueState(
state.Level(),
state.AckLevel(),
state.ReadLevel(),
state.MaxLevel(),
state.DomainFilter(),
)
}
Loading

0 comments on commit feb54ff

Please sign in to comment.