-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio_client.go
368 lines (319 loc) · 10.3 KB
/
io_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
package io
import (
"context"
"fmt"
"github.com/injoyai/base/maps"
"github.com/injoyai/conv"
"io"
"time"
)
// Redial is the context-free variant of RedialWithContext: it supplies
// context.Background() and passes dial and options through unchanged.
// Per the original note, it keeps trying to connect until it succeeds.
func Redial(dial DialFunc, options ...OptionClient) *Client {
	ctx := context.Background()
	return RedialWithContext(ctx, dial, options...)
}
// RedialWithContext obtains a connection through a temporary closer's MustDial
// and wraps it in a *Client bound to ctx (per the original note: keep trying
// until the connection succeeds).
//
// The new client receives the key returned by MustDial and dial as its redial
// function; Redial(options...) is then called on it, which installs the
// reconnect hook, applies the options and starts Run in a goroutine.
func RedialWithContext(ctx context.Context, dial DialFunc, options ...OptionClient) *Client {
	// The closer is created without an underlying connection; it is only used for dialing.
	dialer := NewICloserWithContext(ctx, nil)
	dialer.Logger.Debug()
	dialer.SetRedialFunc(dial)
	dialer.SetKey(conv.String(dial))
	conn, key := dialer.MustDial(ctx)

	setup := func(cli *Client) {
		cli.SetKey(key)
		cli.SetRedialFunc(dial)
		cli.Redial(options...)
		// Logged only after the user options were applied, so they can control the output.
		cli.Logger.Infof("[%s] 连接服务端成功...", cli.GetKey())
	}
	return NewClientWithContext(ctx, conn, setup)
}
// NewDial is the context-free variant of NewDialWithContext: it supplies
// context.Background() and returns the resulting *Client and error.
func NewDial(dial DialFunc, options ...OptionClient) (*Client, error) {
	ctx := context.Background()
	return NewDialWithContext(ctx, dial, options...)
}
// NewDialWithContext calls dial exactly once. A dial error is returned as-is
// together with a nil client. Otherwise the connection is wrapped in a *Client
// bound to ctx, which receives the key returned by dial, dial as its redial
// function, and the caller's options.
func NewDialWithContext(ctx context.Context, dial DialFunc, options ...OptionClient) (*Client, error) {
	conn, key, err := dial()
	if err != nil {
		return nil, err
	}

	setup := func(cli *Client) {
		cli.SetKey(key)
		cli.SetRedialFunc(dial)
		cli.SetOptions(options...)
		// Logged only after the user options were applied, so they can control the output.
		cli.Logger.Infof("[%s] 连接服务端成功...", cli.GetKey())
	}
	return NewClientWithContext(ctx, conn, setup), nil
}
// NewClient wraps i in a *Client using context.Background().
// Original note: it accepts the standard io.ReadWriteCloser and, implicitly,
// a MessageReadWriteCloser; to be turned into a generic after Go 1.18.
func NewClient(i ReadWriteCloser, options ...OptionClient) *Client {
	ctx := context.Background()
	return NewClientWithContext(ctx, i, options...)
}
// NewClientWithContext wraps i in a *Client whose reader is bound to ctx.
//
// If i already is a non-nil *Client it is returned unchanged; note that in
// this case neither ctx nor options are applied to it.
// Otherwise a new client is built, its key defaults to the "%p" formatting of
// i, Debug() is called on it, and finally the options are applied in order.
func NewClientWithContext(ctx context.Context, i ReadWriteCloser, options ...OptionClient) *Client {
	if existing, isClient := i.(*Client); isClient && existing != nil {
		return existing
	}

	// Key and tag are left at their zero values; Tag() creates the tag map on first use.
	cli := &Client{
		IReadCloser: NewIReadCloserWithContext(ctx, i),
		IWriter:     NewIWriter(i),
		i:           i,
		createTime:  time.Now(),
	}
	defaultKey := fmt.Sprintf("%p", i)
	cli.SetKey(defaultKey)
	cli.Debug()
	cli.SetOptions(options...)
	return cli
}
// Client is a general-purpose IO client.
// Per the original notes: the various settings take effect once Run is
// executed, and a Client can also be used as a plain io.ReadWriteCloser
// (in which case Run is not called).
//
// It embeds Key, *IReadCloser and *IWriter, so their fields and methods are
// promoted onto Client.
type Client struct {
	Key
	*IReadCloser
	*IWriter
	i          ReadWriteCloser // the original value handed to the constructor
	tag        *maps.Safe      // tags for recording information about the connection; created lazily by Tag()
	createTime time.Time       // creation time, set by NewClientWithContext
}
//================================Nature================================
// ReadLastTime reports the reader's LastTime (original note: time of the last read).
func (this *Client) ReadLastTime() time.Time {
	last := this.IReadCloser.IReader.LastTime()
	return last
}
// WriteLastTime reports the writer's LastTime (original note: time of the last write).
func (this *Client) WriteLastTime() time.Time {
	writer := this.IWriter
	return writer.LastTime()
}
// ReadBytesCount reports the reader's BytesCount (original note: number of bytes read).
func (this *Client) ReadBytesCount() int64 {
	count := this.IReadCloser.IReader.BytesCount()
	return count
}
// WriteBytesCount reports the writer's BytesCount (original note: number of bytes written).
func (this *Client) WriteBytesCount() int64 {
	writer := this.IWriter
	return writer.BytesCount()
}
// ReadWriteCloser returns the original value that was passed in when the
// client was constructed.
func (this *Client) ReadWriteCloser() io.ReadWriteCloser {
	var raw io.ReadWriteCloser = this.i
	return raw
}
// Pointer returns the "%p" formatting of the value returned by ReadWriteCloser
// (original note: the pointer address).
func (this *Client) Pointer() string {
	raw := this.ReadWriteCloser()
	return fmt.Sprintf("%p", raw)
}
// CreateTime returns the time at which the client was created.
func (this *Client) CreateTime() time.Time {
	created := this.createTime
	return created
}
// Tag returns the client's tag map, creating it on first use.
// It holds custom information about the connection,
// e.g. c.Tag().GetString("imei").
func (this *Client) Tag() *maps.Safe {
	if this.tag != nil {
		return this.tag
	}
	this.tag = maps.NewSafe()
	return this.tag
}
// SetKey sets the unique identifier on the writer first and then on the
// read-closer, and returns the client for chaining.
func (this *Client) SetKey(key string) *Client {
	writer := this.IWriter
	writer.SetKey(key)
	reader := this.IReadCloser
	reader.SetKey(key)
	return this
}
// GetKey returns the unique identifier as stored on the read-closer.
func (this *Client) GetKey() string {
	reader := this.IReadCloser
	return reader.GetKey()
}
// WriteQueue sends p on the client's write queue and returns the client.
// The queue is stored in the tag map under writeQueueKey and is created via
// IWriter.NewWriteQueue on first use. The send blocks until the queue
// channel accepts p.
func (this *Client) WriteQueue(p []byte) *Client {
	newQueue := func() (interface{}, error) {
		return this.IWriter.NewWriteQueue(this.Ctx()), nil
	}
	// The error result is ignored; the handler here always returns a nil error.
	stored, _ := this.Tag().GetOrSetByHandler(writeQueueKey, newQueue)
	ch := stored.(chan []byte)
	ch <- p
	return this
}
// TryWriteQueue attempts a non-blocking send of p on the client's write queue
// and returns the client. If the queue channel cannot accept p right away,
// p is dropped. The queue is stored in the tag map under writeQueueKey and is
// created via IWriter.NewWriteQueue on first use.
func (this *Client) TryWriteQueue(p []byte) *Client {
	newQueue := func() (interface{}, error) {
		return this.IWriter.NewWriteQueue(this.Ctx()), nil
	}
	// The error result is ignored; the handler here always returns a nil error.
	stored, _ := this.Tag().GetOrSetByHandler(writeQueueKey, newQueue)
	ch := stored.(chan []byte)
	select {
	case ch <- p:
	default:
		// Channel not ready: discard p.
	}
	return this
}
// WriteReadWithTimeout writes request and then returns the result of
// ReadLast(timeout) (original note: synchronous write-then-read with timeout).
// If the write fails, its error is returned with a nil response and no read
// is attempted.
func (this *Client) WriteReadWithTimeout(request []byte, timeout time.Duration) (response []byte, err error) {
	_, err = this.Write(request)
	if err != nil {
		return nil, err
	}
	return this.ReadLast(timeout)
}
// WriteRead is WriteReadWithTimeout with an optional timeout; the effective
// timeout is chosen by conv.GetDefaultDuration from DefaultResponseTimeout
// and the timeout arguments.
func (this *Client) WriteRead(request []byte, timeout ...time.Duration) (response []byte, err error) {
	wait := conv.GetDefaultDuration(DefaultResponseTimeout, timeout...)
	return this.WriteReadWithTimeout(request, wait)
}
// Ping writes the Ping payload via WriteRead and returns only the error; the
// response bytes are discarded. The effective timeout is chosen by
// conv.DefaultDuration from time.Second and the timeout arguments.
func (this *Client) Ping(timeout ...time.Duration) error {
	wait := conv.DefaultDuration(time.Second, timeout...)
	if _, err := this.WriteRead([]byte(Ping), wait); err != nil {
		return err
	}
	return nil
}
// GoTimerWriter starts a goroutine that runs the closer's Timer with the
// given interval; each tick invokes write with the client's writer.
// Original note: the goroutine's lifetime is a single connection, i.e. it
// ends when that connection is dropped.
func (this *Client) GoTimerWriter(interval time.Duration, write func(w *IWriter) error) {
	onTick := func() error {
		return write(this.IWriter)
	}
	go this.IReadCloser.ICloser.Timer(interval, onTick)
}
// GoTimerWriteBytes uses GoTimerWriter to write the bytes p on every tick.
func (this *Client) GoTimerWriteBytes(interval time.Duration, p []byte) {
	writeBytes := func(w *IWriter) error {
		_, err := w.Write(p)
		return err
	}
	this.GoTimerWriter(interval, writeBytes)
}
// GoTimerWriteASCII uses GoTimerWriter to write the string s via WriteASCII
// on every tick.
func (this *Client) GoTimerWriteASCII(interval time.Duration, s string) {
	writeText := func(w *IWriter) error {
		_, err := w.WriteASCII(s)
		return err
	}
	this.GoTimerWriter(interval, writeText)
}
// GoAfter starts a goroutine that calls the closer's After(after, fn)
// (original note: run fn after a delay).
func (this *Client) GoAfter(after time.Duration, fn func()) {
	go this.IReadCloser.ICloser.After(after, fn)
}
// SetKeepAlive periodically writes a keep-alive payload every t by means of
// GoTimerWriter. The payload is chosen by conv.GetDefaultBytes from the Ping
// payload and keeps.
//
// Original notes: this starts an additional goroutine; a server does not need
// it, and a client starting one more does no harm. TCP keepalive is defined
// in RFC 1122 but is not part of the TCP specification itself; it must be off
// by default and the peer does not necessarily support it.
func (this *Client) SetKeepAlive(t time.Duration, keeps ...[]byte) {
	sendKeep := func(w *IWriter) error {
		payload := conv.GetDefaultBytes([]byte(Ping), keeps...)
		_, err := w.Write(payload)
		return err
	}
	this.GoTimerWriter(t, sendKeep)
}
//================================Logger================================
// Debug forwards b to the writer's logger and to the read-closer
// (original note: debug mode, print logs).
// It has no return value so that Client satisfies the Debugger interface.
func (this *Client) Debug(b ...bool) {
	writer := this.IWriter
	writer.Logger.Debug(b...)
	reader := this.IReadCloser
	reader.Debug(b...)
}
// SetLogger wraps logger with newLogger and assigns the same wrapped value to
// the client's promoted Logger field, the writer's logger, the reader's
// logger and the closer's logger. It returns the client for chaining.
func (this *Client) SetLogger(logger Logger) *Client {
	wrapped := newLogger(logger)
	this.Logger = wrapped
	this.IWriter.Logger = wrapped

	readCloser := this.IReadCloser
	readCloser.IReader.Logger = wrapped
	readCloser.ICloser.Logger = wrapped
	return this
}
// SetPrintWithHEX switches both the writer's logger and the read-closer to
// HEX printing and returns the client for chaining.
func (this *Client) SetPrintWithHEX() *Client {
	writer := this.IWriter
	writer.Logger.SetPrintWithHEX()
	reader := this.IReadCloser
	reader.SetPrintWithHEX()
	return this
}
// SetPrintWithUTF8 switches both the writer's logger and the read-closer to
// UTF-8 printing and returns the client for chaining.
func (this *Client) SetPrintWithUTF8() *Client {
	writer := this.IWriter
	writer.Logger.SetPrintWithUTF8()
	reader := this.IReadCloser
	reader.SetPrintWithUTF8()
	return this
}
// SetLevel applies the log level to the writer's logger and to the
// read-closer, and returns the client for chaining.
func (this *Client) SetLevel(level Level) *Client {
	writer := this.IWriter
	writer.Logger.SetLevel(level)
	reader := this.IReadCloser
	reader.SetLevel(level)
	return this
}
// SetPrintWithAll sets the log level to LevelAll (original note: print everything).
func (this *Client) SetPrintWithAll() *Client {
	// SetLevel returns the receiver, so returning this directly is equivalent.
	this.SetLevel(LevelAll)
	return this
}
// SetPrintWithBase sets the log level to LevelInfo (original note: print basic information).
func (this *Client) SetPrintWithBase() *Client {
	// SetLevel returns the receiver, so returning this directly is equivalent.
	this.SetLevel(LevelInfo)
	return this
}
// SetPrintWithErr sets the log level to LevelError (original note: print error information).
func (this *Client) SetPrintWithErr() *Client {
	// SetLevel returns the receiver, so returning this directly is equivalent.
	this.SetLevel(LevelError)
	return this
}
//================================SetFunc================================
// SetOptions applies the given options to the client in order and returns the
// client for chaining. Nil options are skipped, because calling a nil function
// value would panic.
func (this *Client) SetOptions(options ...OptionClient) *Client {
	for _, option := range options {
		if option == nil {
			continue
		}
		option(this)
	}
	return this
}
// SetDealFunc installs the message handler on the read-closer and returns the
// client for chaining.
//
// The installed wrapper answers a Ping message with Pong, ignores a Pong
// message, and passes every other message to fn together with the client.
// A nil fn is tolerated: such messages are then dropped instead of causing a
// nil-function panic.
func (this *Client) SetDealFunc(fn func(c *Client, msg Message)) *Client {
	this.IReadCloser.SetDealFunc(func(msg Message) {
		switch msg.String() {
		case Ping:
			// Best-effort reply; the write result is intentionally not checked.
			this.WriteString(Pong)
		case Pong:
			// Nothing to do for a pong.
		default:
			if fn != nil {
				fn(this, msg)
			}
		}
	})
	return this
}
// SetCloseFunc installs the close hook on the read-closer and returns the
// client for chaining. The installed wrapper forwards the context and message
// it receives to fn, adding the client as the second argument.
// A nil fn is tolerated: the hook then does nothing instead of causing a
// nil-function panic.
func (this *Client) SetCloseFunc(fn func(ctx context.Context, c *Client, msg Message)) *Client {
	this.IReadCloser.SetCloseFunc(func(ctx context.Context, msg Message) {
		if fn == nil {
			return
		}
		fn(ctx, this, msg)
	})
	return this
}
// SetReadWriteWithPkg switches the writer (SetWriteWithPkg) and the reader
// (SetReadWithPkg) to the default packet framing, and returns the client for
// chaining.
func (this *Client) SetReadWriteWithPkg() *Client {
	writer := this.IWriter
	writer.SetWriteWithPkg()
	this.IReadCloser.IReader.SetReadWithPkg()
	return this
}
// SetReadWriteWithStartEnd configures both writing and reading to frame data
// by the given packet header (packageStart) and trailer (packageEnd), and
// returns the client for chaining.
func (this *Client) SetReadWriteWithStartEnd(packageStart, packageEnd []byte) *Client {
	writer := this.IWriter
	writer.SetWriteWithStartEnd(packageStart, packageEnd)
	reader := this.IReadCloser
	reader.SetReadWithStartEnd(packageStart, packageEnd)
	return this
}
// Redial turns the client into a reconnecting client: it installs a close
// hook that redials, applies options, starts Run in a goroutine and returns
// the client.
//
// When the connection closes, the hook waits one second and calls MustDial
// with the hook's context. If no connection is obtained, an error is logged
// (unless the close error is ErrHandClose) and the hook ends. Otherwise the
// client value is overwritten in place with a fresh client for the new
// connection, the key and redial function are restored, and Redial is called
// again, which re-installs this hook, re-applies options and starts Run.
//
// Original note: because the pointer is reused, handling (e.g. closing) has
// to be driven by the context.
func (this *Client) Redial(options ...OptionClient) *Client {
	this.SetCloseFunc(func(ctx context.Context, _ *Client, _ Message) {
		// Fixed one-second pause before attempting to reconnect.
		time.Sleep(time.Second)
		readWriteCloser, key := this.IReadCloser.MustDial(ctx)
		if readWriteCloser == nil {
			if this.ICloser.Err() != ErrHandClose {
				this.Logger.Errorf("[%s] 连接断开(%v),未设置重连函数", this.GetKey(), this.ICloser.Err())
			}
			return
		}
		this.Logger.Infof("[%s] 连接断开(%v),重连成功", this.GetKey(), this.ICloser.Err())
		// Save the redial function before the client value is overwritten below.
		redialFunc := this.IReadCloser.redialFunc
		*this = *NewClient(readWriteCloser)
		this.SetKey(key)
		this.SetRedialFunc(redialFunc)
		// Redial already starts Run in its own goroutine, so Run must not be
		// started a second time here.
		this.Redial(options...)
	})
	this.SetOptions(options...)
	// The connection is known to be established when the client is created;
	// the success message is logged by the constructors after the options
	// were applied, so that the user can control the output.
	go this.Run()
	return this
}
// Swap wraps i in a client via NewClient and passes it to SwapClient
// (original note: IO data exchange).
func (this *Client) Swap(i ReadWriteCloser) {
	peer := NewClient(i)
	this.SwapClient(peer)
}
// SwapClient calls the package-level SwapClient with this client and c
// (original note: IO data exchange).
func (this *Client) SwapClient(c *Client) {
	peer := c
	SwapClient(this, peer)
}
// Run runs the embedded read-closer and returns its error.
func (this *Client) Run() error {
	err := this.IReadCloser.Run()
	return err
}