Skip to content

Commit

Permalink
Support key-shared subscribe mode for go client (apache#4465)
Browse files Browse the repository at this point in the history
* Support key-shared subscribe mode for go client

Signed-off-by: xiaolong.ran <[email protected]>

* fix comments

Signed-off-by: xiaolong.ran <[email protected]>
  • Loading branch information
wolfstudy authored and jiazhai committed Jun 6, 2019
1 parent 05ec310 commit 2c3f783
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pulsar-client-go/pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ const (
// Multiple consumer will be able to use the same subscription name but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
Failover

// Multiple consumer will be able to use the same subscription and all messages with the same key
// will be dispatched to only one consumer
KeyShared
)

type InitialPosition int
Expand Down
69 changes: 69 additions & 0 deletions pulsar-client-go/pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,3 +587,72 @@ func TestConsumerNegativeAcks(t *testing.T) {

consumer.Unsubscribe()
}

func TestConsumerShared(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})
assert.Nil(t, err)
defer client.Close()

topic := "persistent://public/default/test-topic-6"

consumer1, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer1.Close()

consumer2, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "sub-1",
Type: KeyShared,
})
assert.Nil(t, err)
defer consumer2.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
Batching: false,
})
assert.Nil(t, err)
defer producer.Close()

ctx := context.Background()
for i := 0; i < 10; i++ {
err := producer.Send(ctx, ProducerMessage{
Key: fmt.Sprintf("key-shared-%d", i%4),
Payload: []byte(fmt.Sprintf("value-%d", i)),
})
assert.Nil(t, err)
}

time.Sleep(time.Second * 5)

go func() {
for i := 0; i < 10; i++ {
msg, err := consumer1.Receive(ctx)
assert.Nil(t, err)
if msg != nil {
fmt.Printf("consumer1 key is: %s, value is: %s\n", msg.Key(), string(msg.Payload()))
err = consumer1.Ack(msg)
assert.Nil(t, err)
}
}
}()

go func() {
for i := 0; i < 10; i++ {
msg2, err := consumer2.Receive(ctx)
assert.Nil(t, err)
if msg2 != nil {
fmt.Printf("consumer2 key is:%s, value is: %s\n", msg2.Key(), string(msg2.Payload()))
err = consumer2.Ack(msg2)
assert.Nil(t, err)
}
}
}()
}

0 comments on commit 2c3f783

Please sign in to comment.