-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio_i_read_closer.go
165 lines (142 loc) · 3.91 KB
/
io_i_read_closer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package io
import (
"context"
"github.com/injoyai/base/chans"
"sync/atomic"
"time"
)
// NewIReadCloser 新建IReader,默认读取函数ReadAll
func NewIReadCloser(readCloser ReadCloser) *IReadCloser {
return NewIReadCloserWithContext(context.Background(), readCloser)
}
// NewIReadCloserWithContext 新建IReader,默认读取函数ReadAll
func NewIReadCloserWithContext(ctx context.Context, readCloser ReadCloser) *IReadCloser {
if c, ok := readCloser.(*IReadCloser); ok && c != nil {
return c
}
return &IReadCloser{
IReader: NewIReader(readCloser),
ICloser: NewICloserWithContext(ctx, readCloser),
running: 0,
timeout: 0,
readSign: make(chan struct{}),
}
}
type IReadCloser struct {
*IReader
*ICloser
dealFunc DealFunc //处理数据函数
running uint32 //是否在运行
timeout time.Duration //超时时间
readSign chan struct{} //读取到数据信号,配合超时机制使用
queue *chans.Entity //协程队列,可选
}
//================================Nature================================
// GetKey 获取唯一标识
func (this *IReadCloser) GetKey() string {
return this.IReader.GetKey()
}
// SetKey 设置唯一标识
func (this *IReadCloser) SetKey(key string) *IReadCloser {
this.IReader.SetKey(key)
this.ICloser.SetKey(key)
return this
}
// SetPrintFunc 设置打印函数
func (this *IReadCloser) SetPrintFunc(fn PrintFunc) *IReadCloser {
this.IReader.SetPrintFunc(fn)
this.ICloser.SetPrintFunc(fn) //错误信息按ASCII编码?
return this
}
// Debug debug模式
func (this *IReadCloser) Debug(b ...bool) *IReadCloser {
this.IReader.Debug(b...)
this.ICloser.Debug(b...)
return this
}
// SetTimeout 设置超时时间,需要在Run之前设置
func (this *IReadCloser) SetTimeout(timeout time.Duration) *IReadCloser {
this.timeout = timeout
return this
}
//================================DealFunc================================
// SetDealFunc 设置数据处理函数
func (this *IReadCloser) SetDealFunc(fn func(msg Message)) *IReadCloser {
this.dealFunc = fn
return this
}
// SetDealWithNil 不设置数据处理函数
func (this *IReadCloser) SetDealWithNil() *IReadCloser {
return this.SetDealFunc(nil)
}
// SetDealWithWriter 设置数据处理到io.Writer
func (this *IReadCloser) SetDealWithWriter(writer Writer) *IReadCloser {
return this.SetDealFunc(func(msg Message) {
writer.Write(msg)
})
}
// SetDealWithChan 设置数据处理到chan
func (this *IReadCloser) SetDealWithChan(c chan Message) *IReadCloser {
return this.SetDealFunc(func(msg Message) {
c <- msg
})
}
// SetDealQueueFunc 设置协程队列处理数据
// @num 协程数量
// @no 协程序号
// @count 当前协程执行次数
// @msg 消息内容
func (this *IReadCloser) SetDealQueueFunc(num int, fn func(msg Message)) *IReadCloser {
if this.queue == nil {
this.queue = chans.NewEntity(num).SetHandler(func(no, count int, data interface{}) {
fn(data.(Message))
})
} else {
this.queue.SetNum(num)
}
this.SetDealFunc(func(msg Message) { this.queue.Do(msg) })
return this
}
//================================RunTime================================
// Running 是否在运行
func (this *IReadCloser) Running() bool {
return this.running == 1
}
// Run 开始运行数据读取
func (this *IReadCloser) Run() error {
if atomic.SwapUint32(&this.running, 1) == 1 {
return nil
}
//todo is a good idea ?
if this.timeout > 0 {
go func() {
timer := time.NewTimer(this.timeout)
defer timer.Stop()
for {
timer.Reset(this.timeout)
select {
case <-timer.C:
_ = this.CloseWithErr(ErrWithReadTimeout)
case <-this.readSign:
}
}
}()
}
return this.For(func() (err error) {
//读取数据
bs, err := this.ReadMessage()
if err != nil || len(bs) == 0 {
return err
}
//尝试加入通道,超时定时器重置
select {
case this.readSign <- struct{}{}:
default:
}
//处理数据
if this.dealFunc != nil {
this.dealFunc(bs)
}
return nil
})
}