@@ -11,7 +11,6 @@ import (
11
11
"sync/atomic"
12
12
"time"
13
13
14
- "github.com/bitly/go-nsq"
15
14
"github.com/bitly/nsq/util"
16
15
"github.com/bitly/nsq/util/pqueue"
17
16
)
@@ -50,9 +49,9 @@ type Channel struct {
50
49
51
50
backend BackendQueue
52
51
53
- incomingMsgChan chan * nsq. Message
54
- memoryMsgChan chan * nsq. Message
55
- clientMsgChan chan * nsq. Message
52
+ incomingMsgChan chan * Message
53
+ memoryMsgChan chan * Message
54
+ clientMsgChan chan * Message
56
55
exitChan chan int
57
56
waitGroup util.WaitGroupWrapper
58
57
exitFlag int32
@@ -68,10 +67,10 @@ type Channel struct {
68
67
e2eProcessingLatencyStream * util.Quantile
69
68
70
69
// TODO: these can be DRYd up
71
- deferredMessages map [nsq. MessageID ]* pqueue.Item
70
+ deferredMessages map [MessageID ]* pqueue.Item
72
71
deferredPQ pqueue.PriorityQueue
73
72
deferredMutex sync.Mutex
74
- inFlightMessages map [nsq. MessageID ]* pqueue.Item
73
+ inFlightMessages map [MessageID ]* pqueue.Item
75
74
inFlightPQ pqueue.PriorityQueue
76
75
inFlightMutex sync.Mutex
77
76
@@ -80,7 +79,7 @@ type Channel struct {
80
79
}
81
80
82
81
type inFlightMessage struct {
83
- msg * nsq. Message
82
+ msg * Message
84
83
clientID int64
85
84
ts time.Time
86
85
}
@@ -92,9 +91,9 @@ func NewChannel(topicName string, channelName string, context *context,
92
91
c := & Channel {
93
92
topicName : topicName ,
94
93
name : channelName ,
95
- incomingMsgChan : make (chan * nsq. Message , 1 ),
96
- memoryMsgChan : make (chan * nsq. Message , context .nsqd .options .MemQueueSize ),
97
- clientMsgChan : make (chan * nsq. Message ),
94
+ incomingMsgChan : make (chan * Message , 1 ),
95
+ memoryMsgChan : make (chan * Message , context .nsqd .options .MemQueueSize ),
96
+ clientMsgChan : make (chan * Message ),
98
97
exitChan : make (chan int ),
99
98
clients : make (map [int64 ]Consumer ),
100
99
deleteCallback : deleteCallback ,
@@ -136,8 +135,8 @@ func NewChannel(topicName string, channelName string, context *context,
136
135
func (c * Channel ) initPQ () {
137
136
pqSize := int (math .Max (1 , float64 (c .context .nsqd .options .MemQueueSize )/ 10 ))
138
137
139
- c .inFlightMessages = make (map [nsq. MessageID ]* pqueue.Item )
140
- c .deferredMessages = make (map [nsq. MessageID ]* pqueue.Item )
138
+ c .inFlightMessages = make (map [MessageID ]* pqueue.Item )
139
+ c .deferredMessages = make (map [MessageID ]* pqueue.Item )
141
140
142
141
c .inFlightMutex .Lock ()
143
142
c .inFlightPQ = pqueue .New (pqSize )
@@ -273,7 +272,7 @@ finish:
273
272
}
274
273
275
274
for _ , item := range c .deferredMessages {
276
- msg := item .Value .(* nsq. Message )
275
+ msg := item .Value .(* Message )
277
276
err := writeMessageToBackend (& msgBuf , msg , c .backend )
278
277
if err != nil {
279
278
log .Printf ("ERROR: failed to write message to backend - %s" , err .Error ())
@@ -325,7 +324,7 @@ func (c *Channel) IsPaused() bool {
325
324
326
325
// PutMessage writes to the appropriate incoming message channel
327
326
// (which will be routed asynchronously)
328
- func (c * Channel ) PutMessage (msg * nsq. Message ) error {
327
+ func (c * Channel ) PutMessage (msg * Message ) error {
329
328
c .RLock ()
330
329
defer c .RUnlock ()
331
330
if atomic .LoadInt32 (& c .exitFlag ) == 1 {
@@ -337,7 +336,7 @@ func (c *Channel) PutMessage(msg *nsq.Message) error {
337
336
}
338
337
339
338
// TouchMessage resets the timeout for an in-flight message
340
- func (c * Channel ) TouchMessage (clientID int64 , id nsq. MessageID ) error {
339
+ func (c * Channel ) TouchMessage (clientID int64 , id MessageID ) error {
341
340
item , err := c .popInFlightMessage (clientID , id )
342
341
if err != nil {
343
342
return err
@@ -362,7 +361,7 @@ func (c *Channel) TouchMessage(clientID int64, id nsq.MessageID) error {
362
361
}
363
362
364
363
// FinishMessage successfully discards an in-flight message
365
- func (c * Channel ) FinishMessage (clientID int64 , id nsq. MessageID ) error {
364
+ func (c * Channel ) FinishMessage (clientID int64 , id MessageID ) error {
366
365
item , err := c .popInFlightMessage (clientID , id )
367
366
if err != nil {
368
367
return err
@@ -381,7 +380,7 @@ func (c *Channel) FinishMessage(clientID int64, id nsq.MessageID) error {
381
380
// `timeoutMs` > 0 - asynchronously wait for the specified timeout
382
381
// and requeue a message (aka "deferred requeue")
383
382
//
384
- func (c * Channel ) RequeueMessage (clientID int64 , id nsq. MessageID , timeout time.Duration ) error {
383
+ func (c * Channel ) RequeueMessage (clientID int64 , id MessageID , timeout time.Duration ) error {
385
384
// remove from inflight first
386
385
item , err := c .popInFlightMessage (clientID , id )
387
386
if err != nil {
@@ -427,7 +426,7 @@ func (c *Channel) RemoveClient(clientID int64) {
427
426
}
428
427
}
429
428
430
- func (c * Channel ) StartInFlightTimeout (msg * nsq. Message , clientID int64 , timeout time.Duration ) error {
429
+ func (c * Channel ) StartInFlightTimeout (msg * Message , clientID int64 , timeout time.Duration ) error {
431
430
now := time .Now ()
432
431
value := & inFlightMessage {msg , clientID , now }
433
432
absTs := now .Add (timeout ).UnixNano ()
@@ -440,7 +439,7 @@ func (c *Channel) StartInFlightTimeout(msg *nsq.Message, clientID int64, timeout
440
439
return nil
441
440
}
442
441
443
- func (c * Channel ) StartDeferredTimeout (msg * nsq. Message , timeout time.Duration ) error {
442
+ func (c * Channel ) StartDeferredTimeout (msg * Message , timeout time.Duration ) error {
444
443
absTs := time .Now ().Add (timeout ).UnixNano ()
445
444
item := & pqueue.Item {Value : msg , Priority : absTs }
446
445
err := c .pushDeferredMessage (item )
@@ -452,7 +451,7 @@ func (c *Channel) StartDeferredTimeout(msg *nsq.Message, timeout time.Duration)
452
451
}
453
452
454
453
// doRequeue performs the low level operations to requeue a message
455
- func (c * Channel ) doRequeue (msg * nsq. Message ) error {
454
+ func (c * Channel ) doRequeue (msg * Message ) error {
456
455
c .RLock ()
457
456
defer c .RUnlock ()
458
457
if atomic .LoadInt32 (& c .exitFlag ) == 1 {
@@ -468,7 +467,7 @@ func (c *Channel) pushInFlightMessage(item *pqueue.Item) error {
468
467
c .Lock ()
469
468
defer c .Unlock ()
470
469
471
- id := item .Value .(* inFlightMessage ).msg .Id
470
+ id := item .Value .(* inFlightMessage ).msg .ID
472
471
_ , ok := c .inFlightMessages [id ]
473
472
if ok {
474
473
return errors .New ("ID already in flight" )
@@ -479,7 +478,7 @@ func (c *Channel) pushInFlightMessage(item *pqueue.Item) error {
479
478
}
480
479
481
480
// popInFlightMessage atomically removes a message from the in-flight dictionary
482
- func (c * Channel ) popInFlightMessage (clientID int64 , id nsq. MessageID ) (* pqueue.Item , error ) {
481
+ func (c * Channel ) popInFlightMessage (clientID int64 , id MessageID ) (* pqueue.Item , error ) {
483
482
c .Lock ()
484
483
defer c .Unlock ()
485
484
@@ -521,7 +520,7 @@ func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
521
520
defer c .Unlock ()
522
521
523
522
// TODO: these map lookups are costly
524
- id := item .Value .(* nsq. Message ).Id
523
+ id := item .Value .(* Message ).ID
525
524
_ , ok := c .deferredMessages [id ]
526
525
if ok {
527
526
return errors .New ("ID already deferred" )
@@ -531,7 +530,7 @@ func (c *Channel) pushDeferredMessage(item *pqueue.Item) error {
531
530
return nil
532
531
}
533
532
534
- func (c * Channel ) popDeferredMessage (id nsq. MessageID ) (* pqueue.Item , error ) {
533
+ func (c * Channel ) popDeferredMessage (id MessageID ) (* pqueue.Item , error ) {
535
534
c .Lock ()
536
535
defer c .Unlock ()
537
536
@@ -578,7 +577,7 @@ func (c *Channel) router() {
578
577
// it is also performs in-flight accounting and initiates the auto-requeue
579
578
// goroutine
580
579
func (c * Channel ) messagePump () {
581
- var msg * nsq. Message
580
+ var msg * Message
582
581
var buf []byte
583
582
var err error
584
583
@@ -593,7 +592,7 @@ func (c *Channel) messagePump() {
593
592
select {
594
593
case msg = <- c .memoryMsgChan :
595
594
case buf = <- c .backend .ReadChan ():
596
- msg , err = nsq . DecodeMessage (buf )
595
+ msg , err = decodeMessage (buf )
597
596
if err != nil {
598
597
log .Printf ("ERROR: failed to decode message - %s" , err .Error ())
599
598
continue
@@ -617,8 +616,8 @@ exit:
617
616
618
617
func (c * Channel ) deferredWorker () {
619
618
c .pqWorker (& c .deferredPQ , & c .deferredMutex , func (item * pqueue.Item ) {
620
- msg := item .Value .(* nsq. Message )
621
- _ , err := c .popDeferredMessage (msg .Id )
619
+ msg := item .Value .(* Message )
620
+ _ , err := c .popDeferredMessage (msg .ID )
622
621
if err != nil {
623
622
return
624
623
}
@@ -630,7 +629,7 @@ func (c *Channel) inFlightWorker() {
630
629
c .pqWorker (& c .inFlightPQ , & c .inFlightMutex , func (item * pqueue.Item ) {
631
630
clientID := item .Value .(* inFlightMessage ).clientID
632
631
msg := item .Value .(* inFlightMessage ).msg
633
- _ , err := c .popInFlightMessage (clientID , msg .Id )
632
+ _ , err := c .popInFlightMessage (clientID , msg .ID )
634
633
if err != nil {
635
634
return
636
635
}
0 commit comments