Skip to content

Commit

Permalink
优化
Browse files Browse the repository at this point in the history
  • Loading branch information
injoyai committed Jun 7, 2024
1 parent 01e4bc0 commit 4116e77
Show file tree
Hide file tree
Showing 19 changed files with 279 additions and 180 deletions.
7 changes: 1 addition & 6 deletions dial/dial_websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,7 @@ func (this *WebsocketConfig) Dial() (io.ReadWriteCloser, string, error) {
// Websocket 连接
func Websocket(url string, header http.Header) (io.ReadWriteCloser, string, error) {
c, _, err := websocket.DefaultDialer.Dial(url, header)
return &WebsocketClient{Conn: c}, func() string {
if u, err := gourl.Parse(url); err == nil {
return u.Path
}
return url
}(), err
return &WebsocketClient{Conn: c}, url, err
}

func WithWebsocket(url string, header http.Header) io.DialFunc {
Expand Down
2 changes: 1 addition & 1 deletion extend/p2p/p2p_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestNewPeer2(t *testing.T) {
t.Log(err)
return
}
p.SetBeforeFunc(func(c *io.Client) error {
p.SetConnectFunc(func(c *io.Client) error {
logs.Debug(c.GetKey())
return nil
})
Expand Down
6 changes: 3 additions & 3 deletions extend/proxy/proxy_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,9 @@ func (this *Entity) writeMessage(msg *Message) (err error) {
proxyClient.SetDealFunc(func(c *io.Client, m io.Message) {
this.AddMessage(msg.Response(m.Bytes()))
})
proxyClient.SetCloseFunc(func(ctx context.Context, c *io.Client, msg io.Message) {
proxyClient.SetCloseFunc(func(ctx context.Context, c *io.Client, err error) {
this.delIO(c.GetKey())
this.AddMessage(NewCloseMessage(c.GetKey(), msg.String()))
this.AddMessage(NewCloseMessage(c.GetKey(), err.Error()))
})
go proxyClient.Run()
//加入到缓存
Expand Down Expand Up @@ -221,7 +221,7 @@ func WithClientDebug(b ...bool) func(c *io.Client, e *Entity) {

// setIO 添加记录,存在则关闭并覆盖
func (this *Entity) setIO(key string, i *io.Client) {
old := this.ioMap.GetAndSet(key, i)
old, _ := this.ioMap.GetAndSet(key, i)
if val, ok := old.(*io.Client); ok {
val.Close()
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/goburrow/serial v0.1.0
github.com/gorilla/websocket v1.5.0
github.com/injoyai/base v1.0.11
github.com/injoyai/base v1.0.12
github.com/injoyai/conv v1.1.5
github.com/injoyai/logs v1.0.7
go.bug.st/serial v1.5.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/injoyai/base v1.0.11 h1:bKO98VbXyOrvbMIwMmVJ8WIlVTTKyKku7/V8gW9PEa0=
github.com/injoyai/base v1.0.11/go.mod h1:2JSv8ndJ/FmKyazfhDBewzylEbVfJV0EE/r6kOvnzwU=
github.com/injoyai/base v1.0.12 h1:R5QhUPUQAdI32T0vZXn+ssfUvQa093XGwxwDj8Xwrpc=
github.com/injoyai/base v1.0.12/go.mod h1:2JSv8ndJ/FmKyazfhDBewzylEbVfJV0EE/r6kOvnzwU=
github.com/injoyai/conv v1.1.5 h1:PC1639ZRHWFp/bSnan1PeCRePW3ODAFpwOFx6NW0Kic=
github.com/injoyai/conv v1.1.5/go.mod h1:PYoJcbqaz4eyQUovzErRFCuDSPGB4L07AvdmFsTrTew=
github.com/injoyai/logs v1.0.7 h1:pVBB8/rJ/nsnlSvxZoBfCfu+kL84q2pqEx5oN4qPG7I=
Expand Down
137 changes: 101 additions & 36 deletions io_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ func Redial(dial DialFunc, options ...OptionClient) *Client {

// RedialWithContext 一直尝试连接,直到成功,需要输入上下文
func RedialWithContext(ctx context.Context, dial DialFunc, options ...OptionClient) *Client {
ctxParent, cancelParent := context.WithCancel(ctx)
c := &Client{
ctxParent: ctxParent,
cancelParent: cancelParent,
}
c := newClient(ctx)
c.SetDialFunc(dial)
err := c.MustDial(ctx, func(c *Client) {
c.Redial(options...)
Expand All @@ -41,11 +37,7 @@ func NewDial(dial DialFunc, options ...OptionClient) (*Client, error) {

// NewDialWithContext 尝试连接,返回*Client和错误,需要输入上下文
func NewDialWithContext(ctx context.Context, dial DialFunc, options ...OptionClient) (*Client, error) {
ctxParent, cancelParent := context.WithCancel(ctx)
c := &Client{
ctxParent: ctxParent,
cancelParent: cancelParent,
}
c := newClient(ctx)
c.SetDialFunc(dial)
err := c.Dial(options...)
return c, err
Expand All @@ -62,14 +54,20 @@ func NewClientWithContext(ctx context.Context, i ReadWriteCloser, options ...Opt
if c, ok := i.(*Client); ok && c != nil {
return c
}
c := newClient(ctx)
c.reset(i, c.ID(), options...)
return c
}

func newClient(ctx context.Context) *Client {
ctxParent, cancelParent := context.WithCancel(ctx)
c := &Client{
ctxParent: ctxParent,
cancelParent: cancelParent,
CreateTime: time.Now(),
return &Client{
ctxParent: ctxParent,
cancelParent: cancelParent,
CreateTime: time.Now(),
logger: defaultLogger(),
redialMaxTime: time.Second * 32,
}
c.reset(i, c.Pointer(), options...)
return c
}

/*
Expand Down Expand Up @@ -103,26 +101,28 @@ type Client struct {
cancelParent context.CancelFunc //父级上下文,主动关闭时,用于关闭redial

//runtime
Key //自定义标识
Key string //自定义标识
*logger //日志
pointer string //唯一标识,指针地址
i ReadWriteCloser //接口,实例,传入的原始参数
buf *bufio.Reader //buffer
tag *maps.Safe //标签,用于记录连接的一些信息
CreateTime time.Time //创建时间
ReadTime time.Time //最后读取到数据的时间
ReadCount uint64 //读取的字节数量
WriteTime time.Time //最后写入数据时间
WriteCount uint64 //写入的字节数量
WriteNumber uint64 //写入的次数
CreateTime time.Time //创建时间,对象创建时间,重连不会改变
DialTime time.Time //连接时间,每次重连会改变
ReadTime time.Time //本次连接,最后读取到数据的时间
ReadCount uint64 //本次连接,读取的字节数量
WriteTime time.Time //本次连接,最后写入数据时间
WriteCount uint64 //本次连接,写入的字节数量
WriteNumber uint64 //本次连接,写入的次数

//连接成功事件,可以手动进行数据的读写,或者关闭,返回错误会关闭连接
//如果设置了重连,则会再次建立连接而触发连接事件
//所以固定返回错误的话,会陷入无限连接断开的情况,
connectFunc []func(c *Client) error

//连接断开事件,连接断开的时候触发,可以调用Dial方法进行重连操作
closeFunc []func(ctx context.Context, c *Client, err error)
//怕用户设置多个redial,重连之后越连越多,固只设置一个函数
closeFunc func(ctx context.Context, c *Client, err error)

//从流中读取数据
readFunc func(buf *bufio.Reader) (Acker, error)
Expand All @@ -132,6 +132,12 @@ type Client struct {

//写入数据事件,可以进行封装或者打印等操作
writeFunc []func(p []byte) ([]byte, error)

//当写入结束时出发
writeResultFunc []func(c *Client, err error)

//当key变化时触发
keyChangeFunc []func(c *Client, oldKey string)
}

//================================Nature================================
Expand All @@ -140,16 +146,62 @@ func (this *Client) reset(i ReadWriteCloser, key string, options ...OptionClient
if v, ok := i.(*Client); ok {
this.reset(v.i, key, options...)
}
this.ctx, this.cancel = context.WithCancel(this.ctxParent)

this.latestChan = make(chan Message)
this.writeQueueOnce = sync.Once{}
this.writeQueue = nil

this.redialMaxTime = time.Second * 32
this.redialMaxNum = 0
this.dealFunc = nil

this.timeout = 0
this.timeoutReset = make(chan struct{})
this.logger = defaultLogger()
this.running = 0
this.closed = 0
//错误初始化,初始化会执行用户option,
//例如用户在option中设置了Close
//初始化之后会判断错误信息
//所以这里得初始化错误
this.closeErr = nil
this.ctx, this.cancel = context.WithCancel(this.ctxParent)
//父级上下文保留
//this.ctxParent = this.ctxParent
//this.cancelParent= this.cancelParent

this.Key = key
//在外部声明,连接失败的时候需要打印日志,还没到初始化这一步
this.logger = defaultLogger()
//还是原来的对象,使用的原先的指针
//this.pointer=this.pointer
this.i = i
this.tag = nil
//this.closeErr = nil
//buf在下面的处理
//this.buf
this.tag = nil //是否保留tag信息,重新设置就能覆盖
//使用的是第一次连接的时间
//this.CreateTime=this.CreateTime
//当连接成功的时候会进行重置操作
this.DialTime = time.Now()
this.ReadTime = time.Time{}
this.ReadCount = 0
this.WriteTime = time.Time{}
this.WriteCount = 0
this.WriteNumber = 0

//初始化事件函数
this.connectFunc = nil
this.closeFunc = nil
this.readFunc = nil
this.dealFunc = nil
this.writeFunc = nil
this.writeResultFunc = nil
this.keyChangeFunc = nil

/*
*/

defaultReadFunc := ReadFuncToAck(buf.Read1KB)
switch v := i.(type) {
Expand All @@ -170,34 +222,47 @@ func (this *Client) reset(i ReadWriteCloser, key string, options ...OptionClient
this.buf = bufio.NewReaderSize(i, DefaultBufferSize+1)

//设置默认的Option
this.SetKey(key) //唯一标识
this.Debug() //打印日志
this.SetKey(key) //设置唯一标识
this.Debug() //设置打印日志

//设置默认事件
this.SetConnectWithNil().SetConnectWithLog()
this.SetReadAckFunc(defaultReadFunc)
this.SetDealWithNil().SetDealWithDefault()
this.SetWriteWithNil().SetWriteWithLog()
this.SetCloseWithNil().SetCloseWithLog()
this.SetCloseWithLog()

//设置用户的Option
this.SetOptions(options...)

return this
}

// GetKey 获取标识
func (this *Client) GetKey() string {
return this.Key
}

// SetKey 设置标识
func (this *Client) SetKey(key string) *Client {
if key == this.Key {
return this
}
oldKey := this.Key
this.Key = key
for _, v := range this.keyChangeFunc {
v(this, oldKey)
}
return this
}

// ReadWriteCloser 读写接口,实例,传入的原始参数
func (this *Client) ReadWriteCloser() io.ReadWriteCloser {
return this.i
}

// ID 获取唯一标识,不可改,溯源,不用因为改了key而出现数据错误的bug
func (this *Client) ID() string {
return this.Pointer()
}

// Pointer 获取指针地址
func (this *Client) Pointer() string {
if this.pointer == "" {
pointer := fmt.Sprintf("%p", this)
this.pointer = pointer
Expand Down
23 changes: 17 additions & 6 deletions io_client_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,8 @@ func (this *Client) closeWithErr(closeErr error, fn ...func(Closer) error) (err
close(this.writeQueue)
}
//关闭实例,可自定义关闭方式,例如设置超时
if len(fn) == 0 {
if this.i != nil {
err = this.i.Close()
}
if len(fn) == 0 && this.i != nil {
err = this.i.Close()
} else {
for _, v := range fn {
err = v(this.i)
Expand All @@ -230,11 +228,13 @@ func (this *Client) closeWithErr(closeErr error, fn ...func(Closer) error) (err
//msg := Message(this.closeErr.Error())
////打印错误信息
//this.logger.Errorf("[%s] %s\n", this.GetKey(), msg.String())
this.logger.Errorf("[%s] 断开连接: %v\n", this.GetKey(), this.closeErr)

//执行用户设置的错误函数,需要最后执行,防止后续操作无法执行,如果设置了重连不会执行到下一步
for _, f := range this.closeFunc {
f(this.CtxAll(), this, this.closeErr)
if this.closeFunc != nil {
this.closeFunc(this.CtxAll(), this, this.closeErr)
}

////执行用户设置的错误函数
//if this.closeFunc != nil {
// //需要最后执行,防止后续操作无法执行,如果设置了重连不会执行到下一步
Expand Down Expand Up @@ -276,6 +276,7 @@ func (this *Client) MustDial(ctx context.Context, options ...OptionClient) error
if t > this.redialMaxTime {
t = this.redialMaxTime
}

this.Logger.Errorf("[%s] %v,等待%d秒重试\n", this.GetKey(), dealErr(err), t/time.Second)
timer.Reset(t)
}
Expand All @@ -301,12 +302,22 @@ func (this *Client) Dial(options ...OptionClient) error {
//尝试进行连接,返回ReadWriteCloser和唯一标识key
i, key, err := this.dialFunc(this.ctx)
if err != nil {
if len(key) > 0 {
//尝试设置key,如果错误也返回key的话
this.SetKey(key)
}
return err
}

//数据初始化操作,声明内存等操作
this.reset(i, key, options...)

//判断初始化操作是否出现错误,出现错误则返回错误
//例如开始连接成功,后来失败了,或者option中关闭了连接
if this.Err() != nil {
return this.Err()
}

//连接成功事件
for _, f := range this.connectFunc {
if err := this.CloseWithErr(f(this)); err != nil {
Expand Down
Loading

0 comments on commit 4116e77

Please sign in to comment.