Skip to content

Commit

Permalink
优化timeout和keepAlive
Browse files Browse the repository at this point in the history
  • Loading branch information
injoyai committed Feb 19, 2023
1 parent 38c077b commit ccae0a1
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 106 deletions.
2 changes: 1 addition & 1 deletion buf/buf_message_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (this *messageReader) ReadMessage() ([]byte, error) {
return this.readFunc(this.buf)
}

func NewMessageReader(reader io.Reader, fn ReadFunc) *messageReader {
func NewMessageReader(reader io.Reader, fn ReadFunc) MessageReader {
m := &messageReader{buf: bufio.NewReader(reader)}
m.SetReadFunc(fn)
return m
Expand Down
12 changes: 1 addition & 11 deletions buf/buf_read_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package buf

import (
"bufio"
"fmt"
"io"
"time"
)
Expand All @@ -27,16 +26,6 @@ func ReadWithAll(buf *bufio.Reader) (bytes []byte, err error) {
}
}

// ReadWithAllSafe 安全读取, todo 好像bufio.Read出现过一次数组越界 , 待确认
func ReadWithAllSafe(buf *bufio.Reader) (bytes []byte, err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("%v", e)
}
}()
return ReadWithAll(buf)
}

// ReadWithLine 读取一行
func ReadWithLine(buf *bufio.Reader) (bytes []byte, err error) {
bytes, _, err = buf.ReadLine()
Expand Down Expand Up @@ -75,6 +64,7 @@ func NewReadWithTimeout(timeout time.Duration) ReadFunc {
return f.ReadMessage
}

// NewReadWithFrame 根据Frame配置读取数据
func NewReadWithFrame(f *Frame) ReadFunc {
return f.ReadMessage
}
91 changes: 38 additions & 53 deletions io_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,8 @@ func NewClientWithContext(ctx context.Context, i ReadWriteCloser) *Client {
IWriter: NewWriter(i),
i: i,
tag: maps.NewSafe(),
timerKeep: time.NewTimer(0),
timer: time.NewTimer(0),
timeout: 0,
}

c.SetKey(fmt.Sprintf("%p", i))

<-c.timer.C
<-c.timerKeep.C

return c
}

Expand All @@ -79,13 +71,10 @@ type Client struct {
*IReadCloser
*IWriter

i ReadWriteCloser //接口
tag *maps.Safe //标签
timer *time.Timer //超时定时器,时间范围内没有发送数据或者接收数据,则断开链接
timeout time.Duration //超时时间
timerKeep *time.Timer //正常通讯不发送心跳
keepAlive time.Duration //保持连接
createTime time.Time //创建时间,链接成功时间
i ReadWriteCloser //接口
tag *maps.Safe //标签
keepAliveCancel context.CancelFunc //keep上下文
createTime time.Time //创建时间,链接成功时间
}

//================================Nature================================
Expand All @@ -111,7 +100,12 @@ func (this *Client) Tag() *maps.Safe {
}

// GetTag 获取一个tag
func (this *Client) GetTag(key interface{}) interface{} {
func (this *Client) GetTag(key interface{}) (interface{}, bool) {
return this.tag.Get(key)
}

// MustGetTag 获取一个tag
func (this *Client) MustGetTag(key interface{}) interface{} {
return this.tag.MustGet(key)
}

Expand All @@ -129,7 +123,7 @@ func (this *Client) SetKey(key string) *Client {

// GetKey 获取唯一标识
func (this *Client) GetKey() string {
return this.IReadCloser.key
return this.IReadCloser.GetKey()
}

// Debug 调试模式,打印日志
Expand Down Expand Up @@ -188,19 +182,37 @@ func (this *Client) GoFor(interval time.Duration, fn func(c *Client) error) {
}(this.ParentCtx(), this)
}

//================================SetFunc================================

// SetTimeout 设置超时时间
func (this *Client) SetTimeout(timeout time.Duration) *Client {
this.timeout = timeout
if timeout <= 0 {
this.timer.Stop()
} else {
this.timer.Reset(timeout)
// SetKeepAlive 设置连接保持,另外起了携程,服务器不需要,客户端再起一个也没啥问题
// TCP keepalive定义于RFC 1122,但并不是TCP规范中的一部分,默认必需是关闭,连接方不一定支持
func (this *Client) SetKeepAlive(t time.Duration, keeps ...[]byte) *Client {
if t > 0 {
if this.keepAliveCancel != nil {
//关闭老的keepAlive
this.keepAliveCancel()
}
ctx, cancel := context.WithCancel(this.Ctx())
this.keepAliveCancel = cancel
keep := conv.GetDefaultBytes([]byte(Ping), keeps...)
go func(ctx context.Context) {
timer := time.NewTimer(t)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-timer.C:
if _, err := this.Write(keep); err != nil {
return
}
}
}
}(ctx)
}
return this
}

//================================SetFunc================================

// SetDealFunc 设置处理数据函数
func (this *Client) SetDealFunc(fn func(msg *IMessage)) {
this.IReadCloser.SetDealFunc(func(msg Message) {
Expand Down Expand Up @@ -232,33 +244,6 @@ func (this *Client) SetPrintWithASCII() {
this.SetPrintFunc(PrintWithASCII)
}

// SetKeepAlive 设置连接保持,另外起了携程,服务器不需要,客户端再起一个也没啥问题
// TCP keepalive定义于RFC 1122,但并不是TCP规范中的一部分,默认必需是关闭,连接方不一定支持
func (this *Client) SetKeepAlive(t time.Duration, keeps ...[]byte) *Client {
keep := conv.GetDefaultBytes([]byte(Ping), keeps...)
old := this.keepAlive
this.keepAlive = t
if old == 0 && this.keepAlive > 0 {
this.timerKeep.Reset(this.keepAlive)
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-this.timerKeep.C:
if this.keepAlive <= 0 {
return
}
if _, err := this.Write(keep); err != nil {
return
}
}
}
}(this.Ctx())
}
return this
}

// SetReadWriteWithStartEnd 设置读取写入数据根据包头包尾
func (this *Client) SetReadWriteWithStartEnd(packageStart, packageEnd []byte) *Client {
this.IWriter.SetWriteWithStartEnd(packageStart, packageEnd)
Expand Down
2 changes: 1 addition & 1 deletion io_i_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (this *ICloser) ParentCtx() context.Context {
return this.ctxParent
}

// Ctx 上下文
// Ctx 子级上下文
func (this *ICloser) Ctx() context.Context {
return this.ctx
}
Expand Down
101 changes: 67 additions & 34 deletions io_i_read_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync/atomic"
"time"
)

func NewIReadCloser(readCloser ReadCloser) *IReadCloser {
Expand All @@ -24,52 +25,68 @@ func NewIReadCloserWithContext(ctx context.Context, readCloser ReadCloser) *IRea
type IReadCloser struct {
*IReader
*ICloser
dealFunc DealFunc //处理数据函数
running uint32 //是否在运行
dealFunc DealFunc //处理数据函数
running uint32 //是否在运行
timeout time.Duration //超时时间
}

//================================Nature================================

// GetKey 获取唯一标识
func (this *IReadCloser) GetKey() string {
return this.IReader.GetKey()
}

// SetKey 设置唯一标识
func (this *IReadCloser) SetKey(key string) *IReadCloser {
this.IPrinter.SetKey(key)
this.IReader.SetKey(key)
this.ICloser.SetKey(key)
return this
}

// SetPrintFunc 设置打印函数
func (this *IReadCloser) SetPrintFunc(fn PrintFunc) *IReadCloser {
this.IPrinter.SetPrintFunc(fn)
this.IReader.SetPrintFunc(fn)
//错误信息按ASCII编码
return this
}

// Debug debug模式
func (this *IReadCloser) Debug(b ...bool) *IReadCloser {
this.IPrinter.Debug(b...)
this.IReader.Debug(b...)
this.ICloser.Debug(b...)
return this
}

// SetTimeout 设置超时时间
func (this *IReadCloser) SetTimeout(timeout time.Duration) *IReadCloser {
this.timeout = timeout
return this
}

//================================DealFunc================================

// SetDealFunc 设置数据处理函数
func (this *IReadCloser) SetDealFunc(fn func(msg Message)) {
func (this *IReadCloser) SetDealFunc(fn func(msg Message)) *IReadCloser {
this.dealFunc = fn
return this
}

// SetDealWithNil 不设置数据处理函数
func (this *IReadCloser) SetDealWithNil() {
this.SetDealFunc(nil)
func (this *IReadCloser) SetDealWithNil() *IReadCloser {
return this.SetDealFunc(nil)
}

// SetDealWithWriter 设置数据处理到io.Writer
func (this *IReadCloser) SetDealWithWriter(writer Writer) {
this.SetDealFunc(func(msg Message) {
func (this *IReadCloser) SetDealWithWriter(writer Writer) *IReadCloser {
return this.SetDealFunc(func(msg Message) {
writer.Write(msg)
})
}

// SetDealWithChan 设置数据处理到chan
func (this *IReadCloser) SetDealWithChan(c chan Message) {
this.SetDealFunc(func(msg Message) {
func (this *IReadCloser) SetDealWithChan(c chan Message) *IReadCloser {
return this.SetDealFunc(func(msg Message) {
c <- msg
})
}
Expand All @@ -82,32 +99,48 @@ func (this *IReadCloser) Running() bool {
}

func (this *IReadCloser) Run() error {

if atomic.SwapUint32(&this.running, 1) == 1 {
return nil
}

timer := time.NewTimer(0)
<-timer.C
for {
select {
case <-this.Done():
return this.Err()
default:
_ = this.CloseWithErr(func() (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("%v", e)
}
}()
bytes, err := this.ReadMessage()
if err != nil || len(bytes) == 0 {
return err
}
//打印日志
this.IPrinter.Print(bytes, TagRead, this.GetKey())
//处理数据
if this.dealFunc != nil {
this.dealFunc(bytes)
}
return
}())
if this.timeout <= 0 {
select {
case <-this.Done():
return this.Err()
default:
_ = this.CloseWithErr(this.run())
}
} else {
select {
case <-this.Done():
return this.Err()
case <-timer.C:
return ErrWithReadTimeout
default:
_ = this.CloseWithErr(this.run())
}
}
}
}

func (this *IReadCloser) run() (err error) {
defer func() {
if e := recover(); e != nil {
err = fmt.Errorf("%v", e)
}
}()
//读取数据
bytes, err := this.ReadMessage()
if err != nil || len(bytes) == 0 {
return err
}
//处理数据
if this.dealFunc != nil {
this.dealFunc(bytes)
}
return
}
13 changes: 7 additions & 6 deletions io_i_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@ package io

import (
"bufio"
"fmt"
"github.com/injoyai/io/buf"
"io"
"time"
)

func NewIReader(r Reader) *IReader {
i := &IReader{lastChan: make(chan Message)}
i := &IReader{
IPrinter: NewIPrinter(fmt.Sprint(r)),
lastChan: make(chan Message)}
if v, ok := r.(MessageReader); ok {
i.MessageReader = v
} else {
Expand All @@ -19,6 +22,7 @@ func NewIReader(r Reader) *IReader {
}

type IReader struct {
*IPrinter
MessageReader
buf *bufio.Reader //buffer
readFunc func(*bufio.Reader) ([]byte, error) //读取函数
Expand Down Expand Up @@ -117,16 +121,13 @@ func (this *IReader) SetReadFunc(fn buf.ReadFunc) {
case this.lastChan <- bs:
default:
}
//打印日志
this.IPrinter.Print(bs, TagRead, this.GetKey())
}
return bs, nil
}
}

//// SetReadWithNil 设置读取函数为nil
//func (this *IReader) SetReadWithNil() {
// this.SetReadFunc(nil)
//}

// SetReadWithAll 一次性全部读取
func (this *IReader) SetReadWithAll() {
this.SetReadFunc(buf.ReadWithAll)
Expand Down

0 comments on commit ccae0a1

Please sign in to comment.