Skip to content

Commit

Permalink
[go-client] support consumer seek (apache#3478)
Browse files Browse the repository at this point in the history
* [go-client] support consumer seek

Signed-off-by: xiaolong.ran <[email protected]>

* add unit test for consumer seek

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored and merlimat committed Jan 31, 2019
1 parent 2f1aae1 commit 9917784
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 6 deletions.
24 changes: 24 additions & 0 deletions pulsar-client-go/pulsar/c_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,3 +278,27 @@ func pulsarConsumerCloseCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
func (c *consumer) RedeliverUnackedMessages() {
C.pulsar_consumer_redeliver_unacknowledged_messages(c.ptr)
}

func (c *consumer) Seek(msgID MessageID) error {
channel := make(chan error)
c.SeekAsync(msgID, func(err error) {
channel <- err
close(channel)
})
return <-channel
}

func (c *consumer) SeekAsync(msgID MessageID, callback func(error)) {
C._pulsar_consumer_seek_async(c.ptr, msgID.(*messageID).ptr, savePointer(callback))
}

//export pulsarConsumerSeekCallbackProxy
func pulsarConsumerSeekCallbackProxy(res C.pulsar_result, ctx unsafe.Pointer) {
callback := restorePointer(ctx).(func(err error))

if res != C.pulsar_result_Ok {
go callback(newError(res, "Failed to seek Consumer"))
} else {
go callback(nil)
}
}
6 changes: 6 additions & 0 deletions pulsar-client-go/pulsar/c_go_pulsar.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ static inline void _pulsar_consumer_close_async(pulsar_consumer_t *consumer, voi
pulsar_consumer_close_async(consumer, pulsarConsumerCloseCallbackProxy, ctx);
}

void pulsarConsumerSeekCallbackProxy(pulsar_result result, void *ctx);

static inline void _pulsar_consumer_seek_async(pulsar_consumer_t *consumer, pulsar_message_id_t *messageId,void *ctx) {
pulsar_consumer_seek_async(consumer, messageId,pulsarConsumerSeekCallbackProxy, ctx);
}

//// Reader callbacks

void pulsarCreateReaderCallbackProxy(pulsar_result result, pulsar_reader_t *reader, void *ctx);
Expand Down
7 changes: 7 additions & 0 deletions pulsar-client-go/pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ type Consumer interface {
// Close the consumer and stop the broker to push more messages
Close() error

// Reset the subscription associated with this consumer to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(msgID MessageID) error

// Redelivers all the unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
// active for the given topic. In Shared mode, the consumers messages to be redelivered are distributed across all
// the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
Expand Down
75 changes: 69 additions & 6 deletions pulsar-client-go/pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ func TestConsumer(t *testing.T) {
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-sub",
AckTimeout: 1 * time.Minute,
Name: "my-consumer-name",
ReceiverQueueSize: 100,
Topic: "my-topic",
SubscriptionName: "my-sub",
AckTimeout: 1 * time.Minute,
Name: "my-consumer-name",
ReceiverQueueSize: 100,
MaxTotalReceiverQueueSizeAcrossPartitions: 10000,
Type: Shared,
Type: Shared,
})

assert.Nil(t, err)
Expand Down Expand Up @@ -101,6 +101,9 @@ func TestConsumer(t *testing.T) {
consumer.Ack(msg)
}

err = consumer.Seek(EarliestMessage)
assert.Nil(t, err)

consumer.Unsubscribe()
}

Expand Down Expand Up @@ -395,3 +398,63 @@ func TestConsumerRegex(t *testing.T) {

consumer.Unsubscribe()
}

func TestConsumer_Seek(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})

assert.Nil(t, err)
defer client.Close()

topicName := "persistent://public/default/testSeek"
subName := "sub-testSeek"

producer, err := client.CreateProducer(ProducerOptions{
Topic: topicName,
})
assert.Nil(t, err)
assert.Equal(t, producer.Topic(), topicName)
defer producer.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topic: topicName,
SubscriptionName: subName,
})
assert.Nil(t, err)
assert.Equal(t, consumer.Topic(), topicName)
assert.Equal(t, consumer.Subscription(), subName)
defer consumer.Close()

ctx := context.Background()

// Send 10 messages synchronously
t.Log("Publishing 10 messages synchronously")
for msgNum := 0; msgNum < 10; msgNum++ {
if err := producer.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
}); err != nil {
t.Fatal(err)
}
}

t.Log("Trying to receive 10 messages")
for msgNum := 0; msgNum < 10; msgNum++ {
_, err := consumer.Receive(ctx)
assert.Nil(t, err)
}

// seek to earliest, expected receive first message.
err = consumer.Seek(EarliestMessage)
assert.Nil(t, err)

// Sleeping for 500ms to wait for consumer re-connect
time.Sleep(500 * time.Millisecond)

msg, err := consumer.Receive(ctx)
assert.Nil(t, err)
t.Logf("again received message:%+v", msg.ID())
assert.Equal(t,string(msg.Payload()),"msg-content-0")

consumer.Unsubscribe()
}

0 comments on commit 9917784

Please sign in to comment.