-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio_i_reader.go
172 lines (149 loc) · 4.39 KB
/
io_i_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package io
import (
"bufio"
"github.com/injoyai/io/buf"
"io"
"time"
)
// NewIReader wraps r in an IReader whose default read function is
// buf.ReadWithAll.
//
// A buffered reader over r is always created, so Read, ReadByte, ReadAll and
// the copy helpers never dereference a nil *bufio.Reader. bufio.NewReader does
// no I/O until something is actually read from it.
//
// If r also implements MessageReader it is remembered separately, and the
// wrapper installed by SetReadFunc takes whole messages from it instead of
// reading through the buffer.
func NewIReader(r Reader) *IReader {
	i := &IReader{
		printer:  newPrinter(""),
		buf:      bufio.NewReader(r),
		lastChan: make(chan Message),
		lastTime: time.Now(),
	}
	if v, ok := r.(MessageReader); ok {
		i.mReader = v
	}
	i.SetReadFunc(buf.ReadWithAll)
	return i
}
// IReader wraps a Reader and adds message-oriented reading, a record of the
// last read time, and a channel on which the most recent message is offered.
type IReader struct {
	// Embedded printer.
	// NOTE(review): presumably the source of the Print and GetKey methods used
	// when logging reads; confirm against the printer declaration.
	*printer

	// mReader is set when the wrapped Reader also implements MessageReader;
	// when non-nil, the installed read function takes messages from it.
	mReader MessageReader

	// buf is the buffered reader handed to the read function.
	buf *bufio.Reader

	// readFunc is the wrapped read function installed by SetReadFunc.
	readFunc func(*bufio.Reader) ([]byte, error)

	// lastChan is where the most recently read message is offered to ReadLast.
	lastChan chan Message

	// lastTime is when non-empty data was last read.
	lastTime time.Time
}
//================================Nature================================
// LastTime reports the time stored in lastTime: the construction time
// initially, then the time of the most recent non-empty read.
func (this *IReader) LastTime() time.Time {
	last := this.lastTime
	return last
}
//================================Read================================
// Buffer returns the internal buffered reader. The original author notes that
// buffering greatly increases read speed.
func (this *IReader) Buffer() *bufio.Reader {
	reader := this.buf
	return reader
}
// Read reads into p from the internal buffered reader and returns the number
// of bytes read together with any error from that reader.
func (this *IReader) Read(p []byte) (int, error) {
	return this.buf.Read(p)
}
// ReadByte reads a single byte from the internal buffered reader.
func (this *IReader) ReadByte() (byte, error) {
	return this.buf.ReadByte()
}
// ReadAll reads from the internal buffered reader using buf.ReadWithAll,
// bypassing whatever read function was installed with SetReadFunc.
func (this *IReader) ReadAll() ([]byte, error) {
	return buf.ReadWithAll(this.buf)
}
// ReadMessage reads one message with the installed read function, passing it
// the internal buffered reader. It returns ErrInvalidReadFunc when no read
// function has been set.
func (this *IReader) ReadMessage() ([]byte, error) {
	if fn := this.readFunc; fn != nil {
		return fn(this.buf)
	}
	return nil, ErrInvalidReadFunc
}
// ReadLast waits for the next message offered on lastChan and returns it.
//
// With timeout <= 0 it blocks until a message arrives. Otherwise it waits at
// most timeout and returns ErrWithTimeout if nothing arrives in time.
func (this *IReader) ReadLast(timeout time.Duration) (response []byte, err error) {
	if timeout <= 0 {
		// No deadline requested: a plain blocking receive.
		return <-this.lastChan, nil
	}

	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	select {
	case msg := <-this.lastChan:
		return msg, nil
	case <-deadline.C:
		return nil, ErrWithTimeout
	}
}
// WriteTo copies this reader's data into writer by calling Copy(writer, this)
// and returns Copy's results.
//
// NOTE(review): if Writer is an alias of io.Writer, this method satisfies
// io.WriterTo; confirm that Copy does not dispatch back to WriteTo.
func (this *IReader) WriteTo(writer Writer) (int64, error) {
	written, err := Copy(writer, this)
	return written, err
}
// CopyTo copies this reader's data into writer by calling Copy(writer, this)
// and returns Copy's results.
func (this *IReader) CopyTo(writer Writer) (int64, error) {
	n, err := Copy(writer, this)
	return n, err
}
//================================ReadFunc================================
// SetReadFunc installs the read function used by ReadMessage and returns the
// receiver for chaining.
//
// fn is wrapped rather than stored directly. On every call the wrapper picks
// its data source in this order:
//  1. the MessageReader, if one is set;
//  2. buf.ReadWithAll, if fn is nil;
//  3. fn otherwise.
//
// After a successful read of non-empty data it records the read time, offers
// the data to lastChan without blocking, and logs it through Print.
func (this *IReader) SetReadFunc(fn buf.ReadFunc) *IReader {
	this.readFunc = func(reader *bufio.Reader) ([]byte, error) {
		var (
			data []byte
			err  error
		)
		if this.mReader != nil {
			// A MessageReader takes precedence over any buffer-based function.
			data, err = this.mReader.ReadMessage()
		} else if fn == nil {
			// No function supplied: fall back to reading everything.
			data, err = buf.ReadWithAll(reader)
		} else {
			// Use the caller-supplied function.
			data, err = fn(reader)
		}
		if err != nil {
			return nil, err
		}
		if len(data) == 0 {
			// Empty reads leave lastTime, lastChan and the log untouched.
			return data, nil
		}

		// Record when valid data was last read.
		// NOTE(review): written without synchronization; confirm LastTime is
		// not read concurrently from another goroutine.
		this.lastTime = time.Now()

		// Offer the data to a waiting ReadLast; drop it if nobody is receiving.
		select {
		case this.lastChan <- data:
		default:
		}

		// Log the data that was read.
		this.Print(data, TagRead, this.GetKey())
		return data, nil
	}
	return this
}
// SetReadWithAll installs buf.ReadWithAll as the read function (read
// everything in one go) and returns the receiver for chaining.
func (this *IReader) SetReadWithAll() *IReader {
	readAll := buf.ReadWithAll
	return this.SetReadFunc(readAll)
}
// SetReadWithKB installs a read function that issues one Read per call into a
// freshly allocated buffer of n<<10 bytes (n KiB), returning only the bytes
// actually read together with the Read error.
func (this *IReader) SetReadWithKB(n uint) *IReader {
	size := n << 10
	readChunk := func(r *bufio.Reader) ([]byte, error) {
		// Allocate per call because the returned slice is handed to the caller.
		chunk := make([]byte, size)
		got, err := r.Read(chunk)
		return chunk[:got], err
	}
	return this.SetReadFunc(readChunk)
}
// SetReadWithStartEnd installs the read function built by
// buf.NewReadWithStartEnd, which reads data delimited by a packet header
// (packageStart) and packet trailer (packageEnd).
func (this *IReader) SetReadWithStartEnd(packageStart, packageEnd []byte) *IReader {
	framed := buf.NewReadWithStartEnd(packageStart, packageEnd)
	return this.SetReadFunc(framed)
}
// SetReadWithWriter installs the read function built by
// buf.NewReadWithWriter(writer). Per the original author's note it behaves
// like io.Copy, and a read timeout must not be set when using it.
func (this *IReader) SetReadWithWriter(writer io.Writer) *IReader {
	toWriter := buf.NewReadWithWriter(writer)
	return this.SetReadFunc(toWriter)
}
// SetReadWithLenFrame installs the read function built by buf.NewReadWithLen(f),
// which reads data according to a dynamic length described by f.
func (this *IReader) SetReadWithLenFrame(f *buf.LenFrame) *IReader {
	byLength := buf.NewReadWithLen(f)
	return this.SetReadFunc(byLength)
}
// SetReadWithTimeout installs the read function built by
// buf.NewReadWithTimeout(timeout), which reads data based on a timeout.
// Per the original author's note, data must be read promptly: blocking can
// cause separate packets to stick together.
func (this *IReader) SetReadWithTimeout(timeout time.Duration) *IReader {
	byTimeout := buf.NewReadWithTimeout(timeout)
	return this.SetReadFunc(byTimeout)
}
// SetReadWithFrame installs the read function built by buf.NewReadWithFrame(f).
// Per the original author's note, it is meant to fit most reading scenarios.
func (this *IReader) SetReadWithFrame(f *buf.Frame) *IReader {
	byFrame := buf.NewReadWithFrame(f)
	return this.SetReadFunc(byFrame)
}