Skip to content

Commit

Permalink
feat: new conn api - Until (cloudwego#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
joway authored Jan 17, 2022
1 parent 3003ba0 commit 8ab6184
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 0 deletions.
20 changes: 20 additions & 0 deletions connection_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,26 @@ func (c *connection) Len() (length int) {
return c.inputBuffer.Len()
}

// Until implements Connection.
func (c *connection) Until(delim byte) (line []byte, err error) {
var n, l int
for {
if err = c.waitRead(n + 1); err != nil {
// return all the data in the buffer
line, _ = c.inputBuffer.Next(c.inputBuffer.Len())
return
}

l = c.inputBuffer.Len()
i := c.inputBuffer.indexByte(delim, n)
if i < 0 {
n = l //skip all exists bytes
continue
}
return c.Next(i + 1)
}
}

// ReadString implements Connection.
func (c *connection) ReadString(n int) (s string, err error) {
if err = c.waitRead(n); err != nil {
Expand Down
32 changes: 32 additions & 0 deletions connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package netpoll

import (
"context"
"errors"
"fmt"
"math/rand"
"runtime"
Expand Down Expand Up @@ -280,6 +281,37 @@ func TestSetTCPNoDelay(t *testing.T) {
MustTrue(t, n == 0)
}

func TestConnectionUntil(t *testing.T) {
r, w := GetSysFdPairs()
rconn, wconn := &connection{}, &connection{}
rconn.init(&netFD{fd: r}, nil)
wconn.init(&netFD{fd: w}, nil)
loopSize := 100000

msg := make([]byte, 1002)
msg[500], msg[1001] = '\n', '\n'
go func() {
for i := 0; i < loopSize; i++ {
n, err := wconn.Write(msg)
MustNil(t, err)
MustTrue(t, n == len(msg))
}
wconn.Write(msg[:100])
wconn.Close()
}()

for i := 0; i < loopSize*2; i++ {
buf, err := rconn.Reader().Until('\n')
MustNil(t, err)
Equal(t, len(buf), 501)
rconn.Reader().Release()
}

buf, err := rconn.Reader().Until('\n')
Equal(t, len(buf), 100)
MustTrue(t, errors.Is(err, ErrEOF))
}

func TestBookSizeLargerThanMaxSize(t *testing.T) {
r, w := GetSysFdPairs()
var rconn, wconn = &connection{}, &connection{}
Expand Down
7 changes: 7 additions & 0 deletions nocopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ type Reader interface {
// a faster implementation of Next when the next data is not used.
Skip(n int) (err error)

// Until reads until the first occurrence of delim in the input,
// returning a slice stops with delim in the input buffer.
// If Until encounters an error before finding a delimiter,
// it returns all the data in the buffer and the error itself (often ErrEOF or ErrConnClosed).
// Until returns err != nil only if line does not end in delim.
Until(delim byte) (line []byte, err error)

// ReadString is a faster implementation of Next when a string needs to be returned.
// It replaces:
//
Expand Down
42 changes: 42 additions & 0 deletions nocopy_linkbuffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package netpoll

import (
"bytes"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -260,6 +261,15 @@ func (b *LinkBuffer) ReadByte() (p byte, err error) {
}
}

// Until returns a slice ends with the delim in the buffer.
func (b *LinkBuffer) Until(delim byte) (line []byte, err error) {
n := b.indexByte(delim, 0)
if n < 0 {
return nil, fmt.Errorf("link buffer read slice cannot find: '%b'", delim)
}
return b.Next(n + 1)
}

// Slice returns a new LinkBuffer, which is a zero-copy slice of this LinkBuffer,
// and only holds the ability of Reader.
//
Expand Down Expand Up @@ -599,6 +609,38 @@ func (b *LinkBuffer) calcMaxSize() (sum int) {
return sum
}

// indexByte returns the index of the first instance of c in buffer, or -1 if c is not present in buffer.
func (b *LinkBuffer) indexByte(c byte, skip int) int {
size := b.Len()
if skip >= size {
return -1
}
var unread, n, l int
node := b.read
for unread = size; unread > 0; unread -= n {
l = node.Len()
if l >= unread { // last node
n = unread
} else { // read full node
n = l
}

// skip current node
if skip >= n {
skip -= n
node = node.next
continue
}
i := bytes.IndexByte(node.Peek(n)[skip:], c)
if i >= 0 {
return (size - unread) + skip + i // past_read + skip_read + index
}
skip = 0 // no skip bytes
node = node.next
}
return -1
}

// resetTail will reset tail node or add an empty tail node to
// guarantee the tail node is not larger than 8KB
func (b *LinkBuffer) resetTail(maxSize int) {
Expand Down
45 changes: 45 additions & 0 deletions nocopy_linkbuffer_race.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package netpoll

import (
"bytes"
"errors"
"fmt"
"reflect"
Expand Down Expand Up @@ -182,6 +183,17 @@ func (b *LinkBuffer) Skip(n int) (err error) {
return nil
}

// Until returns a slice ends with the delim in the buffer.
func (b *LinkBuffer) Until(delim byte) (line []byte, err error) {
b.Lock()
defer b.Unlock()
n := b.indexByte(delim, 0)
if n < 0 {
return nil, fmt.Errorf("link buffer cannot find delim: '%b'", delim)
}
return b.Next(n + 1)
}

// Release the node that has been read.
// b.flush == nil indicates that this LinkBuffer is created by LinkBuffer.Slice
func (b *LinkBuffer) Release() (err error) {
Expand Down Expand Up @@ -645,6 +657,39 @@ func (b *LinkBuffer) calcMaxSize() (sum int) {
return sum
}

func (b *LinkBuffer) indexByte(c byte, skip int) int {
b.Lock()
defer b.Unlock()
size := b.Len()
if skip >= size {
return -1
}
var unread, n, l int
node := b.read
for unread = size; unread > 0; unread -= n {
l = node.Len()
if l >= unread { // last node
n = unread
} else { // read full node
n = l
}

// skip current node
if skip >= n {
skip -= n
node = node.next
continue
}
i := bytes.IndexByte(node.Peek(n)[skip:], c)
if i >= 0 {
return (size - unread) + skip + i // past_read + skip_read + index
}
skip = 0 // no skip bytes
node = node.next
}
return -1
}

// resetTail will reset tail node or add an empty tail node to
// guarantee the tail node is not larger than 8KB
func (b *LinkBuffer) resetTail(maxSize int) {
Expand Down
30 changes: 30 additions & 0 deletions nocopy_linkbuffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,36 @@ func TestUnsafeStringToSlice(t *testing.T) {
Equal(t, string(bs), "hello world")
}

func TestLinkBufferIndexByte(t *testing.T) {
// clean & new
LinkBufferCap = 128
loopSize := 1000
trigger := make(chan struct{}, 16)

lb := NewLinkBuffer()
go func() {
for i := 0; i < loopSize; i++ {
buf, err := lb.Malloc(1002)
buf[500] = '\n'
buf[1001] = '\n'
MustNil(t, err)
lb.Flush()
trigger <- struct{}{}
}
}()

for i := 0; i < loopSize; i++ {
<-trigger
last := i * 1002
n := lb.indexByte('\n', 0+last)
Equal(t, n, 500+last)
n = lb.indexByte('\n', 500+last)
Equal(t, n, 500+last)
n = lb.indexByte('\n', 501+last)
Equal(t, n, 1001+last)
}
}

func BenchmarkStringToSliceByte(b *testing.B) {
b.StopTimer()
s := "hello world"
Expand Down
4 changes: 4 additions & 0 deletions nocopy_readwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (r *zcReader) ReadByte() (b byte, err error) {
return r.buf.ReadByte()
}

func (r *zcReader) Until(delim byte) (line []byte, err error) {
return r.buf.Until(delim)
}

func (r *zcReader) waitRead(n int) (err error) {
for r.buf.Len() < n {
err = r.fill(n)
Expand Down

0 comments on commit 8ab6184

Please sign in to comment.