Skip to content

Commit

Permalink
Convert UnboundedQueue to UnboundedDeque (ava-labs#1878)
Browse files Browse the repository at this point in the history
Co-authored-by: Stephen Buttolph <[email protected]>
  • Loading branch information
Dan Laine and StephenButtolph authored Oct 20, 2022
1 parent d747a96 commit d913e13
Show file tree
Hide file tree
Showing 11 changed files with 833 additions and 616 deletions.
9 changes: 4 additions & 5 deletions chains/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ type manager struct {
registrants []Registrant

// queue that holds chain create requests
chainsQueue buffer.UnboundedBlockingQueue[ChainParameters]
chainsQueue buffer.BlockingDeque[ChainParameters]
// unblocks chain creator to start processing the queue
unblockChainCreatorCh chan struct{}
chainCreatorShutdownCh chan struct{}
Expand All @@ -234,13 +234,12 @@ type manager struct {

// New returns a new Manager
func New(config *ManagerConfig) Manager {
queue := buffer.NewUnboundedSliceQueue[ChainParameters](initialQueueSize)
return &manager{
Aliaser: ids.NewAliaser(),
ManagerConfig: *config,
subnets: make(map[ids.ID]Subnet),
chains: make(map[ids.ID]handler.Handler),
chainsQueue: buffer.NewUnboundedBlockingQueue(queue),
chainsQueue: buffer.NewUnboundedBlockingDeque[ChainParameters](initialQueueSize),
unblockChainCreatorCh: make(chan struct{}),
chainCreatorShutdownCh: make(chan struct{}),
}
Expand All @@ -252,7 +251,7 @@ func (m *manager) Router() router.Router { return m.ManagerConfig.Router }
// QueueChainCreation queues a chain creation request
// Invariant: Whitelisted Subnet must be checked before calling this function
func (m *manager) QueueChainCreation(chainParams ChainParameters) {
if ok := m.chainsQueue.Enqueue(chainParams); !ok {
if ok := m.chainsQueue.PushRight(chainParams); !ok {
m.Log.Debug("cannot enqueue new chain",
zap.Stringer("chainID", chainParams.ID),
)
Expand Down Expand Up @@ -1031,7 +1030,7 @@ func (m *manager) dispatchChainCreator(platform ChainParameters) {
// Get the next chain we should create.
// Dequeue waits until an element is pushed, so this is not
// busy-looping.
chainParams, ok := m.chainsQueue.Dequeue()
chainParams, ok := m.chainsQueue.PopLeft()
if !ok { // queue is closed, return directly
return
}
Expand Down
10 changes: 5 additions & 5 deletions network/peer/message_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type throttledMessageQueue struct {

// queue of the messages
// [cond.L] must be held while accessing [queue].
queue buffer.UnboundedQueue[message.OutboundMessage]
queue buffer.Deque[message.OutboundMessage]
}

func NewThrottledMessageQueue(
Expand All @@ -82,7 +82,7 @@ func NewThrottledMessageQueue(
log: log,
outboundMsgThrottler: outboundMsgThrottler,
cond: sync.NewCond(&sync.Mutex{}),
queue: buffer.NewUnboundedSliceQueue[message.OutboundMessage](initialQueueSize),
queue: buffer.NewUnboundedDeque[message.OutboundMessage](initialQueueSize),
}
}

Expand Down Expand Up @@ -129,7 +129,7 @@ func (q *throttledMessageQueue) Push(ctx context.Context, msg message.OutboundMe
return false
}

q.queue.Enqueue(msg)
q.queue.PushRight(msg)
q.cond.Signal()
return true
}
Expand Down Expand Up @@ -166,7 +166,7 @@ func (q *throttledMessageQueue) PopNow() (message.OutboundMessage, bool) {
}

func (q *throttledMessageQueue) pop() message.OutboundMessage {
msg, _ := q.queue.Dequeue()
msg, _ := q.queue.PopLeft()

q.outboundMsgThrottler.Release(msg, q.id)
return msg
Expand All @@ -183,7 +183,7 @@ func (q *throttledMessageQueue) Close() {
q.closed = true

for q.queue.Len() > 0 {
msg, _ := q.queue.Dequeue()
msg, _ := q.queue.PopLeft()
q.outboundMsgThrottler.Release(msg, q.id)
q.onFailed.SendFailed(msg)
}
Expand Down
149 changes: 149 additions & 0 deletions utils/buffer/unbounded_blocking_deque.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package buffer

import (
"sync"

"github.com/ava-labs/avalanchego/utils"
)

var _ BlockingDeque[int] = &unboundedBlockingDeque[int]{}

type BlockingDeque[T any] interface {
Deque[T]

// Close and empty the deque.
Close()
}

// Returns a new unbounded deque with the given initial size.
// Note that the returned deque is always empty -- [initSize] is just
// a hint to prevent unnecessary resizing.
func NewUnboundedBlockingDeque[T any](initSize int) BlockingDeque[T] {
q := &unboundedBlockingDeque[T]{
Deque: NewUnboundedDeque[T](initSize),
}
q.cond = sync.NewCond(&q.lock)
return q
}

type unboundedBlockingDeque[T any] struct {
lock sync.RWMutex
cond *sync.Cond
closed bool

Deque[T]
}

// If the deque is closed returns false.
func (q *unboundedBlockingDeque[T]) PushRight(elt T) bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.closed {
return false
}

// Add the item to the queue
q.Deque.PushRight(elt)

// Signal a waiting thread
q.cond.Signal()
return true
}

// If the deque is closed returns false.
func (q *unboundedBlockingDeque[T]) PopRight() (T, bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

for {
if q.closed {
return utils.Zero[T](), false
}
if q.Deque.Len() != 0 {
return q.Deque.PopRight()
}
q.cond.Wait()
}
}

func (q *unboundedBlockingDeque[T]) PeekRight() (T, bool) {
q.lock.RLock()
defer q.lock.RUnlock()

if q.closed {
return utils.Zero[T](), false
}
return q.Deque.PeekRight()
}

// If the deque is closed returns false.
func (q *unboundedBlockingDeque[T]) PushLeft(elt T) bool {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.closed {
return false
}

// Add the item to the queue
q.Deque.PushLeft(elt)

// Signal a waiting thread
q.cond.Signal()
return true
}

// If the deque is closed returns false.
func (q *unboundedBlockingDeque[T]) PopLeft() (T, bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()

for {
if q.closed {
return utils.Zero[T](), false
}
if q.Deque.Len() != 0 {
return q.Deque.PopLeft()
}
q.cond.Wait()
}
}

func (q *unboundedBlockingDeque[T]) PeekLeft() (T, bool) {
q.lock.RLock()
defer q.lock.RUnlock()

if q.closed {
return utils.Zero[T](), false
}
return q.Deque.PeekLeft()
}

func (q *unboundedBlockingDeque[T]) Len() int {
q.lock.RLock()
defer q.lock.RUnlock()

if q.closed {
return 0
}
return q.Deque.Len()
}

func (q *unboundedBlockingDeque[T]) Close() {
q.cond.L.Lock()
defer q.cond.L.Unlock()

if q.closed {
return
}

q.Deque = nil

// Mark the queue as closed
q.closed = true
q.cond.Broadcast()
}
69 changes: 69 additions & 0 deletions utils/buffer/unbounded_blocking_deque_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package buffer

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

func TestUnboundedBlockingDequePush(t *testing.T) {
require := require.New(t)

deque := NewUnboundedBlockingDeque[int](2)

ok := deque.PushRight(1)
require.True(ok)
ok = deque.PushRight(2)
require.True(ok)

ch, ok := deque.PopLeft()
require.True(ok)
require.Equal(1, ch)
}

func TestUnboundedBlockingDequePop(t *testing.T) {
require := require.New(t)

deque := NewUnboundedBlockingDeque[int](2)

ok := deque.PushRight(1)
require.True(ok)

ch, ok := deque.PopLeft()
require.True(ok)
require.Equal(1, ch)

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
ch, ok := deque.PopLeft()
require.True(ok)
require.Equal(2, ch)
wg.Done()
}()

ok = deque.PushRight(2)
require.True(ok)
wg.Wait()
}

func TestUnboundedBlockingDequeClose(t *testing.T) {
require := require.New(t)

deque := NewUnboundedBlockingDeque[int](2)

ok := deque.PushLeft(1)
require.True(ok)

deque.Close()

_, ok = deque.PopRight()
require.False(ok)

ok = deque.PushLeft(1)
require.False(ok)
}
Loading

0 comments on commit d913e13

Please sign in to comment.