forked from docker-archive/classicswarm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapi_events.go
61 lines (51 loc) · 1.74 KB
/
api_events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package cluster
import (
"sync/atomic"
"time"
"github.com/docker/go-events"
"github.com/docker/swarmkit/watch"
)
// Settings applied to the watch queue that NewAPIEventHandler builds.
const (
	// defaultEventQueueTimeout is the value handed to watch.WithTimeout.
	defaultEventQueueTimeout = 10 * time.Second

	// defaultEventQueueLimit is the value handed to watch.WithLimit.
	defaultEventQueueLimit = 10000
)
// APIEventHandler broadcasts events to multiple client listeners.
// Events passed to Handle are published on watchQueue; listeners subscribe
// to that same queue through Watch.
type APIEventHandler struct {
// listenerCount is incremented by Watch and decremented by the cancel
// function Watch returns. It is only read and written through sync/atomic.
listenerCount *uint64
// watchQueue is the queue that events are published to and that listeners
// subscribe to. It is closed by cleanupHandler.
watchQueue *watch.Queue
}
// NewAPIEventHandler returns an APIEventHandler whose listener counter starts
// at zero and whose watch queue is freshly created. The queue is configured
// with defaultEventQueueTimeout, defaultEventQueueLimit and the
// watch.WithCloseOutChan option.
func NewAPIEventHandler() *APIEventHandler {
	queue := watch.NewQueue(
		watch.WithTimeout(defaultEventQueueTimeout),
		watch.WithLimit(defaultEventQueueLimit),
		watch.WithCloseOutChan(),
	)

	return &APIEventHandler{
		// new(uint64) yields a pointer to a zero-valued counter.
		listenerCount: new(uint64),
		watchQueue:    queue,
	}
}
// Watch subscribes a new listener to the handler's watch queue.
//
// It returns the event channel obtained from the queue together with a cancel
// function. The listener counter reported by Size is incremented before Watch
// returns and decremented when cancel runs, after which the queue's own
// cancel function is invoked.
//
// cancel is idempotent: only its first invocation has any effect. Without
// that guard a repeated call would decrement the unsigned counter a second
// time and could wrap it around, corrupting the value returned by Size.
func (eh *APIEventHandler) Watch() (chan events.Event, func()) {
	// Subscribe to the shared queue; queueCancel tears the subscription down.
	eventq, queueCancel := eh.watchQueue.Watch()

	atomic.AddUint64(eh.listenerCount, 1)

	// cancelled flips from 0 to 1 exactly once, on the first cancel call.
	var cancelled uint32
	cancel := func() {
		if !atomic.CompareAndSwapUint32(&cancelled, 0, 1) {
			// Already cancelled; do not touch the counter again.
			return
		}
		// Adding ^uint64(0) (all bits set) subtracts one from the counter.
		atomic.AddUint64(eh.listenerCount, ^uint64(0))
		queueCancel()
	}

	return eventq, cancel
}
// cleanupHandler closes the handler's underlying watch queue.
func (eh *APIEventHandler) cleanupHandler() {
// NOTE(review): anything Close returns is discarded here — confirm that
// nothing needs to be handled or logged.
eh.watchQueue.Close()
}
// Handle publishes the cluster event e on the handler's watch queue, the same
// queue that Watch subscribes listeners to. It always returns nil.
//
// NOTE(review): what happens to slow or blocked listeners is presumably
// governed by the timeout and limit options the queue was created with, not
// by this method — confirm against the watch package.
func (eh *APIEventHandler) Handle(e *Event) error {
eh.watchQueue.Publish(e)
return nil
}
// Size returns the current value of the listener counter, which Watch
// increments and the cancel function returned by Watch decrements.
func (eh *APIEventHandler) Size() int {
	listeners := atomic.LoadUint64(eh.listenerCount)
	return int(listeners)
}