Skip to content

Commit

Permalink
Merge pull request cloudwego#74 from Duslia997/feat/optimize_waitRead…
Browse files Browse the repository at this point in the history
…Size

optimize: fix waitReadSize and enhance trigger
  • Loading branch information
Duslia authored Nov 12, 2021
2 parents c474083 + 6345205 commit a4ab1b1
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 32 deletions.
5 changes: 2 additions & 3 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,11 +328,10 @@ func (c *connection) triggerWrite(err error) {

// waitRead will wait full n bytes.
func (c *connection) waitRead(n int) (err error) {
leftover := n - c.inputBuffer.Len()
if leftover <= 0 {
if n <= c.inputBuffer.Len() {
return nil
}
atomic.StoreInt32(&c.waitReadSize, int32(leftover))
atomic.StoreInt32(&c.waitReadSize, int32(n))
defer atomic.StoreInt32(&c.waitReadSize, 0)
if c.readTimeout > 0 {
return c.waitReadWithTimeout(n)
Expand Down
8 changes: 4 additions & 4 deletions connection_onevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func (c *connection) onPrepare(prepare OnPrepare) (err error) {
}

// onRequest is also responsible for executing the callbacks after the connection has been closed.
func (c *connection) onRequest() (err error) {
func (c *connection) onRequest() (needTrigger bool) {
var process = c.process.Load()
if process == nil {
return nil
return true
}
// Buffer has been fully processed, or task already exists
if !c.lock(processing) {
return nil
return true
}
// add new task
var task = func() {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (c *connection) onRequest() (err error) {
}
}
runTask(c.ctx, task)
return nil
return false
}

// closeCallback .
Expand Down
14 changes: 9 additions & 5 deletions connection_reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,15 @@ func (c *connection) inputAck(n int) (err error) {
if n < 0 {
n = 0
}
leftover := atomic.AddInt32(&c.waitReadSize, int32(-n))
err = c.inputBuffer.BookAck(n, leftover <= 0)
//FIXME: always trigger reader since waitReadSize may not correct when waitReading
c.triggerRead()
c.onRequest()
waitReadSize := int(atomic.LoadInt32(&c.waitReadSize))
length := c.inputBuffer.BookAck(n, waitReadSize)
var needTrigger = true
if length == n {
needTrigger = c.onRequest()
}
if needTrigger && length >= waitReadSize {
c.triggerRead()
}
return err
}

Expand Down
2 changes: 1 addition & 1 deletion connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestConnectionWaitReadHalfPacket(t *testing.T) {
for atomic.LoadInt32(&rconn.waitReadSize) <= 0 {
runtime.Gosched()
}
Equal(t, atomic.LoadInt32(&rconn.waitReadSize), int32(size/2))
Equal(t, atomic.LoadInt32(&rconn.waitReadSize), int32(size))
syscall.Write(w, msg[size/2:])
wg.Wait()
}
Expand Down
17 changes: 7 additions & 10 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ func (b *LinkBuffer) Book(min int, p [][]byte) (vs [][]byte) {
}

// BookAck will ack the first n malloc bytes and discard the rest.
func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
func (b *LinkBuffer) BookAck(n int, needSize int) (length int) {
var l int
for ack := n; ack > 0; ack = ack - l {
l = b.flush.malloc - len(b.flush.buf)
Expand All @@ -561,18 +561,16 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0]
}

// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if isEnd && cap(b.flush.buf) > pagesize {
length = b.recalLen(n)
if length >= needSize && cap(b.flush.buf) > pagesize {
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if b.flush.next == nil {
b.flush.next = newLinkBufferNode(0)
}
b.flush = b.flush.next
}
b.write = b.flush

// re-cal length
b.recalLen(n)
return nil
return
}

// Reset resets the buffer to be empty,
Expand All @@ -587,9 +585,8 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
// }

// recalLen re-calculate the length
func (b *LinkBuffer) recalLen(delta int) (err error) {
atomic.AddInt32(&b.length, int32(delta))
return nil
func (b *LinkBuffer) recalLen(delta int) (length int) {
return int(atomic.AddInt32(&b.length, int32(delta)))
}

// ------------------------------------------ implement link node ------------------------------------------
Expand Down
16 changes: 7 additions & 9 deletions nocopy_linkbuffer_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (b *LinkBuffer) Book(min int, p [][]byte) (vs [][]byte) {
}

// BookAck will ack the first n malloc bytes and discard the rest.
func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
func (b *LinkBuffer) BookAck(n int, needSize int) (length int) {
b.Lock()
defer b.Unlock()
var l int
Expand All @@ -592,18 +592,17 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
for node := b.flush.next; node != nil; node = node.next {
node.off, node.malloc, node.refer, node.buf = 0, 0, 1, node.buf[:0]
}
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if isEnd && cap(b.flush.buf) > pagesize {
length = b.recalLen(n)
if length >= needSize && cap(b.flush.buf) > pagesize {
// FIXME: The tail node must not be larger than 8KB to prevent Out Of Memory.
if b.flush.next == nil {
b.flush.next = newLinkBufferNode(0)
}
b.flush = b.flush.next
}
b.write = b.flush

// re-cal length
b.recalLen(n)
return nil
return
}

// Reset resets the buffer to be empty,
Expand All @@ -618,9 +617,8 @@ func (b *LinkBuffer) BookAck(n int, isEnd bool) (err error) {
// }

// recalLen re-calculate the length
func (b *LinkBuffer) recalLen(delta int) (err error) {
atomic.AddInt32(&b.length, int32(delta))
return nil
func (b *LinkBuffer) recalLen(delta int) (length int) {
return int(atomic.AddInt32(&b.length, int32(delta)))
}

// recalMallocLen re-calculate the malloc length
Expand Down

0 comments on commit a4ab1b1

Please sign in to comment.