Skip to content

Commit 3abe106

Browse files
committed
More descriptive names for event structs and files
This is to eliminate some confusion around how events are handled in Swarm. Signed-off-by: Nishant Totla <[email protected]>
1 parent eb9d3dc commit 3abe106

File tree

9 files changed

+168
-162
lines changed

9 files changed

+168
-162
lines changed

api/handlers.go

-41
Original file line numberDiff line numberDiff line change
@@ -853,47 +853,6 @@ func postImagesLoad(c *context, w http.ResponseWriter, r *http.Request) {
853853
if errorFound {
854854
sendErrorJSONMessage(wf, 1, errorMessage)
855855
}
856-
857-
}
858-
859-
// normalizeEvent takes a cluster Event and ensures backward compatibility
860-
// and all the right fields filled up
861-
func normalizeEvent(receivedEvent *cluster.Event) ([]byte, error) {
862-
// make a local copy of the event
863-
e := *receivedEvent
864-
// make a fresh copy of the Actor.Attributes map to prevent a race condition
865-
e.Actor.Attributes = make(map[string]string)
866-
for k, v := range receivedEvent.Actor.Attributes {
867-
e.Actor.Attributes[k] = v
868-
}
869-
870-
// remove this hack once 1.10 is broadly adopted
871-
e.From = e.From + " node:" + e.Engine.Name
872-
873-
e.Actor.Attributes["node.name"] = e.Engine.Name
874-
e.Actor.Attributes["node.id"] = e.Engine.ID
875-
e.Actor.Attributes["node.addr"] = e.Engine.Addr
876-
e.Actor.Attributes["node.ip"] = e.Engine.IP
877-
878-
data, err := json.Marshal(&e)
879-
if err != nil {
880-
return nil, err
881-
}
882-
883-
// remove the node field once 1.10 is broadly adopted & interlock stops relying on it
884-
node := fmt.Sprintf(",%q:{%q:%q,%q:%q,%q:%q,%q:%q}}",
885-
"node",
886-
"Name", e.Engine.Name,
887-
"Id", e.Engine.ID,
888-
"Addr", e.Engine.Addr,
889-
"Ip", e.Engine.IP,
890-
)
891-
892-
// insert Node field
893-
data = data[:len(data)-1]
894-
data = append(data, []byte(node)...)
895-
896-
return data, nil
897856
}
898857

899858
// GET /events

api/primary.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
// Primary router context, used by handlers.
1515
type context struct {
1616
cluster cluster.Cluster
17-
eventsHandler *cluster.EventsHandler
17+
eventsHandler *cluster.APIEventHandler
1818
listenerCount *uint64
1919
statusHandler StatusHandler
2020
debug bool
@@ -119,11 +119,11 @@ func profilerSetup(mainRouter *mux.Router, path string) {
119119
func NewPrimary(cluster cluster.Cluster, tlsConfig *tls.Config, status StatusHandler, debug, enableCors bool) *mux.Router {
120120
// Register the API events handler in the cluster.
121121

122-
// NewEventsHandler creates a new eventsHandler object. The new eventsHandler
122+
// NewAPIEventHandler creates a new eventsHandler object. The new eventsHandler
123123
// is initialized with no writers or channels. This is in api/events.go and
124124
// uses the watch package from SwarmKit, which is based on the go-events
125125
// package. See https://github.com/docker/swarm/issues/2718 for context
126-
eventsHandler := cluster.NewAPIEventsHandler()
126+
eventsHandler := cluster.NewAPIEventHandler()
127127
listenerCount := uint64(0)
128128
// need to add this queue to the cluster
129129
// This just calls c.eventHandlers.RegisterEventHandler(eventsHandler) internally.

api/utils.go

+40
Original file line numberDiff line numberDiff line change
@@ -388,3 +388,43 @@ func matchImageOSError(errMsg string) string {
388388
}
389389
return results[1]
390390
}
391+
392+
// normalizeEvent takes a cluster Event and ensures backward compatibility
393+
// and all the right fields filled up
394+
func normalizeEvent(receivedEvent *cluster.Event) ([]byte, error) {
395+
// make a local copy of the event
396+
e := *receivedEvent
397+
// make a fresh copy of the Actor.Attributes map to prevent a race condition
398+
e.Actor.Attributes = make(map[string]string)
399+
for k, v := range receivedEvent.Actor.Attributes {
400+
e.Actor.Attributes[k] = v
401+
}
402+
403+
// remove this hack once 1.10 is broadly adopted
404+
e.From = e.From + " node:" + e.Engine.Name
405+
406+
e.Actor.Attributes["node.name"] = e.Engine.Name
407+
e.Actor.Attributes["node.id"] = e.Engine.ID
408+
e.Actor.Attributes["node.addr"] = e.Engine.Addr
409+
e.Actor.Attributes["node.ip"] = e.Engine.IP
410+
411+
data, err := json.Marshal(&e)
412+
if err != nil {
413+
return nil, err
414+
}
415+
416+
// remove the node field once 1.10 is broadly adopted & interlock stops relying on it
417+
node := fmt.Sprintf(",%q:{%q:%q,%q:%q,%q:%q,%q:%q}}",
418+
"node",
419+
"Name", e.Engine.Name,
420+
"Id", e.Engine.ID,
421+
"Addr", e.Engine.Addr,
422+
"Ip", e.Engine.IP,
423+
)
424+
425+
// insert Node field
426+
data = data[:len(data)-1]
427+
data = append(data, []byte(node)...)
428+
429+
return data, nil
430+
}

cluster/api_events.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package cluster
2+
3+
import (
4+
"time"
5+
6+
"github.com/docker/go-events"
7+
"github.com/docker/swarmkit/watch"
8+
)
9+
10+
const (
11+
defaultEventQueueLimit = 10000
12+
defaultEventQueueTimeout = 10 * time.Second
13+
)
14+
15+
// APIEventsHandler broadcasts events to multiple client listeners.
16+
type APIEventHandler struct {
17+
watchQueue *watch.Queue
18+
}
19+
20+
// NewAPIEventHandler creates a new APIEventsHandler for a cluster.
21+
// The new eventsHandler is initialized with no writers or channels.
22+
func NewAPIEventHandler() *APIEventHandler {
23+
return &APIEventHandler{
24+
watchQueue: watch.NewQueue(watch.WithTimeout(defaultEventQueueTimeout), watch.WithLimit(defaultEventQueueLimit), watch.WithCloseOutChan()),
25+
}
26+
}
27+
28+
// Add adds the writer and a new channel for the remote address.
29+
func (eh *APIEventHandler) Watch() (eventq chan events.Event, cancel func()) {
30+
eventq, cancel = eh.watchQueue.Watch()
31+
return eventq, cancel
32+
}
33+
34+
func (eh *APIEventHandler) cleanupHandler(remoteAddr string) {
35+
eh.watchQueue.Close()
36+
}
37+
38+
// Handle writes information about a cluster event to each remote address in the cluster that has been added to the events handler.
39+
// After an unsuccessful write to a remote address, the associated channel is closed and the address is removed from the events handler.
40+
func (eh *APIEventHandler) Handle(e *Event) error {
41+
eh.watchQueue.Publish(e)
42+
return nil
43+
}
44+
45+
// Size returns the number of remote addresses that the events handler currently contains.
46+
func (eh *APIEventHandler) Size() int {
47+
return 0
48+
}

cluster/cluster.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ type Cluster interface {
8181
// UnregisterEventHandler unregisters an event handler.
8282
UnregisterEventHandler(h EventHandler)
8383

84-
// NewAPIEventsHandler creates a new API events handler
85-
NewAPIEventsHandler() *EventsHandler
84+
// NewAPIEventHandler creates a new API events handler
85+
NewAPIEventHandler() *APIEventHandler
8686

8787
// CloseWatchQueue closes the watchQueue when the manager shuts down.
8888
CloseWatchQueue()

cluster/event.go cluster/event_map.go

+8-21
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,23 @@ import (
55
"sync"
66

77
log "github.com/Sirupsen/logrus"
8-
"github.com/docker/docker/api/types/events"
98
)
109

11-
// Event is exported
12-
type Event struct {
13-
events.Message
14-
Engine *Engine `json:"-"`
15-
}
16-
17-
// EventHandler is exported
18-
type EventHandler interface {
19-
Handle(*Event) error
20-
}
21-
22-
// EventHandlers is a map of EventHandler
23-
type EventHandlers struct {
10+
// ClusterEventHandlers is a map of EventHandler
11+
type ClusterEventHandlers struct {
2412
sync.RWMutex
25-
2613
eventHandlers map[EventHandler]struct{}
2714
}
2815

29-
// NewEventHandlers returns an EventHandlers
30-
func NewEventHandlers() *EventHandlers {
31-
return &EventHandlers{
16+
// NewClusterEventHandlers returns an EventHandlers
17+
func NewClusterEventHandlers() *ClusterEventHandlers {
18+
return &ClusterEventHandlers{
3219
eventHandlers: make(map[EventHandler]struct{}),
3320
}
3421
}
3522

3623
// Handle callbacks for the events
37-
func (eh *EventHandlers) Handle(e *Event) {
24+
func (eh *ClusterEventHandlers) Handle(e *Event) {
3825
eh.RLock()
3926
defer eh.RUnlock()
4027

@@ -46,7 +33,7 @@ func (eh *EventHandlers) Handle(e *Event) {
4633
}
4734

4835
// RegisterEventHandler registers an event handler.
49-
func (eh *EventHandlers) RegisterEventHandler(h EventHandler) error {
36+
func (eh *ClusterEventHandlers) RegisterEventHandler(h EventHandler) error {
5037
eh.Lock()
5138
defer eh.Unlock()
5239

@@ -58,7 +45,7 @@ func (eh *EventHandlers) RegisterEventHandler(h EventHandler) error {
5845
}
5946

6047
// UnregisterEventHandler unregisters a previously registered event handler.
61-
func (eh *EventHandlers) UnregisterEventHandler(h EventHandler) {
48+
func (eh *ClusterEventHandlers) UnregisterEventHandler(h EventHandler) {
6249
eh.Lock()
6350
defer eh.Unlock()
6451

cluster/events.go

+13-41
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,20 @@
11
package cluster
22

3-
import (
4-
"time"
3+
import "github.com/docker/docker/api/types/events"
54

6-
"github.com/docker/go-events"
7-
"github.com/docker/swarmkit/watch"
8-
)
9-
10-
const (
11-
defaultEventQueueLimit = 10000
12-
defaultEventQueueTimeout = 10 * time.Second
13-
)
14-
15-
// EventsHandler broadcasts events to multiple client listeners.
16-
type EventsHandler struct {
17-
watchQueue *watch.Queue
5+
// Event is exported
6+
type Event struct {
7+
events.Message
8+
Engine *Engine `json:"-"`
189
}
1910

20-
// NewEventsHandler creates a new EventsHandler for a cluster.
21-
// The new eventsHandler is initialized with no writers or channels.
22-
func NewEventsHandler() *EventsHandler {
23-
return &EventsHandler{
24-
watchQueue: watch.NewQueue(watch.WithTimeout(defaultEventQueueTimeout), watch.WithLimit(defaultEventQueueLimit), watch.WithCloseOutChan()),
25-
}
11+
// EventHandler is exported
12+
type EventHandler interface {
13+
Handle(*Event) error
2614
}
2715

28-
// Add adds the writer and a new channel for the remote address.
29-
func (eh *EventsHandler) Watch() (eventq chan events.Event, cancel func()) {
30-
eventq, cancel = eh.watchQueue.Watch()
31-
return eventq, cancel
32-
}
33-
34-
func (eh *EventsHandler) cleanupHandler(remoteAddr string) {
35-
eh.watchQueue.Close()
36-
}
37-
38-
// Handle writes information about a cluster event to each remote address in the cluster that has been added to the events handler.
39-
// After an unsuccessful write to a remote address, the associated channel is closed and the address is removed from the events handler.
40-
func (eh *EventsHandler) Handle(e *Event) error {
41-
eh.watchQueue.Publish(e)
42-
return nil
43-
}
44-
45-
// Size returns the number of remote addresses that the events handler currently contains.
46-
func (eh *EventsHandler) Size() int {
47-
return 0
48-
}
16+
// EventHandler is implemented by all event handlers in Swarm
17+
// - APIEventHandler: Handles API level events
18+
// - Watchdog: Handles events related to rescheduling
19+
// - Cluster: Acts as a proxy event handler for the engine, but essentially
20+
// punts all handling to the above two handlers

cluster/mesos/cluster.go

+29-29
Original file line numberDiff line numberDiff line change
@@ -31,18 +31,18 @@ import (
3131
type Cluster struct {
3232
sync.RWMutex
3333

34-
dockerEnginePort string
35-
eventHandlers *cluster.EventHandlers
36-
master string
37-
agents map[string]*agent
38-
scheduler *Scheduler
39-
TLSConfig *tls.Config
40-
options *cluster.DriverOpts
41-
offerTimeout time.Duration
42-
refuseTimeout time.Duration
43-
taskCreationTimeout time.Duration
44-
pendingTasks *task.Tasks
45-
engineOpts *cluster.EngineOpts
34+
dockerEnginePort string
35+
clusterEventHandlers *cluster.ClusterEventHandlers
36+
master string
37+
agents map[string]*agent
38+
scheduler *Scheduler
39+
TLSConfig *tls.Config
40+
options *cluster.DriverOpts
41+
offerTimeout time.Duration
42+
refuseTimeout time.Duration
43+
taskCreationTimeout time.Duration
44+
pendingTasks *task.Tasks
45+
engineOpts *cluster.EngineOpts
4646
}
4747

4848
const (
@@ -69,16 +69,16 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
6969
flag.Lookup("logtostderr").Value.Set("true")
7070
}
7171
cluster := &Cluster{
72-
dockerEnginePort: defaultDockerEnginePort,
73-
eventHandlers: cluster.NewEventHandlers(),
74-
master: master,
75-
agents: make(map[string]*agent),
76-
TLSConfig: TLSConfig,
77-
options: &options,
78-
offerTimeout: defaultOfferTimeout,
79-
taskCreationTimeout: defaultTaskCreationTimeout,
80-
engineOpts: engineOptions,
81-
refuseTimeout: defaultRefuseTimeout,
72+
dockerEnginePort: defaultDockerEnginePort,
73+
clusterEventHandlers: cluster.NewClusterEventHandlers(),
74+
master: master,
75+
agents: make(map[string]*agent),
76+
TLSConfig: TLSConfig,
77+
options: &options,
78+
offerTimeout: defaultOfferTimeout,
79+
taskCreationTimeout: defaultTaskCreationTimeout,
80+
engineOpts: engineOptions,
81+
refuseTimeout: defaultRefuseTimeout,
8282
}
8383

8484
cluster.pendingTasks = task.NewTasks(cluster)
@@ -167,24 +167,24 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
167167

168168
// Handle callbacks for the events
169169
func (c *Cluster) Handle(e *cluster.Event) error {
170-
// call Handle for all eventHandlers
171-
c.eventHandlers.Handle(e)
170+
// call Handle for all clusterEventHandlers
171+
c.clusterEventHandlers.Handle(e)
172172
return nil
173173
}
174174

175175
// RegisterEventHandler registers an event handler.
176176
func (c *Cluster) RegisterEventHandler(h cluster.EventHandler) error {
177-
return c.eventHandlers.RegisterEventHandler(h)
177+
return c.clusterEventHandlers.RegisterEventHandler(h)
178178
}
179179

180180
// UnregisterEventHandler unregisters a previously registered event handler.
181181
func (c *Cluster) UnregisterEventHandler(h cluster.EventHandler) {
182-
c.eventHandlers.UnregisterEventHandler(h)
182+
c.clusterEventHandlers.UnregisterEventHandler(h)
183183
}
184184

185-
// NewAPIEventsHandler creates a new API events handler
186-
func (c *Cluster) NewAPIEventsHandler() *cluster.EventsHandler {
187-
return cluster.NewEventsHandler()
185+
// NewAPIEventHandler creates a new API events handler
186+
func (c *Cluster) NewAPIEventHandler() *cluster.APIEventHandler {
187+
return cluster.NewAPIEventHandler()
188188
}
189189

190190
// CloseWatchQueue closes the watchQueue when the manager shuts down.

0 commit comments

Comments
 (0)