-
Notifications
You must be signed in to change notification settings - Fork 256
/
Copy pathdirect.go
109 lines (89 loc) · 1.84 KB
/
direct.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package queue
import (
"errors"
"sync"
. "github.com/qiniu/logkit/utils/models"
)
const (
StatusInit int32 = iota
StatusClosed
)
var _ DataQueue = &directQueue{}
var _ LinesQueue = &directQueue{}
type directQueue struct {
name string
channel chan []Data
lineChan chan []string
mux sync.Mutex
status int32
quit chan bool
}
func NewDirectQueue(name string) BackendQueue {
return &directQueue{
name: name,
channel: make(chan []Data),
lineChan: make(chan []string),
mux: sync.Mutex{},
status: StatusInit,
quit: make(chan bool),
}
}
func (dq *directQueue) Name() string {
return dq.name
}
func (dq *directQueue) Put(msg []byte) error {
return errors.New("method Put is not supported, please use PutData")
}
func (dq *directQueue) ReadChan() <-chan []byte {
return make(chan []byte) // Blocks forever because no inputs
}
func (dq *directQueue) PutLines(datas []string) error {
dq.mux.Lock()
defer dq.mux.Unlock()
if dq.status == StatusClosed {
return ErrQueueClosed
}
select {
case dq.lineChan <- datas:
return nil
case <-dq.quit:
return ErrQueueClosed
}
}
func (dq *directQueue) ReadLinesChan() <-chan []string {
return dq.lineChan
}
func (dq *directQueue) PutDatas(datas []Data) error {
dq.mux.Lock()
defer dq.mux.Unlock()
if dq.status == StatusClosed {
return ErrQueueClosed
}
select {
case dq.channel <- datas:
return nil
case <-dq.quit:
return ErrQueueClosed
}
}
func (dq *directQueue) ReadDatasChan() <-chan []Data {
return dq.channel
}
func (dq *directQueue) Close() error {
close(dq.quit)
dq.mux.Lock()
defer dq.mux.Unlock()
dq.status = StatusClosed
close(dq.channel)
close(dq.lineChan)
return nil
}
func (dq *directQueue) Delete() error {
return dq.Close()
}
func (dq *directQueue) Depth() int64 {
return 0
}
func (dq *directQueue) Empty() error {
return nil
}