Skip to content

Commit

Permalink
Add multi-topic and regex consumer in Go client (apache#2448)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Aug 27, 2018
1 parent 73bc19d commit 8ba8688
Show file tree
Hide file tree
Showing 6 changed files with 207 additions and 9 deletions.
10 changes: 10 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
const pulsar_consumer_configuration_t *conf,
pulsar_subscribe_callback callback, void *ctx);

void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
const char *subscriptionName,
const pulsar_consumer_configuration_t *conf,
pulsar_subscribe_callback callback, void *ctx);

void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
const char *subscriptionName,
const pulsar_consumer_configuration_t *conf,
pulsar_subscribe_callback callback, void *ctx);

/**
* Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified
* topic.
Expand Down
21 changes: 21 additions & 0 deletions pulsar-client-cpp/lib/c/c_Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c
boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
}

void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount,
const char *subscriptionName,
const pulsar_consumer_configuration_t *conf,
pulsar_subscribe_callback callback, void *ctx) {
std::vector<std::string> topicsList;
for (int i = 0; i < topicsCount; i++) {
topicsList.push_back(topics[i]);
}

client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration,
boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
}

void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
const char *subscriptionName,
const pulsar_consumer_configuration_t *conf,
pulsar_subscribe_callback callback, void *ctx) {
client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration,
boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
}

pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic,
const pulsar_message_id_t *startMessageId,
pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) {
Expand Down
38 changes: 32 additions & 6 deletions pulsar-client-go/pulsar/c_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ package pulsar
import "C"

import (
"context"
"runtime"
"time"
"unsafe"
"context"
)

type consumer struct {
Expand Down Expand Up @@ -64,7 +64,7 @@ type subscribeContext struct {
}

func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) {
if options.Topic == "" {
if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" {
go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required"))
return
}
Expand Down Expand Up @@ -120,12 +120,38 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu
C.pulsar_consumer_set_consumer_name(conf, name)
}

topic := C.CString(options.Topic)
subName := C.CString(options.SubscriptionName)
defer C.free(unsafe.Pointer(topic))
defer C.free(unsafe.Pointer(subName))
C._pulsar_client_subscribe_async(client.ptr, topic, subName,
conf, savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback}))

callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})

if options.Topic != "" {
topic := C.CString(options.Topic)
defer C.free(unsafe.Pointer(topic))
C._pulsar_client_subscribe_async(client.ptr, topic, subName, conf, callbackPtr)
} else if options.Topics != nil {
cArray := C.malloc(C.size_t(len(options.Topics)) * C.size_t(unsafe.Sizeof(uintptr(0))))

// convert the C array to a Go Array so we can index it
a := (*[1<<30 - 1]*C.char)(cArray)

for idx, topic := range options.Topics {
a[idx] = C.CString(topic)
}

C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)),
subName, conf, callbackPtr)

for idx, _ := range options.Topics {
C.free(unsafe.Pointer(a[idx]))
}

C.free(cArray)
} else if options.TopicsPattern != "" {
topicsPattern := C.CString(options.TopicsPattern)
defer C.free(unsafe.Pointer(topicsPattern))
C._pulsar_client_subscribe_pattern_async(client.ptr, topicsPattern, subName, conf, callbackPtr)
}
}

type consumerCallback struct {
Expand Down
13 changes: 13 additions & 0 deletions pulsar-client-go/pulsar/c_go_pulsar.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,19 @@ static inline void _pulsar_client_subscribe_async(pulsar_client_t *client, const
pulsar_client_subscribe_async(client, topic, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
}

static inline void _pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char ** topics,
int topicsCount, const char *subscriptionName,
const pulsar_consumer_configuration_t *conf, void *ctx) {
pulsar_client_subscribe_multi_topics_async(client, topics, topicsCount, subscriptionName, conf,
pulsarSubscribeCallbackProxy, ctx);
}

static inline void _pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern,
const char *subscriptionName,
const pulsar_consumer_configuration_t *conf, void *ctx) {
pulsar_client_subscribe_pattern_async(client, topicPattern, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
}

void pulsarMessageListenerProxy(pulsar_consumer_t *consumer, pulsar_message_t *message, void *ctx);

static inline void _pulsar_consumer_configuration_set_message_listener(
Expand Down
10 changes: 9 additions & 1 deletion pulsar-client-go/pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,17 @@ const (
// ConsumerBuilder is used to configure and create instances of Consumer
type ConsumerOptions struct {
// Specify the topic this consumer will subscribe on.
// This argument is required when subscribing
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topic string

// Specify a list of topics this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topics []string

// Specify a regular expression to subscribe to multiple topics under the same namespace.
// Either a topic, a list of topics or a topics pattern are required when subscribing
TopicsPattern string

// Specify the subscription name for this consumer
// This argument is required when subscribing
SubscriptionName string
Expand Down
124 changes: 122 additions & 2 deletions pulsar-client-go/pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package pulsar

import (
"testing"
"fmt"
"context"
"fmt"
"testing"
"time"
)

Expand Down Expand Up @@ -131,3 +131,123 @@ func TestConsumerWithInvalidConf(t *testing.T) {

assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
}


func TestConsumerMultiTopics(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})

assertNil(t, err)
defer client.Close()

producer1, err := client.CreateProducer(ProducerOptions{
Topic: "multi-topic-1",
})

assertNil(t, err)

producer2, err := client.CreateProducer(ProducerOptions{
Topic: "multi-topic-2",
})

assertNil(t, err)
defer producer1.Close()
defer producer2.Close()

consumer, err := client.Subscribe(ConsumerOptions{
Topics: []string{"multi-topic-1", "multi-topic-2"},
SubscriptionName: "my-sub",
})

assertNil(t, err)
defer consumer.Close()

assertEqual(t, consumer.Subscription(), "my-sub")

ctx := context.Background()

for i := 0; i < 10; i++ {
if err := producer1.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
t.Fatal(err)
}

if err := producer2.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
t.Fatal(err)
}
}

for i := 0; i < 20; i++ {
msg, err := consumer.Receive(ctx)
assertNil(t, err)
assertNotNil(t, msg)

consumer.Ack(msg)
}

consumer.Unsubscribe()
}


func TestConsumerRegex(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
})

assertNil(t, err)
defer client.Close()

producer1, err := client.CreateProducer(ProducerOptions{
Topic: "topic-1",
})

assertNil(t, err)

producer2, err := client.CreateProducer(ProducerOptions{
Topic: "topic-2",
})

assertNil(t, err)
defer producer1.Close()
defer producer2.Close()

consumer, err := client.Subscribe(ConsumerOptions{
TopicsPattern: "topic-\\d+",
SubscriptionName: "my-sub",
})

assertNil(t, err)
defer consumer.Close()

assertEqual(t, consumer.Subscription(), "my-sub")

ctx := context.Background()

for i := 0; i < 10; i++ {
if err := producer1.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
t.Fatal(err)
}

if err := producer2.Send(ctx, ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
t.Fatal(err)
}
}

for i := 0; i < 20; i++ {
msg, err := consumer.Receive(ctx)
assertNil(t, err)
assertNotNil(t, msg)

consumer.Ack(msg)
}

consumer.Unsubscribe()
}

0 comments on commit 8ba8688

Please sign in to comment.