Skip to content

Commit

Permalink
Revert "Support for python native logging from python wrapper (apache…
Browse files Browse the repository at this point in the history
…#6113)" (apache#6174)

As stated in apache#6171, change apache#6113 broke the function instance runner. This change attempts to revert apache#6113 first to make sure CI back to normal.

Additionally, it fixes and improves a bunch of integration tests. (unfortunately, we have to couple these changes to make sure it pass CI).
  • Loading branch information
sijie authored Feb 6, 2020
1 parent 4a2bceb commit 92d7102
Show file tree
Hide file tree
Showing 19 changed files with 166 additions and 162 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/ci-integration-sql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests

- name: clean docker container
if: steps.docs.outputs.changed_only == 'no'
run: docker system prune -f

- name: remove docker node image
if: steps.docs.outputs.changed_only == 'no'
run: docker rmi -f node:10 && docker rmi -f node:12 && docker rmi -f buildpack-deps:stretch

- name: remove docker builder and microsoft image
if: steps.docs.outputs.changed_only == 'no'
run: docker rmi -f jekyll/builder:latest && docker rmi -f mcr.microsoft.com/azure-pipelines/node8-typescript:latest

- name: run integration tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DredirectTestOutputToFile=false
15 changes: 15 additions & 0 deletions .github/workflows/ci-unit-adaptors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,18 @@ jobs:
- name: run unit tests pulsar storm tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -pl tests/pulsar-storm-test

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
15 changes: 15 additions & 0 deletions .github/workflows/ci-unit-broker-sasl.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,18 @@ jobs:
- name: run unit tests pulsar auth sasl
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false -pl pulsar-broker-auth-sasl

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
14 changes: 14 additions & 0 deletions .github/workflows/ci-unit-broker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,17 @@ jobs:
- name: run unit test pulsar-broker
if: steps.docs.outputs.changed_only == 'no'
run: mvn test '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegrationTest,!ReplicatorRateLimiterTest' -DfailIfNoTests=false -pl pulsar-broker

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
18 changes: 13 additions & 5 deletions .github/workflows/ci-unit-flaky.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,33 @@ jobs:
with:
maven-version: 3.6.1

- name: build the project
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B clean install -DskipTests

- name: run PersistentTransactionBuffer test
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -am -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=PersistentTransactionBufferTest -DfailIfNoTests=false
run: mvn -B -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=PersistentTransactionBufferTest -DfailIfNoTests=false -fn

- name: run ServerCnx test
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -am -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=ServerCnxTest -DfailIfNoTests=false
run: mvn -B -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=ServerCnxTest -DfailIfNoTests=false -fn

- name: run MessagePublishThrottling test
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -am -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=MessagePublishThrottlingTest -DfailIfNoTests=false
run: mvn -B -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=MessagePublishThrottlingTest -DfailIfNoTests=false -fn

- name: run PrimitiveSchema test
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -am -pl pulsar-client -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=PrimitiveSchemaTest -DfailIfNoTests=false
run: mvn -B -pl pulsar-client -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=PrimitiveSchemaTest -DfailIfNoTests=false -fn

- name: run BlobStoreManagedLedgerOffloaderTest
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -pl tiered-storage/jcloud -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=BlobStoreManagedLedgerOffloaderTest -DfailIfNoTests=false -fn

- name: run unit tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn -B -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install '-Dtest=PulsarFunctionE2ESecurityTest,AdminApiOffloadTest,AdminApiSchemaValidationEnforced,V1_AdminApiTest2,PulsarFunctionE2ETest,MessageIdSerialization,AdminApiTest2,PulsarFunctionLocalRunTest,PartitionedProducerConsumerTest,KafkaProducerSimpleConsumerTest,ProxyTest' -DfailIfNoTests=false
run: mvn -B -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR test '-Dtest=PulsarFunctionE2ESecurityTest,AdminApiOffloadTest,AdminApiSchemaValidationEnforced,V1_AdminApiTest2,PulsarFunctionE2ETest,MessageIdSerialization,AdminApiTest2,PulsarFunctionLocalRunTest,PartitionedProducerConsumerTest,KafkaProducerSimpleConsumerTest,ProxyTest' -DfailIfNoTests=false -fn

- name: package surefire artifacts
if: failure()
Expand Down
19 changes: 17 additions & 2 deletions .github/workflows/ci-unit-proxy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ jobs:
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy

- name: run unit tests pulsar proxy
- name: run proxy lookup throttling test
if: steps.docs.outputs.changed_only == 'no'
run: mvn test -DfailIfNoTests=false '-Dtest=ProxyTest,ProxyLookupThrottlingTest' -pl pulsar-proxy
run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
with:
name: surefire-artifacts
path: artifacts.zip
3 changes: 2 additions & 1 deletion .github/workflows/ci-unit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:

- name: run unit tests
if: steps.docs.outputs.changed_only == 'no'
run: mvn clean install -DfailIfNoTests=false '-Dtest=!KafkaProducerSimpleConsumerTest,!PrimitiveSchemaTest' -pl '!pulsar-broker,!pulsar-proxy,!pulsar-broker-auth-sasl,!pulsar-io/kafka-connect-adaptor,!tests/pulsar-storm-test'
run: mvn install -DfailIfNoTests=false '-Dtest=!KafkaProducerSimpleConsumerTest,!PrimitiveSchemaTest,!BlobStoreManagedLedgerOffloaderTest' -pl '!pulsar-broker,!pulsar-proxy,!pulsar-broker-auth-sasl,!pulsar-io/kafka-connect-adaptor,!tests/pulsar-storm-test'

- name: package surefire artifacts
if: failure()
Expand All @@ -67,6 +67,7 @@ jobs:
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts
- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception {
channel.writeInbound(clientCommand);

Object response = getResponse();
assertTrue(response instanceof CommandError);
assertTrue(response instanceof CommandError, "Response is not CommandError but " + response);
CommandError error = (CommandError) response;
assertEquals(error.getError(), ServerError.ServiceNotReady);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(
@Test
public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws PulsarClientException, PulsarAdminException {
conf.setAllowAutoTopicCreation(false);
final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation";
final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation-" + System.currentTimeMillis();
admin.topics().createPartitionedTopic(topic, 3);
MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe();
Assert.assertNotNull(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception {
msg = subscriber1.receive(5, TimeUnit.SECONDS);

// Verify: as active-subscriber2 has not consumed messages: EntryCache must have those entries in cache
retryStrategically((test) -> entryCache.getSize() > 0, 10, 100);
assertTrue(entryCache.getSize() != 0);

// 3.b Close subscriber2: which will trigger cache to clear the cache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -343,9 +344,10 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
versionField.set(cnx, 3);

// (1) send non-batch message: consumer should be able to consume
MessageId lastNonBatchedMessageId = null;
for (int i = 0; i < numMessagesPerBatch; i++) {
String message = "my-message-" + i;
producer.send(message.getBytes());
lastNonBatchedMessageId = producer.send(message.getBytes());
}
Set<String> messageSet = Sets.newHashSet();
Message<byte[]> msg = null;
Expand All @@ -362,7 +364,7 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
consumer1.setClientCnx(null);
// (2) send batch-message which should not be able to consume: as broker will disconnect the consumer
for (int i = 0; i < numMessagesPerBatch; i++) {
String message = "my-message-" + i;
String message = "my-batch-message-" + i;
batchProducer.sendAsync(message.getBytes());
}
batchProducer.flush();
Expand All @@ -378,13 +380,14 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws
.subscriptionName(subscriptionName)
.subscriptionType(subType)
.subscribe();
consumer2.seek(lastNonBatchedMessageId);

messageSet.clear();
for (int i = 0; i < numMessagesPerBatch; i++) {
msg = consumer2.receive(1, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
String expectedMessage = "my-batch-message-" + i;
testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
consumer2.acknowledge(msg);
}
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) {

for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
consumer++) {
LOG_DEBUG("Unsubcribing Consumer - " << consumer->first);
(consumer->second)
->unsubscribeAsync(std::bind(&MultiTopicsConsumerImpl::handleUnsubscribedAsync,
shared_from_this(), std::placeholders::_1, consumerUnsubed,
Expand Down
7 changes: 0 additions & 7 deletions pulsar-client-cpp/python/pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ def send_callback(res, msg):
client.close()
"""
import logging

import _pulsar

Expand Down Expand Up @@ -336,7 +335,6 @@ def __init__(self, service_url,
message_listener_threads=1,
concurrent_lookup_requests=50000,
log_conf_file_path=None,
logger=None,
use_tls=False,
tls_trust_certs_file_path=None,
tls_allow_insecure_connection=False,
Expand Down Expand Up @@ -370,8 +368,6 @@ def __init__(self, service_url,
to prevent overload on the broker.
* `log_conf_file_path`:
Initialize log4cxx from a configuration file.
* `logger`:
Set a Python logger for this Pulsar client.
* `use_tls`:
Configure whether to use TLS encryption on the connection. This setting
is deprecated. TLS will be automatically enabled if the `serviceUrl` is
Expand All @@ -394,7 +390,6 @@ def __init__(self, service_url,
_check_type(int, message_listener_threads, 'message_listener_threads')
_check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests')
_check_type_or_none(str, log_conf_file_path, 'log_conf_file_path')
_check_type_or_none(logging.Logger, logger, 'logger')
_check_type(bool, use_tls, 'use_tls')
_check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path')
_check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection')
Expand All @@ -409,8 +404,6 @@ def __init__(self, service_url,
conf.concurrent_lookup_requests(concurrent_lookup_requests)
if log_conf_file_path:
conf.log_conf_file_path(log_conf_file_path)
if logger:
conf.set_logger(logger)
if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'):
conf.use_tls(True)
if tls_trust_certs_file_path:
Expand Down
103 changes: 0 additions & 103 deletions pulsar-client-cpp/python/src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,102 +18,6 @@
*/
#include "utils.h"

class LoggerWrapper: public Logger {
std::string _logger;
PyObject* _pyLogger;
int _currentPythonLogLevel = 10 + (Logger::LEVEL_INFO*10);

void _updateCurrentPythonLogLevel() {
PyGILState_STATE state = PyGILState_Ensure();

try {
_currentPythonLogLevel = py::call_method<int>(_pyLogger, "getEffectiveLevel");
} catch (py::error_already_set e) {
PyErr_Print();
}

PyGILState_Release(state);
};

public:

LoggerWrapper(const std::string &logger, PyObject* pyLogger) : _logger(logger) {
_pyLogger = pyLogger;
Py_XINCREF(_pyLogger);

_updateCurrentPythonLogLevel();
}

LoggerWrapper(const LoggerWrapper& other) {
_pyLogger = other._pyLogger;
Py_XINCREF(_pyLogger);
}

LoggerWrapper& operator=(const LoggerWrapper& other) {
_pyLogger = other._pyLogger;
Py_XINCREF(_pyLogger);
return *this;
}

virtual ~LoggerWrapper() {
Py_XDECREF(_pyLogger);
}

bool isEnabled(Level level) {
return 10 + (level*10) >= _currentPythonLogLevel;
}

void log(Level level, int line, const std::string& message) {
PyGILState_STATE state = PyGILState_Ensure();

try {
switch (level) {
case Logger::LEVEL_DEBUG:
py::call_method<void>(_pyLogger, "debug", message.c_str());
break;
case Logger::LEVEL_INFO:
py::call_method<void>(_pyLogger, "info", message.c_str());
break;
case Logger::LEVEL_WARN:
py::call_method<void>(_pyLogger, "warning", message.c_str());
break;
case Logger::LEVEL_ERROR:
py::call_method<void>(_pyLogger, "error", message.c_str());
break;
}

} catch (py::error_already_set e) {
PyErr_Print();
}

PyGILState_Release(state);
}
};

class LoggerWrapperFactory : public LoggerFactory {
static LoggerWrapperFactory* _instance;
PyObject* _pyLogger;

LoggerWrapperFactory(py::object pyLogger) {
_pyLogger = pyLogger.ptr();
Py_XINCREF(_pyLogger);
}

public:
virtual ~LoggerWrapperFactory() {
Py_XDECREF(_pyLogger);
}

Logger* getLogger(const std::string &fileName) {
return new LoggerWrapper(fileName, _pyLogger);
}

static LoggerFactoryPtr create(py::object pyLogger) {
return LoggerFactoryPtr(new LoggerWrapperFactory(pyLogger));
}
};


template<typename T>
struct ListenerWrapper {
PyObject* _pyListener;
Expand Down Expand Up @@ -170,12 +74,6 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur
return conf;
}

static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf,
py::object logger) {
conf.setLogger(LoggerWrapperFactory::create(logger));
return conf;
}

void export_config() {
using namespace boost::python;

Expand All @@ -191,7 +89,6 @@ void export_config() {
.def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>())
.def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, return_value_policy<copy_const_reference>())
.def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>())
.def("set_logger", &ClientConfiguration_setLogger, return_self<>())
.def("use_tls", &ClientConfiguration::isUseTls)
.def("use_tls", &ClientConfiguration::setUseTls, return_self<>())
.def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath)
Expand Down
Loading

0 comments on commit 92d7102

Please sign in to comment.