Skip to content

Commit

Permalink
Negative acks implementation for Go client (apache#3817)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Mar 14, 2019
1 parent 4b3fed9 commit c83631b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pulsar-client-go/pulsar/c_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
C.pulsar_consumer_set_unacked_messages_timeout_ms(conf, C.uint64_t(timeoutMillis))
}

if options.NackRedeliveryDelay != nil {
delayMillis := options.NackRedeliveryDelay.Nanoseconds() / int64(time.Millisecond)
C.pulsar_configure_set_negative_ack_redelivery_delay_ms(conf, C.long(delayMillis))
}

if options.Type != Exclusive {
C.pulsar_consumer_configuration_set_consumer_type(conf, C.pulsar_consumer_type(options.Type))
}
Expand Down Expand Up @@ -254,6 +259,16 @@ func (c *consumer) AckCumulativeID(msgId MessageID) error {
return nil
}

func (c *consumer) Nack(msg Message) error {
C.pulsar_consumer_negative_acknowledge(c.ptr, msg.(*message).ptr)
return nil
}

func (c *consumer) NackID(msgId MessageID) error {
C.pulsar_consumer_negative_acknowledge_id(c.ptr, msgId.(*messageID).ptr)
return nil
}

func (c *consumer) Close() error {
channel := make(chan error)
c.CloseAsync(func(err error) { channel <- err; close(channel) })
Expand Down
22 changes: 22 additions & 0 deletions pulsar-client-go/pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type ConsumerOptions struct {
// Default is 0, which means message are not being replayed based on ack time
AckTimeout time.Duration

// The delay after which to redeliver the messages that failed to be
// processed. Default is 1min. (See `Consumer.Nack()`)
NackRedeliveryDelay *time.Duration

// Select the subscription type to be used when subscribing to the topic.
// Default is `Exclusive`
Type SubscriptionType
Expand Down Expand Up @@ -161,6 +165,24 @@ type Consumer interface {
// It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and waiting for the callback to be triggered.
AckCumulativeID(MessageID) error

// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
// with ConsumerOptions.NAckRedeliveryDelay .
//
// This call is not blocking.
Nack(Message) error

// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
// with ConsumerOptions.NackRedeliveryDelay .
//
// This call is not blocking.
NackID(MessageID) error

// Close the consumer and stop the broker to push more messages
Close() error

Expand Down
67 changes: 67 additions & 0 deletions pulsar-client-go/pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,3 +514,70 @@ func TestConsumer_SubscriptionInitPos(t *testing.T) {

assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
}

func TestConsumerNegativeAcks(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})

assert.Nil(t, err)
defer client.Close()

topic := "TestConsumerNegativeAcks"

producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
})

assert.Nil(t, err)
defer producer.Close()

nackDelay := 100 * time.Millisecond

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
NackRedeliveryDelay: &nackDelay,
})

assert.Nil(t, err)
defer consumer.Close()

ctx := context.Background()

for i := 0; i < 10; i++ {
producer.SendAsync(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}, func(producerMessage ProducerMessage, e error) {
fmt.Print("send complete. err=", e)
})
}

producer.Flush()

for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)

assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))

// Ack with error
consumer.Nack(msg)
}

// Messages will be redelivered
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
assert.NotNil(t, msg)

assert.Equal(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))

// This time acks successfully
consumer.Ack(msg)
}


consumer.Unsubscribe()
}

0 comments on commit c83631b

Please sign in to comment.