Skip to content

Commit

Permalink
Pulsar Go client library (apache#1764)
Browse files Browse the repository at this point in the history
* WIP - Pulsar Go client library

* Refactored configurations to use struct instead of builder

* Simplified methods terminology

* Fixed typos

* Refactored message builder

* More refactoring from comments

* More polishing and using context

* Implemented message properties

* Use standard include and link paths

* Removed dependencies on library for saving pointers

* Remoed all Async methods (except producer.SendAsync() from public API

* MessageBuilder -> ProducerMessage

* Allow to configure custom loggers from Go

* Cleaned up logger

* Use fully qualified package name in example

* Added Unit tests
  • Loading branch information
merlimat authored Jun 4, 2018
1 parent 44cbed3 commit 9089692
Show file tree
Hide file tree
Showing 36 changed files with 2,818 additions and 30 deletions.
2 changes: 2 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -953,9 +953,11 @@ flexible messaging model and an intuitive client API.</description>
<exclude>site/img/**</exclude>
<exclude>generated-site/**</exclude>
<exclude>.github/*.md</exclude>
<exclude>**/.idea/*</exclude>
</excludes>
<mapping>
<proto>JAVADOC_STYLE</proto>
<go>DOUBLESLASH_STYLE</go>
<conf>SCRIPT_STYLE</conf>
<ini>SCRIPT_STYLE</ini>
<yaml>SCRIPT_STYLE</yaml>
Expand Down
6 changes: 4 additions & 2 deletions pulsar-client-cpp/include/pulsar/c/client_configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ extern "C" {

typedef enum { pulsar_DEBUG = 0, pulsar_INFO = 1, pulsar_WARN = 2, pulsar_ERROR = 3 } pulsar_logger_level_t;

typedef void (*pulsar_logger)(pulsar_logger_level_t level, const char *file, int line, const char *message);
typedef void (*pulsar_logger)(pulsar_logger_level_t level, const char *file, int line, const char *message,
void *ctx);

typedef struct _pulsar_client_configuration pulsar_client_configuration_t;
typedef struct _pulsar_authentication pulsar_authentication_t;
Expand Down Expand Up @@ -105,7 +106,8 @@ void pulsar_client_configuration_set_concurrent_lookup_request(pulsar_client_con
*/
int pulsar_client_configuration_get_concurrent_lookup_request(pulsar_client_configuration_t *conf);

void pulsar_client_configuration_logger(pulsar_client_configuration_t *conf, pulsar_logger logger);
void pulsar_client_configuration_set_logger(pulsar_client_configuration_t *conf, pulsar_logger logger,
void *ctx);

void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, int useTls);

Expand Down
4 changes: 3 additions & 1 deletion pulsar-client-cpp/include/pulsar/c/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ extern "C" {
#include <stddef.h>
#include <stdint.h>

#include "string_map.h"

#pragma GCC visibility push(default)

typedef struct _pulsar_message pulsar_message_t;
Expand Down Expand Up @@ -102,7 +104,7 @@ void pulsar_message_disable_replication(pulsar_message_t *message, int flag);
*
* @return an unmodifiable view of the properties map
*/
// const StringMap& getProperties() const;
pulsar_string_map_t *pulsar_message_get_properties(pulsar_message_t *message);

/**
* Check whether the message has a specific property attached.
Expand Down
46 changes: 46 additions & 0 deletions pulsar-client-cpp/include/pulsar/c/string_map.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#ifdef __cplusplus
extern "C" {
#endif

#pragma GCC visibility push(default)

typedef struct _pulsar_string_map pulsar_string_map_t;

pulsar_string_map_t *pulsar_string_map_create();
void pulsar_string_map_free(pulsar_string_map_t *map);

int pulsar_string_map_size(pulsar_string_map_t *map);

void pulsar_string_map_put(pulsar_string_map_t *map, const char *key, const char *value);

const char *pulsar_string_map_get(pulsar_string_map_t *map, const char *key);

const char *pulsar_string_map_get_key(pulsar_string_map_t *map, int idx);
const char *pulsar_string_map_get_value(pulsar_string_map_t *map, int idx);

#pragma GCC visibility pop

#ifdef __cplusplus
}
#endif
4 changes: 2 additions & 2 deletions pulsar-client-cpp/lib/ClientConfigurationImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ struct ClientConfigurationImpl {
logConfFilePath(),
useTls(false),
tlsAllowInsecureConnection(true),
statsIntervalInSeconds(600) { // 10 minutes
}
statsIntervalInSeconds(600), // 10 minutes
loggerFactory() {}
};
} // namespace pulsar

Expand Down
9 changes: 8 additions & 1 deletion pulsar-client-cpp/lib/LogUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace pulsar {

void LogUtils::init(const std::string &logfilePath) {
void LogUtils::init(const std::string& logfilePath) {
// If this is called explicitely, we fallback to Log4cxx config, if enabled

#ifdef USE_LOG4CXX
Expand All @@ -48,4 +48,11 @@ LoggerFactoryPtr LogUtils::getLoggerFactory() {
return s_loggerFactory;
}

std::string LogUtils::getLoggerName(const std::string& path) {
// Remove all directories from filename
int startIdx = path.find_last_of("/");
int endIdx = path.find_last_of(".");
return path.substr(startIdx + 1, endIdx - startIdx - 1);
}

} // namespace pulsar
21 changes: 12 additions & 9 deletions pulsar-client-cpp/lib/LogUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ namespace pulsar {

#define PULSAR_UNLIKELY(expr) __builtin_expect(expr, 0)

#define DECLARE_LOG_OBJECT() \
static pulsar::Logger* logger() { \
static boost::thread_specific_ptr<pulsar::Logger> threadSpecificLogPtr; \
pulsar::Logger* ptr = threadSpecificLogPtr.get(); \
if (PULSAR_UNLIKELY(!ptr)) { \
threadSpecificLogPtr.reset(pulsar::LogUtils::getLoggerFactory()->getLogger(__FILE__)); \
ptr = threadSpecificLogPtr.get(); \
} \
return ptr; \
#define DECLARE_LOG_OBJECT() \
static pulsar::Logger* logger() { \
static boost::thread_specific_ptr<pulsar::Logger> threadSpecificLogPtr; \
pulsar::Logger* ptr = threadSpecificLogPtr.get(); \
if (PULSAR_UNLIKELY(!ptr)) { \
std::string logger = pulsar::LogUtils::getLoggerName(__FILE__); \
threadSpecificLogPtr.reset(pulsar::LogUtils::getLoggerFactory()->getLogger(logger)); \
ptr = threadSpecificLogPtr.get(); \
} \
return ptr; \
}

#define LOG_DEBUG(message) \
Expand Down Expand Up @@ -85,6 +86,8 @@ class LogUtils {
static void setLoggerFactory(LoggerFactoryPtr loggerFactory);

static LoggerFactoryPtr getLoggerFactory();

static std::string getLoggerName(const std::string& path);
};

#pragma GCC visibility pop
Expand Down
8 changes: 1 addition & 7 deletions pulsar-client-cpp/lib/SimpleLoggerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,7 @@ class SimpleLogger : public Logger {
}
};

Logger *SimpleLoggerFactory::getLogger(const std::string &path) {
// Remove all directories from filename
int startIdx = path.find_last_of("/");
int endIdx = path.find_last_of(".");
std::string fileName = path.substr(startIdx + 1, endIdx - startIdx - 1);
return new SimpleLogger(fileName);
}
Logger *SimpleLoggerFactory::getLogger(const std::string &file) { return new SimpleLogger(file); }

LoggerFactoryPtr SimpleLoggerFactory::create() { return LoggerFactoryPtr(new SimpleLoggerFactory); }
} // namespace pulsar
60 changes: 60 additions & 0 deletions pulsar-client-cpp/lib/c/cStringMap.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <pulsar/c/string_map.h>

#include "c_structs.h"

pulsar_string_map_t *pulsar_string_map_create() { return new pulsar_string_map_t; }

void pulsar_string_map_free(pulsar_string_map_t *map) { delete map; }

int pulsar_string_map_size(pulsar_string_map_t *map) { return map->map.size(); }

void pulsar_string_map_put(pulsar_string_map_t *map, const char *key, const char *value) {
map->map[key] = value;
}

const char *pulsar_string_map_get(pulsar_string_map_t *map, const char *key) {
std::map<std::string, std::string>::iterator it = map->map.find(key);

if (it == map->map.end()) {
return NULL;
} else {
return it->second.c_str();
}
}

const char *pulsar_string_map_get_key(pulsar_string_map_t *map, int idx) {
std::map<std::string, std::string>::iterator it = map->map.begin();
while (idx-- > 0) {
++it;
}

return it->first.c_str();
}

const char *pulsar_string_map_get_value(pulsar_string_map_t *map, int idx) {
std::map<std::string, std::string>::iterator it = map->map.begin();
while (idx-- > 0) {
++it;
}

return it->second.c_str();
}
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/c/c_Client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
pulsar_client_t *pulsar_client_create(const char *serviceUrl,
const pulsar_client_configuration_t *clientConfiguration) {
pulsar_client_t *c_client = new pulsar_client_t;
c_client->client.reset(new pulsar::Client(std::string(serviceUrl)));
c_client->client.reset(new pulsar::Client(std::string(serviceUrl), clientConfiguration->conf));
return c_client;
}

Expand Down
18 changes: 12 additions & 6 deletions pulsar-client-cpp/lib/c/c_ClientConfiguration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,28 +72,34 @@ int pulsar_client_configuration_get_concurrent_lookup_request(pulsar_client_conf
class PulsarCLogger : public pulsar::Logger {
std::string file_;
pulsar_logger logger_;
void *ctx_;

public:
PulsarCLogger(const std::string &file, pulsar_logger logger) : file_(file), logger_(logger) {}
PulsarCLogger(const std::string &file, pulsar_logger logger, void *ctx)
: file_(file), logger_(logger), ctx_(ctx) {}

bool isEnabled(Level level) { return level >= pulsar::Logger::INFO; }

void log(Level level, int line, const std::string &message) {
logger_((pulsar_logger_level_t)level, file_.c_str(), line, message.c_str());
logger_((pulsar_logger_level_t)level, file_.c_str(), line, message.c_str(), ctx_);
}
};

class PulsarCLoggerFactory : public pulsar::LoggerFactory {
pulsar_logger logger_;
void *ctx_;

public:
PulsarCLoggerFactory(pulsar_logger logger) : logger_(logger) {}
PulsarCLoggerFactory(pulsar_logger logger, void *ctx) : logger_(logger), ctx_(ctx) {}

pulsar::Logger *getLogger(const std::string &fileName) { return new PulsarCLogger(fileName, logger_); }
pulsar::Logger *getLogger(const std::string &fileName) {
return new PulsarCLogger(fileName, logger_, ctx_);
}
};

void pulsar_client_configuration_set_logger(pulsar_client_configuration_t *conf, pulsar_logger logger) {
conf->conf.setLogger(pulsar::LoggerFactoryPtr(new PulsarCLoggerFactory(logger)));
void pulsar_client_configuration_set_logger(pulsar_client_configuration_t *conf, pulsar_logger logger,
void *ctx) {
conf->conf.setLogger(pulsar::LoggerFactoryPtr(new PulsarCLoggerFactory(logger, ctx)));
}

void pulsar_client_configuration_set_use_tls(pulsar_client_configuration_t *conf, int useTls) {
Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-cpp/lib/c/c_Message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,9 @@ uint64_t pulsar_message_get_publish_timestamp(pulsar_message_t *message) {
uint64_t pulsar_message_get_event_timestamp(pulsar_message_t *message) {
return message->message.getEventTimestamp();
}

pulsar_string_map_t *pulsar_message_get_properties(pulsar_message_t *message) {
pulsar_string_map_t *map = pulsar_string_map_create();
map->map = message->message.getProperties();
return map;
}
6 changes: 5 additions & 1 deletion pulsar-client-cpp/lib/c/c_structs.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ static void handle_result_callback(pulsar::Result result, pulsar_result_callback
if (callback) {
callback((pulsar_result)result, ctx);
}
}
}

struct _pulsar_string_map {
std::map<std::string, std::string> map;
};
60 changes: 60 additions & 0 deletions pulsar-client-go/examples/consumer-listener/consumer-listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

package main

import (
"github.com/apache/incubator-pulsar/pulsar-client-go/pulsar"
"fmt"
"log"
)

func main() {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
log.Fatal(err)
}

defer client.Close()

channel := make(chan pulsar.ConsumerMessage)

consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "my-subscription",
Type: pulsar.Shared,
MessageChannel: channel,
})
if err != nil {
log.Fatal(err)
}

defer consumer.Close()

// Receive messages from channel. The channel returns a struct which contains message and the consumer from where
// the message was received. It's not necessary here since we have 1 single consumer, but the channel could be
// shared across multiple consumers as well
for cm := range channel {
msg := cm.Message
fmt.Printf("Received message msgId: %s -- content: '%s'\n",
msg.ID(), string(msg.Payload()))

consumer.Ack(msg)
}
}
Loading

0 comments on commit 9089692

Please sign in to comment.