Skip to content

Commit

Permalink
Support for python native logging from python wrapper (apache#6113)
Browse files Browse the repository at this point in the history
Fixes apache#5620

Added support for Python native logging within the Pulsar Client Python wrapper.

* added a new Logger implementation that forwards the logging to python
* updated the python module to allow setting the logger on a `pulsar.Client` via the `ClientConfiguration` object
  • Loading branch information
BlackJohnny authored and sijie committed Jan 23, 2020
1 parent 56280ea commit 0aee35a
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def send_callback(res, msg):
client.close()
"""
import logging

import _pulsar

Expand Down Expand Up @@ -335,6 +336,7 @@ def __init__(self, service_url,
message_listener_threads=1,
concurrent_lookup_requests=50000,
log_conf_file_path=None,
logger=None,
use_tls=False,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
Expand Down Expand Up @@ -368,6 +370,8 @@ def __init__(self, service_url,
to prevent overload on the broker.
* `log_conf_file_path`:
Initialize log4cxx from a configuration file.
* `logger`:
Set a Python logger for this Pulsar client.
* `use_tls`:
Configure whether to use TLS encryption on the connection. This setting
is deprecated. TLS will be automatically enabled if the `serviceUrl` is
Expand All @@ -390,6 +394,7 @@ def __init__(self, service_url,
_check_type(int, message_listener_threads, 'message_listener_threads')
_check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
_check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
_check_type_or_none(logging.Logger, logger, 'logger')
_check_type(bool, use_tls, 'use_tls')
_check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
_check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
Expand All @@ -404,6 +409,8 @@ def __init__(self, service_url,
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if logger:
conf.set_logger(logger)
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
conf.use_tls(True)
if tls_trust_certs_file_path:
Expand Down
103 changes: 103 additions & 0 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,102 @@
*/
#include "utils.h"

class LoggerWrapper: public Logger {
std::string _logger;
PyObject* _pyLogger;
int _currentPythonLogLevel = 10 + (Logger::LEVEL_INFO*10);

void _updateCurrentPythonLogLevel() {
PyGILState_STATE state = PyGILState_Ensure();

try {
_currentPythonLogLevel = py::call_method<int>(_pyLogger, "getEffectiveLevel");
} catch (py::error_already_set e) {
PyErr_Print();
}

PyGILState_Release(state);
};

public:

LoggerWrapper(const std::string &logger, PyObject* pyLogger) : _logger(logger) {
_pyLogger = pyLogger;
Py_XINCREF(_pyLogger);

_updateCurrentPythonLogLevel();
}

LoggerWrapper(const LoggerWrapper& other) {
_pyLogger = other._pyLogger;
Py_XINCREF(_pyLogger);
}

LoggerWrapper& operator=(const LoggerWrapper& other) {
_pyLogger = other._pyLogger;
Py_XINCREF(_pyLogger);
return *this;
}

virtual ~LoggerWrapper() {
Py_XDECREF(_pyLogger);
}

bool isEnabled(Level level) {
return 10 + (level*10) >= _currentPythonLogLevel;
}

void log(Level level, int line, const std::string& message) {
PyGILState_STATE state = PyGILState_Ensure();

try {
switch (level) {
case Logger::LEVEL_DEBUG:
py::call_method<void>(_pyLogger, "debug", message.c_str());
break;
case Logger::LEVEL_INFO:
py::call_method<void>(_pyLogger, "info", message.c_str());
break;
case Logger::LEVEL_WARN:
py::call_method<void>(_pyLogger, "warning", message.c_str());
break;
case Logger::LEVEL_ERROR:
py::call_method<void>(_pyLogger, "error", message.c_str());
break;
}

} catch (py::error_already_set e) {
PyErr_Print();
}

PyGILState_Release(state);
}
};

class LoggerWrapperFactory : public LoggerFactory {
static LoggerWrapperFactory* _instance;
PyObject* _pyLogger;

LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
}

public:
virtual ~LoggerWrapperFactory() {
Py_XDECREF(_pyLogger);
}

Logger* getLogger(const std::string &fileName) {
return new LoggerWrapper(fileName, _pyLogger);
}

static LoggerFactoryPtr create(py::object pyLogger) {
return LoggerFactoryPtr(new LoggerWrapperFactory(pyLogger));
}
};


template<typename T>
struct ListenerWrapper {
PyObject* _pyListener;
Expand Down Expand Up @@ -74,6 +170,12 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur
return conf;
}

static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf,
py::object logger) {
conf.setLogger(LoggerWrapperFactory::create(logger));
return conf;
}

void export_config() {
using namespace boost::python;

Expand All @@ -89,6 +191,7 @@ void export_config() {
.def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>())
.def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, return_value_policy<copy_const_reference>())
.def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>())
.def("set_logger", &ClientConfiguration_setLogger, return_self<>())
.def("use_tls", &ClientConfiguration::isUseTls)
.def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
.def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath)
Expand Down

0 comments on commit 0aee35a

Please sign in to comment.