Skip to content

Commit

Permalink
Modify consumer to receiver message channel (Comcast#1)
Browse files Browse the repository at this point in the history
Instead of creating its own internal channel, the consumer new receives
one at creation. This allows for the ManagedConsumer to use the same
channel for a consumer that is recreated. This will ensure that messages
are dropped when a consumer is closed unexpectedly.
  • Loading branch information
awilliams authored Jun 28, 2018
1 parent 0501834 commit 9b67df1
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 31 deletions.
12 changes: 6 additions & 6 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,24 +227,24 @@ func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (*
// given topic.
// See "Subscription modes" for more information:
// https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queueSize int) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Shared, queueSize)
func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Shared, queue)
}

// NewExclusiveConsumer creates a new exclusive consumer capable of reading messages from the
// given topic.
// See "Subscription modes" for more information:
// https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, queueSize int) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Exclusive, queueSize)
func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Exclusive, queue)
}

// NewFailoverConsumer creates a new failover consumer capable of reading messages from the
// given topic.
// See "Subscription modes" for more information:
// https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queueSize int) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Failover, queueSize)
func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Failover, queue)
}

// handleFrame is called by the underlaying conn with
Expand Down
10 changes: 5 additions & 5 deletions client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestClient_Int_PubSub(t *testing.T) {
subName := randString(16)
for i := range consumers {
name := fmt.Sprintf("%s-%d", subName, i)
consumers[i], err = c.NewExclusiveConsumer(ctx, topic, name, N)
consumers[i], err = c.NewExclusiveConsumer(ctx, topic, name, make(chan Message, N))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestClient_Int_ServerInitiatedTopicClose(t *testing.T) {
t.Log(topicResp.String())

subscriptionName := randString(32)
topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, subscriptionName, 1)
topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, subscriptionName, make(chan Message, 1))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestClient_Int_Unsubscribe(t *testing.T) {
}
t.Log(topicResp.String())

topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, randString(32), 1)
topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, randString(32), make(chan Message, 1))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -425,7 +425,7 @@ func TestClient_Int_RedeliverOverflow(t *testing.T) {
}

// create single consumer with buffer size 1
cs, err := c.NewSharedConsumer(ctx, topic, randString(16), 1)
cs, err := c.NewSharedConsumer(ctx, topic, randString(16), make(chan Message, 1))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestClient_Int_RedeliverAll(t *testing.T) {
}

// create single consumer with buffer size N
cs, err := c.NewExclusiveConsumer(ctx, topic, randString(16), N)
cs, err := c.NewExclusiveConsumer(ctx, topic, randString(16), make(chan Message, N))
if err != nil {
t.Fatal(err)
}
Expand Down
15 changes: 8 additions & 7 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ const maxRedeliverUnacknowledged = 1000

// newConsumer returns a ready-to-use consumer.
// A consumer is used to attach to a subscription and
// consume messages from it.
func newConsumer(s cmdSender, dispatcher *frameDispatcher, topic string, reqID *monotonicID, consumerID uint64, queueSize int) *Consumer {
// consumes messages from it. The provided channel is sent
// all messages the consumer receives.
func newConsumer(s cmdSender, dispatcher *frameDispatcher, topic string, reqID *monotonicID, consumerID uint64, queue chan Message) *Consumer {
return &Consumer{
s: s,
topic: topic,
consumerID: consumerID,
reqID: reqID,
dispatcher: dispatcher,
msgs: make(chan Message, queueSize),
queue: queue,
closedc: make(chan struct{}),
endOfTopicc: make(chan struct{}),
}
Expand All @@ -56,7 +57,7 @@ type Consumer struct {

dispatcher *frameDispatcher // handles request/response state

msgs chan Message
queue chan Message

omu sync.Mutex // protects following
overflow []*api.MessageIdData // IDs of messages that were dropped because of full buffer
Expand Down Expand Up @@ -212,7 +213,7 @@ func (c *Consumer) handleReachedEndOfTopic(f Frame) error {
// Messages returns a read-only channel that callers
// should read from until the subscription is closed.
func (c *Consumer) Messages() <-chan Message {
return c.msgs
return c.queue
}

// RedeliverUnacknowledged sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request
Expand Down Expand Up @@ -291,7 +292,7 @@ func (c *Consumer) handleMessage(f Frame) error {
}

select {
case c.msgs <- m:
case c.queue <- m:
return nil

default:
Expand All @@ -311,6 +312,6 @@ func (c *Consumer) handleMessage(f Frame) error {
}
c.omu.Unlock()

return fmt.Errorf("consumer message queue on topic %q is full (capacity = %d)", c.topic, cap(c.msgs))
return fmt.Errorf("consumer message queue on topic %q is full (capacity = %d)", c.topic, cap(c.queue))
}
}
14 changes: 7 additions & 7 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestConsumer_Flow(t *testing.T) {
reqID := monotonicID{id}
dispatcher := newFrameDispatcher()

c := newConsumer(&ms, dispatcher, "test", &reqID, consID, 1)
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, 1))

if err := c.Flow(123); err != nil {
t.Fatalf("Flow() err = %v; nil expected", err)
Expand All @@ -51,7 +51,7 @@ func TestConsumer_Close_Success(t *testing.T) {
reqID := monotonicID{id}
dispatcher := newFrameDispatcher()

c := newConsumer(&ms, dispatcher, "test", &reqID, consID, 1)
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, 1))

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestConsumer_handleMessage(t *testing.T) {
reqID := monotonicID{id}
dispatcher := newFrameDispatcher()

c := newConsumer(&ms, dispatcher, "test", &reqID, consID, 1)
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, 1))

f := Frame{
BaseCmd: &api.BaseCommand{
Expand Down Expand Up @@ -157,7 +157,7 @@ func TestConsumer_handleMessage_fullQueue(t *testing.T) {
dispatcher := newFrameDispatcher()

queueSize := 3
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, queueSize)
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, queueSize))

f := Frame{
BaseCmd: &api.BaseCommand{
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestConsumer_handleCloseConsumer(t *testing.T) {
reqID := monotonicID{id}
dispatcher := newFrameDispatcher()

c := newConsumer(&ms, dispatcher, "test", &reqID, consID, 1)
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, 1))

select {
case <-c.Closed():
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestConsumer_handleReachedEndOfTopic(t *testing.T) {
reqID := monotonicID{id}
dispatcher := newFrameDispatcher()

c := newConsumer(&ms, dispatcher, "test", &reqID, consID, 1)
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, 1))

select {
case <-c.ReachedEndOfTopic():
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestConsumer_RedeliverOverflow(t *testing.T) {

queueSize := 1
N := 8 // number of messages to push to consumer
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, queueSize)
c := newConsumer(&ms, dispatcher, "test", &reqID, consID, make(chan Message, queueSize))

for i := 0; i < N; i++ {
entryID := uint64(i)
Expand Down
7 changes: 5 additions & 2 deletions managed_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func NewManagedConsumer(cp *ManagedClientPool, cfg ManagedConsumerConfig) *Manag
clientPool: cp,
cfg: cfg,
asyncErrs: asyncErrors(cfg.Errs),
queue: make(chan Message, cfg.QueueSize),
waitc: make(chan struct{}),
}

Expand All @@ -77,6 +78,8 @@ type ManagedConsumer struct {
cfg ManagedConsumerConfig
asyncErrs asyncErrors

queue chan Message

mu sync.RWMutex // protects following
consumer *Consumer // either consumer is nil and wait isn't or vice versa
waitc chan struct{} // if consumer is nil, this will unblock when it's been re-set
Expand Down Expand Up @@ -168,9 +171,9 @@ func (m *ManagedConsumer) newConsumer(ctx context.Context) (*Consumer, error) {

// Create the topic consumer. A non-blank consumer name is required.
if m.cfg.Exclusive {
return client.NewExclusiveConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.cfg.QueueSize)
return client.NewExclusiveConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.queue)
}
return client.NewSharedConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.cfg.QueueSize)
return client.NewSharedConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.queue)
}

// reconnect blocks while a new Consumer is created.
Expand Down
4 changes: 2 additions & 2 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type pubsub struct {

// subscribe subscribes to the given topic. The queueSize determines the buffer
// size of the Consumer.Messages() channel.
func (t *pubsub) subscribe(ctx context.Context, topic, sub string, subType api.CommandSubscribe_SubType, queueSize int) (*Consumer, error) {
func (t *pubsub) subscribe(ctx context.Context, topic, sub string, subType api.CommandSubscribe_SubType, queue chan Message) (*Consumer, error) {
requestID := t.reqID.next()
consumerID := t.consumerID.next()

Expand All @@ -69,7 +69,7 @@ func (t *pubsub) subscribe(ctx context.Context, topic, sub string, subType api.C
}
defer cancel()

c := newConsumer(t.s, t.dispatcher, topic, t.reqID, *consumerID, queueSize)
c := newConsumer(t.s, t.dispatcher, topic, t.reqID, *consumerID, queue)
// the new subscription needs to be added to the map
// before sending the subscribe command, otherwise there'd
// be a race between receiving the success result and
Expand Down
4 changes: 2 additions & 2 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestPubsub_Subscribe_Success(t *testing.T) {

go func() {
var r response
r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, 1)
r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, make(chan Message, 1))
resp <- r
}()

Expand Down Expand Up @@ -106,7 +106,7 @@ func TestPubsub_Subscribe_Error(t *testing.T) {

go func() {
var r response
r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, 1)
r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, make(chan Message, 1))
resp <- r
}()

Expand Down

0 comments on commit 9b67df1

Please sign in to comment.