Skip to content

Commit

Permalink
Consumer/position re-consume unacked or from topic start (Comcast#16)
Browse files Browse the repository at this point in the history
* Add ability to consume from beginning of topic

This allows for the `InitialPosition` field to be set when creating a
new subscriber. More tests should be added around this in the future.
For now, it's only available for "exclusive" consumers, but could be
added to other subscription types.

* expose options for re-reading unacked messages (Comcast#13)

* expose options for re-reading unacked messages

Signed-off-by: David Walter <[email protected]>

* fix comment typos

Signed-off-by: David Walter <[email protected]>

* Per review: prefer caller returns result directly

Signed-off-by: David Walter <[email protected]>

* update comment wording
  • Loading branch information
davidwalter0 authored Jan 25, 2019
1 parent c0ba52b commit 075cf83
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 19 deletions.
12 changes: 8 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,23 +222,27 @@ func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (*
// See "Subscription modes" for more information:
// https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Shared, queue)
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Shared, api.CommandSubscribe_Latest, queue)
}

// NewExclusiveConsumer creates a new exclusive consumer capable of reading messages from the
// given topic.
// See "Subscription modes" for more information:
// https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Exclusive, queue)
func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, earliest bool, queue chan Message) (*Consumer, error) {
initialPosition := api.CommandSubscribe_Latest
if earliest {
initialPosition = api.CommandSubscribe_Earliest
}
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Exclusive, initialPosition, queue)
}

// NewFailoverConsumer creates a new failover consumer capable of reading messages from the
// given topic.
// See "Subscription modes" for more information:
// https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl
func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) {
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Failover, queue)
return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Failover, api.CommandSubscribe_Latest, queue)
}

// handleFrame is called by the underlaying conn with
Expand Down
8 changes: 4 additions & 4 deletions client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestClient_Int_PubSub(t *testing.T) {
subName := randString(16)
for i := range consumers {
name := fmt.Sprintf("%s-%d", subName, i)
consumers[i], err = c.NewExclusiveConsumer(ctx, topic, name, make(chan Message, N))
consumers[i], err = c.NewExclusiveConsumer(ctx, topic, name, false, make(chan Message, N))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestClient_Int_ServerInitiatedTopicClose(t *testing.T) {
t.Log(topicResp.String())

subscriptionName := randString(32)
topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, subscriptionName, make(chan Message, 1))
topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, subscriptionName, false, make(chan Message, 1))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestClient_Int_Unsubscribe(t *testing.T) {
}
t.Log(topicResp.String())

topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, randString(32), make(chan Message, 1))
topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, randString(32), false, make(chan Message, 1))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestClient_Int_RedeliverAll(t *testing.T) {
}

// create single consumer with buffer size N
cs, err := c.NewExclusiveConsumer(ctx, topic, randString(16), make(chan Message, N))
cs, err := c.NewExclusiveConsumer(ctx, topic, randString(16), false, make(chan Message, N))
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,8 @@ func (c *Consumer) handleReachedEndOfTopic(f frame.Frame) error {
return nil
}

// RedeliverUnacknowledged sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request
// for all messages that have not been acked.
// RedeliverUnacknowledged uses the protocol option
// REDELIVER_UNACKNOWLEDGED_MESSAGES to re-retrieve unacked messages.
func (c *Consumer) RedeliverUnacknowledged(ctx context.Context) error {
cmd := api.BaseCommand{
Type: api.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES.Enum(),
Expand Down
74 changes: 73 additions & 1 deletion managed_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ManagedConsumerConfig struct {
Topic string
Name string // subscription name
Exclusive bool // if false, subscription is shared
Earliest bool // if true, subscription cursor set to beginning
QueueSize int // number of messages to buffer before dropping messages

NewConsumerTimeout time.Duration // maximum duration to create Consumer, including topic lookup
Expand Down Expand Up @@ -277,7 +278,7 @@ func (m *ManagedConsumer) newConsumer(ctx context.Context) (*Consumer, error) {

// Create the topic consumer. A non-blank consumer name is required.
if m.cfg.Exclusive {
return client.NewExclusiveConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.queue)
return client.NewExclusiveConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.cfg.Earliest, m.queue)
}
return client.NewSharedConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.queue)
}
Expand Down Expand Up @@ -338,3 +339,74 @@ func (m *ManagedConsumer) manage() {
m.set(consumer)
}
}

// RedeliverUnacknowledged sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request
// for all messages that have not been acked.
func (m *ManagedConsumer) RedeliverUnacknowledged(ctx context.Context) error {
for {
m.mu.RLock()
consumer := m.consumer
wait := m.waitc
m.mu.RUnlock()

if consumer == nil {
select {
case <-wait:
// a new consumer was established.
// Re-enter read-lock to obtain it.
continue
case <-ctx.Done():
return ctx.Err()
}
}
return consumer.RedeliverUnacknowledged(ctx)
}
}

// RedeliverOverflow sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request
// for all messages that were dropped because of full message buffer. Note that
// for all subscription types other than `shared`, _all_ unacknowledged messages
// will be redelivered.
// https://github.com/apache/incubator-pulsar/issues/2003
func (m *ManagedConsumer) RedeliverOverflow(ctx context.Context) (int, error) {
for {
m.mu.RLock()
consumer := m.consumer
wait := m.waitc
m.mu.RUnlock()

if consumer == nil {
select {
case <-wait:
// a new consumer was established.
// Re-enter read-lock to obtain it.
continue
case <-ctx.Done():
return -1, ctx.Err()
}
}
return consumer.RedeliverOverflow(ctx)
}
}

// Unsubscribe the consumer from its topic.
func (m *ManagedConsumer) Unsubscribe(ctx context.Context) error {
for {
m.mu.RLock()
consumer := m.consumer
wait := m.waitc
m.mu.RUnlock()

if consumer == nil {
select {
case <-wait:
// a new consumer was established.
// Re-enter read-lock to obtain it.
continue
case <-ctx.Done():
return ctx.Err()
}
}
consumer.Unsubscribe(ctx)
}
}
13 changes: 7 additions & 6 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,19 @@ type pubsub struct {

// subscribe subscribes to the given topic. The queueSize determines the buffer
// size of the Consumer.Messages() channel.
func (t *pubsub) subscribe(ctx context.Context, topic, sub string, subType api.CommandSubscribe_SubType, queue chan Message) (*Consumer, error) {
func (t *pubsub) subscribe(ctx context.Context, topic, sub string, subType api.CommandSubscribe_SubType, initialPosition api.CommandSubscribe_InitialPosition, queue chan Message) (*Consumer, error) {
requestID := t.reqID.next()
consumerID := t.consumerID.next()

cmd := api.BaseCommand{
Type: api.BaseCommand_SUBSCRIBE.Enum(),
Subscribe: &api.CommandSubscribe{
SubType: subType.Enum(),
Topic: proto.String(topic),
Subscription: proto.String(sub),
RequestId: requestID,
ConsumerId: consumerID,
SubType: subType.Enum(),
Topic: proto.String(topic),
Subscription: proto.String(sub),
RequestId: requestID,
ConsumerId: consumerID,
InitialPosition: initialPosition.Enum(),
},
}

Expand Down
4 changes: 2 additions & 2 deletions pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestPubsub_Subscribe_Success(t *testing.T) {

go func() {
var r response
r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, make(chan Message, 1))
r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, api.CommandSubscribe_Latest, make(chan Message, 1))
resp <- r
}()

Expand Down Expand Up @@ -105,7 +105,7 @@ func TestPubsub_Subscribe_Error(t *testing.T) {

go func() {
var r response
r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, make(chan Message, 1))
r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, api.CommandSubscribe_Latest, make(chan Message, 1))
resp <- r
}()

Expand Down

0 comments on commit 075cf83

Please sign in to comment.