-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio_i_read_closer.go
187 lines (159 loc) · 4.58 KB
/
io_i_read_closer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package io
import (
"context"
"github.com/injoyai/base/chans"
"sync/atomic"
"time"
)
// NewIReadCloser 新建IReader,默认读取函数ReadAll
func NewIReadCloser(readCloser ReadCloser) *IReadCloser {
return NewIReadCloserWithContext(context.Background(), readCloser)
}
// NewIReadCloserWithContext 新建IReader,默认读取函数ReadAll
func NewIReadCloserWithContext(ctx context.Context, readCloser ReadCloser) *IReadCloser {
if c, ok := readCloser.(*IReadCloser); ok && c != nil {
return c
}
return &IReadCloser{
IReader: NewIReader(readCloser),
ICloser: NewICloserWithContext(ctx, readCloser),
running: 0,
timeout: 0,
readSign: make(chan struct{}),
}
}
type IReadCloser struct {
*IReader
*ICloser
dealFunc func(msg Message) //处理数据函数
running uint32 //是否在运行
timeout time.Duration //超时时间,读取
readSign chan struct{} //读取到数据信号,配合超时机制使用
queue *chans.Entity //协程队列,可选
}
//================================Nature================================
// GetKey 获取唯一标识
func (this *IReadCloser) GetKey() string {
return this.IReader.GetKey()
}
// SetKey 设置唯一标识
func (this *IReadCloser) SetKey(key string) *IReadCloser {
this.IReader.SetKey(key)
this.ICloser.SetKey(key)
return this
}
// SetReadIntervalTimeout 设置读取间隔超时时间,需要在Run之前设置
func (this *IReadCloser) SetReadIntervalTimeout(timeout time.Duration) *IReadCloser {
this.timeout = timeout
return this
}
//================================Log================================
// Debug debug模式,实现Debugger接口,不用返回值
func (this *IReadCloser) Debug(b ...bool) {
this.IReader.Logger.Debug(b...)
this.ICloser.Logger.Debug(b...)
}
func (this *IReadCloser) SetLogger(logger Logger) *IReadCloser {
l := newLogger(logger)
this.IReader.Logger = l
this.ICloser.Logger = l
return this
}
func (this *IReadCloser) SetLevel(level Level) *IReadCloser {
this.IReader.Logger.SetLevel(level)
this.ICloser.Logger.SetLevel(level)
return this
}
// SetPrintWithHEX 设置打印HEX
func (this *IReadCloser) SetPrintWithHEX() *IReadCloser {
this.IReader.Logger.SetPrintWithHEX()
this.ICloser.Logger.SetPrintWithHEX()
return this
}
func (this *IReadCloser) SetPrintWithUTF8() *IReadCloser {
this.IReader.Logger.SetPrintWithUTF8()
this.ICloser.Logger.SetPrintWithUTF8()
return this
}
//================================DealFunc================================
// SetDealFunc 设置数据处理函数
func (this *IReadCloser) SetDealFunc(fn func(msg Message)) *IReadCloser {
this.dealFunc = fn
return this
}
// SetDealWithNil 不设置数据处理函数
func (this *IReadCloser) SetDealWithNil() *IReadCloser {
return this.SetDealFunc(nil)
}
// SetDealWithWriter 设置数据处理到io.Writer
func (this *IReadCloser) SetDealWithWriter(writer Writer) *IReadCloser {
return this.SetDealFunc(func(msg Message) {
writer.Write(msg)
})
}
// SetDealWithChan 设置数据处理到chan
func (this *IReadCloser) SetDealWithChan(c chan Message) *IReadCloser {
return this.SetDealFunc(func(msg Message) {
c <- msg
})
}
// SetDealQueueFunc 设置协程队列处理数据
// @num 协程数量
// @no 协程序号
// @count 当前协程执行次数
// @msg 消息内容
func (this *IReadCloser) SetDealQueueFunc(num int, fn func(msg Message)) *IReadCloser {
if this.queue == nil {
this.queue = chans.NewEntity(num).SetHandler(func(ctx context.Context, no, count int, data interface{}) {
fn(data.(Message))
})
} else {
this.queue.SetNum(num)
}
this.SetDealFunc(func(msg Message) { this.queue.Do(msg) })
return this
}
//================================RunTime================================
// Running 是否在运行
func (this *IReadCloser) Running() bool {
return atomic.LoadUint32(&this.running) == 1
}
// Run 开始运行数据读取
func (this *IReadCloser) Run() error {
if atomic.SwapUint32(&this.running, 1) == 1 {
return nil
}
//todo is a good idea ?
if this.timeout > 0 {
go func() {
timer := time.NewTimer(this.timeout)
defer timer.Stop()
for {
timer.Reset(this.timeout)
select {
case <-timer.C:
_ = this.CloseWithErr(ErrWithReadTimeout)
case <-this.readSign:
}
}
}()
}
readFunc := func(ctx context.Context) (err error) {
//读取数据
bs, err := this.ReadMessage()
if err != nil || len(bs) == 0 {
return err
}
//尝试加入通道,超时定时器重置
select {
case this.readSign <- struct{}{}:
default:
}
//处理数据
if this.dealFunc != nil {
this.dealFunc(bs)
}
return nil
}
return this.For(readFunc)
}