diff --git a/pulsar-client-cpp/include/pulsar/c/client.h b/pulsar-client-cpp/include/pulsar/c/client.h index 2da7c6de9255a..4b603bbff370b 100644 --- a/pulsar-client-cpp/include/pulsar/c/client.h +++ b/pulsar-client-cpp/include/pulsar/c/client.h @@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c const pulsar_consumer_configuration_t *conf, pulsar_subscribe_callback callback, void *ctx); +void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx); + +void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx); + /** * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified * topic. diff --git a/pulsar-client-cpp/lib/c/c_Client.cc b/pulsar-client-cpp/lib/c/c_Client.cc index 905e41067f175..ecefe20c034ad 100644 --- a/pulsar-client-cpp/lib/c/c_Client.cc +++ b/pulsar-client-cpp/lib/c/c_Client.cc @@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, const char *topic, c boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx)); } +void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char **topics, int topicsCount, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx) { + std::vector topicsList; + for (int i = 0; i < topicsCount; i++) { + topicsList.push_back(topics[i]); + } + + client->client->subscribeAsync(topicsList, subscriptionName, conf->consumerConfiguration, + boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx)); +} + +void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, + pulsar_subscribe_callback callback, void *ctx) { + client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, conf->consumerConfiguration, + boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx)); +} + pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char *topic, const pulsar_message_id_t *startMessageId, pulsar_reader_configuration_t *conf, pulsar_reader_t **c_reader) { diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go index aebabc877da5f..093dd9dc6ec88 100644 --- a/pulsar-client-go/pulsar/c_consumer.go +++ b/pulsar-client-go/pulsar/c_consumer.go @@ -25,10 +25,10 @@ package pulsar import "C" import ( + "context" "runtime" "time" "unsafe" - "context" ) type consumer struct { @@ -64,7 +64,7 @@ type subscribeContext struct { } func subscribeAsync(client *client, options ConsumerOptions, callback func(Consumer, error)) { - if options.Topic == "" { + if options.Topic == "" && options.Topics == nil && options.TopicsPattern == "" { go callback(nil, newError(C.pulsar_result_InvalidConfiguration, "topic is required")) return } @@ -120,12 +120,38 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu C.pulsar_consumer_set_consumer_name(conf, name) } - topic := C.CString(options.Topic) subName := C.CString(options.SubscriptionName) - defer C.free(unsafe.Pointer(topic)) defer C.free(unsafe.Pointer(subName)) - C._pulsar_client_subscribe_async(client.ptr, topic, subName, - conf, savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback})) + + callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: consumer, callback: callback}) + + if options.Topic != "" { + topic := C.CString(options.Topic) + defer C.free(unsafe.Pointer(topic)) + C._pulsar_client_subscribe_async(client.ptr, topic, subName, conf, callbackPtr) + } else if options.Topics != nil { + cArray := C.malloc(C.size_t(len(options.Topics)) * C.size_t(unsafe.Sizeof(uintptr(0)))) + + // convert the C array to a Go Array so we can index it + a := (*[1<<30 - 1]*C.char)(cArray) + + for idx, topic := range options.Topics { + a[idx] = C.CString(topic) + } + + C._pulsar_client_subscribe_multi_topics_async(client.ptr, (**C.char)(cArray), C.int(len(options.Topics)), + subName, conf, callbackPtr) + + for idx, _ := range options.Topics { + C.free(unsafe.Pointer(a[idx])) + } + + C.free(cArray) + } else if options.TopicsPattern != "" { + topicsPattern := C.CString(options.TopicsPattern) + defer C.free(unsafe.Pointer(topicsPattern)) + C._pulsar_client_subscribe_pattern_async(client.ptr, topicsPattern, subName, conf, callbackPtr) + } } type consumerCallback struct { diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h b/pulsar-client-go/pulsar/c_go_pulsar.h index 045a4a2a848a6..881427610e625 100644 --- a/pulsar-client-go/pulsar/c_go_pulsar.h +++ b/pulsar-client-go/pulsar/c_go_pulsar.h @@ -73,6 +73,19 @@ static inline void _pulsar_client_subscribe_async(pulsar_client_t *client, const pulsar_client_subscribe_async(client, topic, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx); } +static inline void _pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const char ** topics, + int topicsCount, const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, void *ctx) { + pulsar_client_subscribe_multi_topics_async(client, topics, topicsCount, subscriptionName, conf, + pulsarSubscribeCallbackProxy, ctx); +} + +static inline void _pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char *topicPattern, + const char *subscriptionName, + const pulsar_consumer_configuration_t *conf, void *ctx) { + pulsar_client_subscribe_pattern_async(client, topicPattern, subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx); +} + void pulsarMessageListenerProxy(pulsar_consumer_t *consumer, pulsar_message_t *message, void *ctx); static inline void _pulsar_consumer_configuration_set_message_listener( diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go index 4ce0857652a31..ed56d9e817d0e 100644 --- a/pulsar-client-go/pulsar/consumer.go +++ b/pulsar-client-go/pulsar/consumer.go @@ -49,9 +49,17 @@ const ( // ConsumerBuilder is used to configure and create instances of Consumer type ConsumerOptions struct { // Specify the topic this consumer will subscribe on. - // This argument is required when subscribing + // Either a topic, a list of topics or a topics pattern are required when subscribing Topic string + // Specify a list of topics this consumer will subscribe on. + // Either a topic, a list of topics or a topics pattern are required when subscribing + Topics []string + + // Specify a regular expression to subscribe to multiple topics under the same namespace. + // Either a topic, a list of topics or a topics pattern are required when subscribing + TopicsPattern string + // Specify the subscription name for this consumer // This argument is required when subscribing SubscriptionName string diff --git a/pulsar-client-go/pulsar/consumer_test.go b/pulsar-client-go/pulsar/consumer_test.go index 2930f19d9ce38..75a454be88199 100644 --- a/pulsar-client-go/pulsar/consumer_test.go +++ b/pulsar-client-go/pulsar/consumer_test.go @@ -20,9 +20,9 @@ package pulsar import ( - "testing" - "fmt" "context" + "fmt" + "testing" "time" ) @@ -131,3 +131,123 @@ func TestConsumerWithInvalidConf(t *testing.T) { assertEqual(t, err.(*Error).Result(), InvalidConfiguration) } + + +func TestConsumerMultiTopics(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6650", + }) + + assertNil(t, err) + defer client.Close() + + producer1, err := client.CreateProducer(ProducerOptions{ + Topic: "multi-topic-1", + }) + + assertNil(t, err) + + producer2, err := client.CreateProducer(ProducerOptions{ + Topic: "multi-topic-2", + }) + + assertNil(t, err) + defer producer1.Close() + defer producer2.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topics: []string{"multi-topic-1", "multi-topic-2"}, + SubscriptionName: "my-sub", + }) + + assertNil(t, err) + defer consumer.Close() + + assertEqual(t, consumer.Subscription(), "my-sub") + + ctx := context.Background() + + for i := 0; i < 10; i++ { + if err := producer1.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + t.Fatal(err) + } + + if err := producer2.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 20; i++ { + msg, err := consumer.Receive(ctx) + assertNil(t, err) + assertNotNil(t, msg) + + consumer.Ack(msg) + } + + consumer.Unsubscribe() +} + + +func TestConsumerRegex(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: "pulsar://localhost:6650", + }) + + assertNil(t, err) + defer client.Close() + + producer1, err := client.CreateProducer(ProducerOptions{ + Topic: "topic-1", + }) + + assertNil(t, err) + + producer2, err := client.CreateProducer(ProducerOptions{ + Topic: "topic-2", + }) + + assertNil(t, err) + defer producer1.Close() + defer producer2.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + TopicsPattern: "topic-\\d+", + SubscriptionName: "my-sub", + }) + + assertNil(t, err) + defer consumer.Close() + + assertEqual(t, consumer.Subscription(), "my-sub") + + ctx := context.Background() + + for i := 0; i < 10; i++ { + if err := producer1.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + t.Fatal(err) + } + + if err := producer2.Send(ctx, ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + t.Fatal(err) + } + } + + for i := 0; i < 20; i++ { + msg, err := consumer.Receive(ctx) + assertNil(t, err) + assertNotNil(t, msg) + + consumer.Ack(msg) + } + + consumer.Unsubscribe() +} \ No newline at end of file