typo: rename SharedQueue to ShardQueue
Hchenn committed Nov 1, 2021
1 parent 768bc51 commit c474083
Showing 2 changed files with 44 additions and 44 deletions.
82 changes: 41 additions & 41 deletions mux/shared_queue.go → mux/shard_queue.go
@@ -22,19 +22,19 @@ import (
)

/* DOC:
* SharedQueue uses the netpoll's nocopy API to merge and send data.
* The Data Flush is passively triggered by SharedQueue.Add and does not require user operations.
* ShardQueue uses the netpoll's nocopy API to merge and send data.
* The Data Flush is passively triggered by ShardQueue.Add and does not require user operations.
* If there is an error in the data transmission, the connection will be closed.
*
* SharedQueue.Add: add the data to be sent.
* NewSharedQueue: create a queue with netpoll.Connection.
* SharedSize: the recommended number of shards is 32.
* ShardQueue.Add: add the data to be sent.
* NewShardQueue: create a queue with netpoll.Connection.
* ShardSize: the recommended number of shards is 32.
*/
const SharedSize = 32
const ShardSize = 32

// NewSharedQueue .
func NewSharedQueue(size int32, conn netpoll.Connection) (queue *SharedQueue) {
queue = &SharedQueue{
// NewShardQueue .
func NewShardQueue(size int32, conn netpoll.Connection) (queue *ShardQueue) {
queue = &ShardQueue{
conn: conn,
size: size,
getters: make([][]WriterGetter, size),
@@ -50,11 +50,11 @@ func NewSharedQueue(size int32, conn netpoll.Connection) (queue *SharedQueue) {
// WriterGetter is used to get a netpoll.Writer.
type WriterGetter func() (buf netpoll.Writer, isNil bool)

// SharedQueue uses the netpoll's nocopy API to merge and send data.
// The Data Flush is passively triggered by SharedQueue.Add and does not require user operations.
// ShardQueue uses the netpoll's nocopy API to merge and send data.
// The Data Flush is passively triggered by ShardQueue.Add and does not require user operations.
// If there is an error in the data transmission, the connection will be closed.
// SharedQueue.Add: add the data to be sent.
type SharedQueue struct {
// ShardQueue.Add: add the data to be sent.
type ShardQueue struct {
conn netpoll.Connection
idx, size int32
getters [][]WriterGetter // len(getters) = size
@@ -63,45 +63,45 @@ type SharedQueue struct {
trigger, runNum int32
}

// Add adds to q.getters[shared]
func (q *SharedQueue) Add(gts ...WriterGetter) {
shared := atomic.AddInt32(&q.idx, 1) % q.size
q.lock(shared)
trigger := len(q.getters[shared]) == 0
q.getters[shared] = append(q.getters[shared], gts...)
q.unlock(shared)
// Add adds to q.getters[shard]
func (q *ShardQueue) Add(gts ...WriterGetter) {
shard := atomic.AddInt32(&q.idx, 1) % q.size
q.lock(shard)
trigger := len(q.getters[shard]) == 0
q.getters[shard] = append(q.getters[shard], gts...)
q.unlock(shard)
if trigger {
q.triggering(shared)
q.triggering(shard)
}
}

// triggering shared.
func (q *SharedQueue) triggering(shared int32) {
// triggering shard.
func (q *ShardQueue) triggering(shard int32) {
if atomic.AddInt32(&q.trigger, 1) > 1 {
return
}
q.foreach(shared)
q.foreach(shard)
}

// foreach swap r & w. It's not concurrency safe.
func (q *SharedQueue) foreach(shared int32) {
func (q *ShardQueue) foreach(shard int32) {
if atomic.AddInt32(&q.runNum, 1) > 1 {
return
}
go func() {
var tmp []WriterGetter
for ; atomic.LoadInt32(&q.trigger) > 0; shared = (shared + 1) % q.size {
for ; atomic.LoadInt32(&q.trigger) > 0; shard = (shard + 1) % q.size {
// lock & swap
q.lock(shared)
if len(q.getters[shared]) == 0 {
q.unlock(shared)
q.lock(shard)
if len(q.getters[shard]) == 0 {
q.unlock(shard)
continue
}
// swap
tmp = q.getters[shared]
q.getters[shared] = q.swap[:0]
tmp = q.getters[shard]
q.getters[shard] = q.swap[:0]
q.swap = tmp
q.unlock(shared)
q.unlock(shard)
atomic.AddInt32(&q.trigger, -1)

// deal
@@ -112,13 +112,13 @@ func (q *SharedQueue) foreach(shared int32) {
// quit & check again
atomic.StoreInt32(&q.runNum, 0)
if atomic.LoadInt32(&q.trigger) > 0 {
q.foreach(shared)
q.foreach(shard)
}
}()
}

// deal is used to get deal of netpoll.Writer.
func (q *SharedQueue) deal(gts []WriterGetter) {
func (q *ShardQueue) deal(gts []WriterGetter) {
writer := q.conn.Writer()
for _, gt := range gts {
buf, isNil := gt()
@@ -133,22 +133,22 @@ func (q *SharedQueue) deal(gts []WriterGetter) {
}

// flush is used to flush netpoll.Writer.
func (q *SharedQueue) flush() {
func (q *ShardQueue) flush() {
err := q.conn.Writer().Flush()
if err != nil {
q.conn.Close()
return
}
}

// lock shared.
func (q *SharedQueue) lock(shared int32) {
for !atomic.CompareAndSwapInt32(&q.locks[shared], 0, 1) {
// lock shard.
func (q *ShardQueue) lock(shard int32) {
for !atomic.CompareAndSwapInt32(&q.locks[shard], 0, 1) {
runtime.Gosched()
}
}

// unlock shared.
func (q *SharedQueue) unlock(shared int32) {
atomic.StoreInt32(&q.locks[shared], 0)
// unlock shard.
func (q *ShardQueue) unlock(shard int32) {
atomic.StoreInt32(&q.locks[shard], 0)
}
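
For context, here is a minimal usage sketch of the renamed API, pieced together from the doc comment above and the test below. It is illustrative only and not part of this commit; the netpoll.DialConnection and netpoll.NewLinkBuffer calls, the import path github.com/cloudwego/netpoll/mux, and the target address are assumptions about the surrounding library rather than anything this change introduces.

package main

import (
	"time"

	"github.com/cloudwego/netpoll"
	"github.com/cloudwego/netpoll/mux"
)

func main() {
	// Dial a connection to merge writes onto (error handling kept minimal).
	conn, err := netpoll.DialConnection("tcp", "127.0.0.1:1234", time.Second)
	if err != nil {
		panic(err)
	}

	// Create the queue with the recommended shard count (ShardSize = 32).
	queue := mux.NewShardQueue(mux.ShardSize, conn)

	// A WriterGetter lazily produces the buffer to be merged and flushed.
	var getter mux.WriterGetter = func() (buf netpoll.Writer, isNil bool) {
		wb := netpoll.NewLinkBuffer()
		wb.WriteBinary([]byte("hello"))
		return wb, false
	}

	// Add appends to one shard and passively triggers the flush; no explicit
	// Flush call is needed, and a write error closes the connection.
	queue.Add(getter)
}

As the diff above shows, the design keeps concurrent Add calls cheap: each shard is guarded by a CAS spin lock, while the trigger and runNum counters ensure a single background goroutine drains all shards in order and flushes the merged buffers.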
6 changes: 3 additions & 3 deletions mux/shared_queue_test.go → mux/shard_queue_test.go
@@ -23,7 +23,7 @@ import (
"github.com/cloudwego/netpoll"
)

func TestShareQueue(t *testing.T) {
func TestShardQueue(t *testing.T) {
var svrConn net.Conn

network, address := "tcp", ":1234"
@@ -53,7 +53,7 @@ func TestShareQueue(t *testing.T) {
}

// test
queue := NewSharedQueue(4, conn)
queue := NewShardQueue(4, conn)
count, pkgsize := 16, 11
for i := 0; i < int(count); i++ {
var getter WriterGetter = func() (buf netpoll.Writer, isNil bool) {
@@ -72,6 +72,6 @@ func TestShareQueue(t *testing.T) {
}

// TODO: need mock flush
func BenchmarkShareQueue(b *testing.B) {
func BenchmarkShardQueue(b *testing.B) {
b.Skip()
}
