diff --git a/client.go b/client.go index b3e6d4c..7cf8ad7 100644 --- a/client.go +++ b/client.go @@ -222,15 +222,19 @@ func (c *Client) NewProducer(ctx context.Context, topic, producerName string) (* // See "Subscription modes" for more information: // https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl func (c *Client) NewSharedConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) { - return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Shared, queue) + return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Shared, api.CommandSubscribe_Latest, queue) } // NewExclusiveConsumer creates a new exclusive consumer capable of reading messages from the // given topic. // See "Subscription modes" for more information: // https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl -func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) { - return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Exclusive, queue) +func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionName string, earliest bool, queue chan Message) (*Consumer, error) { + initialPosition := api.CommandSubscribe_Latest + if earliest { + initialPosition = api.CommandSubscribe_Earliest + } + return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Exclusive, initialPosition, queue) } // NewFailoverConsumer creates a new failover consumer capable of reading messages from the @@ -238,7 +242,7 @@ func (c *Client) NewExclusiveConsumer(ctx context.Context, topic, subscriptionNa // See "Subscription modes" for more information: // https://pulsar.incubator.apache.org/docs/latest/getting-started/ConceptsAndArchitecture/#Subscriptionmodes-jdrefl func (c *Client) NewFailoverConsumer(ctx context.Context, topic, subscriptionName string, queue chan Message) (*Consumer, error) { - return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Failover, queue) + return c.pubsub.subscribe(ctx, topic, subscriptionName, api.CommandSubscribe_Failover, api.CommandSubscribe_Latest, queue) } // handleFrame is called by the underlaying conn with diff --git a/client_integration_test.go b/client_integration_test.go index bd4d4d3..c5f0813 100644 --- a/client_integration_test.go +++ b/client_integration_test.go @@ -83,7 +83,7 @@ func TestClient_Int_PubSub(t *testing.T) { subName := randString(16) for i := range consumers { name := fmt.Sprintf("%s-%d", subName, i) - consumers[i], err = c.NewExclusiveConsumer(ctx, topic, name, make(chan Message, N)) + consumers[i], err = c.NewExclusiveConsumer(ctx, topic, name, false, make(chan Message, N)) if err != nil { t.Fatal(err) } @@ -219,7 +219,7 @@ func TestClient_Int_ServerInitiatedTopicClose(t *testing.T) { t.Log(topicResp.String()) subscriptionName := randString(32) - topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, subscriptionName, make(chan Message, 1)) + topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, subscriptionName, false, make(chan Message, 1)) if err != nil { t.Fatal(err) } @@ -348,7 +348,7 @@ func TestClient_Int_Unsubscribe(t *testing.T) { } t.Log(topicResp.String()) - topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, randString(32), make(chan Message, 1)) + topicConsumer, err := c.NewExclusiveConsumer(ctx, topic, randString(32), false, make(chan Message, 1)) if err != nil { t.Fatal(err) } @@ -555,7 +555,7 @@ func TestClient_Int_RedeliverAll(t *testing.T) { } // create single consumer with buffer size N - cs, err := c.NewExclusiveConsumer(ctx, topic, randString(16), make(chan Message, N)) + cs, err := c.NewExclusiveConsumer(ctx, topic, randString(16), false, make(chan Message, N)) if err != nil { t.Fatal(err) } diff --git a/consumer.go b/consumer.go index 8c76692..cf5ab56 100644 --- a/consumer.go +++ b/consumer.go @@ -235,8 +235,8 @@ func (c *Consumer) handleReachedEndOfTopic(f frame.Frame) error { return nil } -// RedeliverUnacknowledged sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request -// for all messages that have not been acked. +// RedeliverUnacknowledged uses the protocol option +// REDELIVER_UNACKNOWLEDGED_MESSAGES to re-retrieve unacked messages. func (c *Consumer) RedeliverUnacknowledged(ctx context.Context) error { cmd := api.BaseCommand{ Type: api.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES.Enum(), diff --git a/managed_consumer.go b/managed_consumer.go index d7c9e9e..e86007e 100644 --- a/managed_consumer.go +++ b/managed_consumer.go @@ -27,6 +27,7 @@ type ManagedConsumerConfig struct { Topic string Name string // subscription name Exclusive bool // if false, subscription is shared + Earliest bool // if true, subscription cursor set to beginning QueueSize int // number of messages to buffer before dropping messages NewConsumerTimeout time.Duration // maximum duration to create Consumer, including topic lookup @@ -277,7 +278,7 @@ func (m *ManagedConsumer) newConsumer(ctx context.Context) (*Consumer, error) { // Create the topic consumer. A non-blank consumer name is required. if m.cfg.Exclusive { - return client.NewExclusiveConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.queue) + return client.NewExclusiveConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.cfg.Earliest, m.queue) } return client.NewSharedConsumer(ctx, m.cfg.Topic, m.cfg.Name, m.queue) } @@ -338,3 +339,74 @@ func (m *ManagedConsumer) manage() { m.set(consumer) } } + +// RedeliverUnacknowledged sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request +// for all messages that have not been acked. +func (m *ManagedConsumer) RedeliverUnacknowledged(ctx context.Context) error { + for { + m.mu.RLock() + consumer := m.consumer + wait := m.waitc + m.mu.RUnlock() + + if consumer == nil { + select { + case <-wait: + // a new consumer was established. + // Re-enter read-lock to obtain it. + continue + case <-ctx.Done(): + return ctx.Err() + } + } + return consumer.RedeliverUnacknowledged(ctx) + } +} + +// RedeliverOverflow sends of REDELIVER_UNACKNOWLEDGED_MESSAGES request +// for all messages that were dropped because of full message buffer. Note that +// for all subscription types other than `shared`, _all_ unacknowledged messages +// will be redelivered. +// https://github.com/apache/incubator-pulsar/issues/2003 +func (m *ManagedConsumer) RedeliverOverflow(ctx context.Context) (int, error) { + for { + m.mu.RLock() + consumer := m.consumer + wait := m.waitc + m.mu.RUnlock() + + if consumer == nil { + select { + case <-wait: + // a new consumer was established. + // Re-enter read-lock to obtain it. + continue + case <-ctx.Done(): + return -1, ctx.Err() + } + } + return consumer.RedeliverOverflow(ctx) + } +} + +// Unsubscribe the consumer from its topic. +func (m *ManagedConsumer) Unsubscribe(ctx context.Context) error { + for { + m.mu.RLock() + consumer := m.consumer + wait := m.waitc + m.mu.RUnlock() + + if consumer == nil { + select { + case <-wait: + // a new consumer was established. + // Re-enter read-lock to obtain it. + continue + case <-ctx.Done(): + return ctx.Err() + } + } + consumer.Unsubscribe(ctx) + } +} diff --git a/pubsub.go b/pubsub.go index bae466f..436f9be 100644 --- a/pubsub.go +++ b/pubsub.go @@ -46,18 +46,19 @@ type pubsub struct { // subscribe subscribes to the given topic. The queueSize determines the buffer // size of the Consumer.Messages() channel. -func (t *pubsub) subscribe(ctx context.Context, topic, sub string, subType api.CommandSubscribe_SubType, queue chan Message) (*Consumer, error) { +func (t *pubsub) subscribe(ctx context.Context, topic, sub string, subType api.CommandSubscribe_SubType, initialPosition api.CommandSubscribe_InitialPosition, queue chan Message) (*Consumer, error) { requestID := t.reqID.next() consumerID := t.consumerID.next() cmd := api.BaseCommand{ Type: api.BaseCommand_SUBSCRIBE.Enum(), Subscribe: &api.CommandSubscribe{ - SubType: subType.Enum(), - Topic: proto.String(topic), - Subscription: proto.String(sub), - RequestId: requestID, - ConsumerId: consumerID, + SubType: subType.Enum(), + Topic: proto.String(topic), + Subscription: proto.String(sub), + RequestId: requestID, + ConsumerId: consumerID, + InitialPosition: initialPosition.Enum(), }, } diff --git a/pubsub_test.go b/pubsub_test.go index 7133bcf..87ecb05 100644 --- a/pubsub_test.go +++ b/pubsub_test.go @@ -47,7 +47,7 @@ func TestPubsub_Subscribe_Success(t *testing.T) { go func() { var r response - r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, make(chan Message, 1)) + r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, api.CommandSubscribe_Latest, make(chan Message, 1)) resp <- r }() @@ -105,7 +105,7 @@ func TestPubsub_Subscribe_Error(t *testing.T) { go func() { var r response - r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, make(chan Message, 1)) + r.c, r.err = tp.subscribe(ctx, "test-topic", "test-subscription", api.CommandSubscribe_Exclusive, api.CommandSubscribe_Latest, make(chan Message, 1)) resp <- r }()