From 92d71023ca2e5a5eb58f5954a5506ab5d4ccd22c Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 6 Feb 2020 15:27:36 -0800 Subject: [PATCH] Revert "Support for python native logging from python wrapper (#6113)" (#6174) As stated in #6171, change #6113 broke the function instance runner. This change attempts to revert #6113 first to make sure CI back to normal. Additionally, it fixes and improves a bunch of integration tests. (unfortunately, we have to couple these changes to make sure it pass CI). --- .github/workflows/ci-integration-sql.yaml | 12 ++ .github/workflows/ci-unit-adaptors.yml | 15 +++ .github/workflows/ci-unit-broker-sasl.yml | 15 +++ .github/workflows/ci-unit-broker.yml | 14 +++ .github/workflows/ci-unit-flaky.yaml | 18 ++- .github/workflows/ci-unit-proxy.yaml | 19 +++- .github/workflows/ci-unit.yaml | 3 +- .../pulsar/broker/service/ServerCnxTest.java | 2 +- .../client/api/PartitionCreationTest.java | 2 +- .../api/v1/V1_ProducerConsumerTest.java | 1 + .../impl/BrokerClientIntegrationTest.java | 9 +- .../lib/MultiTopicsConsumerImpl.cc | 1 - pulsar-client-cpp/python/pulsar/__init__.py | 7 -- pulsar-client-cpp/python/src/config.cc | 103 ------------------ pulsar-client-cpp/tests/BasicEndToEndTest.cc | 7 +- .../api/v2/FunctionApiV2ResourceTest.java | 24 ++-- .../api/v3/FunctionApiV3ResourceTest.java | 24 ++-- .../apache/pulsar/proxy/server/ProxyTest.java | 49 +++++---- .../integration/cli/FunctionsCLITest.java | 3 +- 19 files changed, 166 insertions(+), 162 deletions(-) diff --git a/.github/workflows/ci-integration-sql.yaml b/.github/workflows/ci-integration-sql.yaml index 9fe12b68c8fa3..8965dcd1f8dc1 100644 --- a/.github/workflows/ci-integration-sql.yaml +++ b/.github/workflows/ci-integration-sql.yaml @@ -56,6 +56,18 @@ jobs: if: steps.docs.outputs.changed_only == 'no' run: mvn -B install -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR -Pdocker -DskipTests + - name: clean docker container + if: steps.docs.outputs.changed_only == 'no' + run: docker system prune -f + + - name: remove docker node image + if: steps.docs.outputs.changed_only == 'no' + run: docker rmi -f node:10 && docker rmi -f node:12 && docker rmi -f buildpack-deps:stretch + + - name: remove docker builder and microsoft image + if: steps.docs.outputs.changed_only == 'no' + run: docker rmi -f jekyll/builder:latest && docker rmi -f mcr.microsoft.com/azure-pipelines/node8-typescript:latest + - name: run integration tests if: steps.docs.outputs.changed_only == 'no' run: mvn -B -f tests/pom.xml test -DintegrationTestSuiteFile=pulsar-sql.xml -DintegrationTests -DredirectTestOutputToFile=false diff --git a/.github/workflows/ci-unit-adaptors.yml b/.github/workflows/ci-unit-adaptors.yml index b43d6a1784390..83ebcd218bd8e 100644 --- a/.github/workflows/ci-unit-adaptors.yml +++ b/.github/workflows/ci-unit-adaptors.yml @@ -63,3 +63,18 @@ jobs: - name: run unit tests pulsar storm tests if: steps.docs.outputs.changed_only == 'no' run: mvn test -pl tests/pulsar-storm-test + + - name: package surefire artifacts + if: failure() + run: | + rm -rf artifacts + mkdir artifacts + find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; + zip -r artifacts.zip artifacts + + - uses: actions/upload-artifact@master + name: upload surefire-artifacts + if: failure() + with: + name: surefire-artifacts + path: artifacts.zip diff --git a/.github/workflows/ci-unit-broker-sasl.yml b/.github/workflows/ci-unit-broker-sasl.yml index a2aaf21372774..57aceb9c8bf37 100644 --- a/.github/workflows/ci-unit-broker-sasl.yml +++ b/.github/workflows/ci-unit-broker-sasl.yml @@ -59,3 +59,18 @@ jobs: - name: run unit tests pulsar auth sasl if: steps.docs.outputs.changed_only == 'no' run: mvn test -DfailIfNoTests=false -pl pulsar-broker-auth-sasl + + - name: package surefire artifacts + if: failure() + run: | + rm -rf artifacts + mkdir artifacts + find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; + zip -r artifacts.zip artifacts + + - uses: actions/upload-artifact@master + name: upload surefire-artifacts + if: failure() + with: + name: surefire-artifacts + path: artifacts.zip diff --git a/.github/workflows/ci-unit-broker.yml b/.github/workflows/ci-unit-broker.yml index 5aefa3d9ac7d0..4538af439650f 100644 --- a/.github/workflows/ci-unit-broker.yml +++ b/.github/workflows/ci-unit-broker.yml @@ -87,3 +87,17 @@ jobs: - name: run unit test pulsar-broker if: steps.docs.outputs.changed_only == 'no' run: mvn test '-Dtest=!PersistentTransactionBufferTest,!PulsarFunctionE2ESecurityTest,!ServerCnxTest,!AdminApiOffloadTest,!AdminApiSchemaValidationEnforced,!V1_AdminApiTest2,!ProxyPublishConsumeTlsTest,!PulsarFunctionE2ETest,!MessageIdSerialization,!AdminApiTest2,!PulsarFunctionLocalRunTest,!PartitionedProducerConsumerTest,!KafkaProducerSimpleConsumerTest,!MessagePublishThrottlingTest,!ReaderTest,!RackAwareTest,!SimpleProducerConsumerTest,!V1_ProducerConsumerTest,!PersistentFailoverE2ETest,!BrokerClientIntegrationTest,!ReplicatorRateLimiterTest' -DfailIfNoTests=false -pl pulsar-broker + + - name: package surefire artifacts + if: failure() + run: | + rm -rf artifacts + mkdir artifacts + find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; + zip -r artifacts.zip artifacts + - uses: actions/upload-artifact@master + name: upload surefire-artifacts + if: failure() + with: + name: surefire-artifacts + path: artifacts.zip diff --git a/.github/workflows/ci-unit-flaky.yaml b/.github/workflows/ci-unit-flaky.yaml index 76ce7e84fcba4..8d3f0fb5e8895 100644 --- a/.github/workflows/ci-unit-flaky.yaml +++ b/.github/workflows/ci-unit-flaky.yaml @@ -52,25 +52,33 @@ jobs: with: maven-version: 3.6.1 + - name: build the project + if: steps.docs.outputs.changed_only == 'no' + run: mvn -B clean install -DskipTests + - name: run PersistentTransactionBuffer test if: steps.docs.outputs.changed_only == 'no' - run: mvn -B -am -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=PersistentTransactionBufferTest -DfailIfNoTests=false + run: mvn -B -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=PersistentTransactionBufferTest -DfailIfNoTests=false -fn - name: run ServerCnx test if: steps.docs.outputs.changed_only == 'no' - run: mvn -B -am -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=ServerCnxTest -DfailIfNoTests=false + run: mvn -B -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=ServerCnxTest -DfailIfNoTests=false -fn - name: run MessagePublishThrottling test if: steps.docs.outputs.changed_only == 'no' - run: mvn -B -am -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=MessagePublishThrottlingTest -DfailIfNoTests=false + run: mvn -B -pl pulsar-broker -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=MessagePublishThrottlingTest -DfailIfNoTests=false -fn - name: run PrimitiveSchema test if: steps.docs.outputs.changed_only == 'no' - run: mvn -B -am -pl pulsar-client -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=PrimitiveSchemaTest -DfailIfNoTests=false + run: mvn -B -pl pulsar-client -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=PrimitiveSchemaTest -DfailIfNoTests=false -fn + + - name: run BlobStoreManagedLedgerOffloaderTest + if: steps.docs.outputs.changed_only == 'no' + run: mvn -B -pl tiered-storage/jcloud -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install -Dtest=BlobStoreManagedLedgerOffloaderTest -DfailIfNoTests=false -fn - name: run unit tests if: steps.docs.outputs.changed_only == 'no' - run: mvn -B -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR clean install '-Dtest=PulsarFunctionE2ESecurityTest,AdminApiOffloadTest,AdminApiSchemaValidationEnforced,V1_AdminApiTest2,PulsarFunctionE2ETest,MessageIdSerialization,AdminApiTest2,PulsarFunctionLocalRunTest,PartitionedProducerConsumerTest,KafkaProducerSimpleConsumerTest,ProxyTest' -DfailIfNoTests=false + run: mvn -B -Dorg.slf4j.simpleLogger.defaultLogLevel=ERROR test '-Dtest=PulsarFunctionE2ESecurityTest,AdminApiOffloadTest,AdminApiSchemaValidationEnforced,V1_AdminApiTest2,PulsarFunctionE2ETest,MessageIdSerialization,AdminApiTest2,PulsarFunctionLocalRunTest,PartitionedProducerConsumerTest,KafkaProducerSimpleConsumerTest,ProxyTest' -DfailIfNoTests=false -fn - name: package surefire artifacts if: failure() diff --git a/.github/workflows/ci-unit-proxy.yaml b/.github/workflows/ci-unit-proxy.yaml index 8b4d08ec628fa..83e3f3d1112e6 100644 --- a/.github/workflows/ci-unit-proxy.yaml +++ b/.github/workflows/ci-unit-proxy.yaml @@ -60,6 +60,21 @@ jobs: if: steps.docs.outputs.changed_only == 'no' run: mvn test -DfailIfNoTests=false '-Dtest=!ProxyTest,!ProxyLookupThrottlingTest' -pl pulsar-proxy - - name: run unit tests pulsar proxy + - name: run proxy lookup throttling test if: steps.docs.outputs.changed_only == 'no' - run: mvn test -DfailIfNoTests=false '-Dtest=ProxyTest,ProxyLookupThrottlingTest' -pl pulsar-proxy + run: mvn test -DfailIfNoTests=false '-Dtest=ProxyLookupThrottlingTest' -pl pulsar-proxy + + - name: package surefire artifacts + if: failure() + run: | + rm -rf artifacts + mkdir artifacts + find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; + zip -r artifacts.zip artifacts + + - uses: actions/upload-artifact@master + name: upload surefire-artifacts + if: failure() + with: + name: surefire-artifacts + path: artifacts.zip diff --git a/.github/workflows/ci-unit.yaml b/.github/workflows/ci-unit.yaml index f79b78528c38a..b429d1639bcb0 100644 --- a/.github/workflows/ci-unit.yaml +++ b/.github/workflows/ci-unit.yaml @@ -58,7 +58,7 @@ jobs: - name: run unit tests if: steps.docs.outputs.changed_only == 'no' - run: mvn clean install -DfailIfNoTests=false '-Dtest=!KafkaProducerSimpleConsumerTest,!PrimitiveSchemaTest' -pl '!pulsar-broker,!pulsar-proxy,!pulsar-broker-auth-sasl,!pulsar-io/kafka-connect-adaptor,!tests/pulsar-storm-test' + run: mvn install -DfailIfNoTests=false '-Dtest=!KafkaProducerSimpleConsumerTest,!PrimitiveSchemaTest,!BlobStoreManagedLedgerOffloaderTest' -pl '!pulsar-broker,!pulsar-proxy,!pulsar-broker-auth-sasl,!pulsar-io/kafka-connect-adaptor,!tests/pulsar-storm-test' - name: package surefire artifacts if: failure() @@ -67,6 +67,7 @@ jobs: mkdir artifacts find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \; zip -r artifacts.zip artifacts + - uses: actions/upload-artifact@master name: upload surefire-artifacts if: failure() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index 1a4231e83e98b..f01f771149a1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -749,7 +749,7 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception { channel.writeInbound(clientCommand); Object response = getResponse(); - assertTrue(response instanceof CommandError); + assertTrue(response instanceof CommandError, "Response is not CommandError but " + response); CommandError error = (CommandError) response; assertEquals(error.getError(), ServerError.ServiceNotReady); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java index 2647bbba380a8..8fad844bc8a8e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java @@ -106,7 +106,7 @@ public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation( @Test public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws PulsarClientException, PulsarAdminException { conf.setAllowAutoTopicCreation(false); - final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation"; + final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation-" + System.currentTimeMillis(); admin.topics().createPartitionedTopic(topic, 3); MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) pulsarClient.newConsumer().topic(topic).subscriptionName("sub-1").subscribe(); Assert.assertNotNull(consumer); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index ac174d29dafe0..a54dcd853c701 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -717,6 +717,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { msg = subscriber1.receive(5, TimeUnit.SECONDS); // Verify: as active-subscriber2 has not consumed messages: EntryCache must have those entries in cache + retryStrategically((test) -> entryCache.getSize() > 0, 10, 100); assertTrue(entryCache.getSize() != 0); // 3.b Close subscriber2: which will trigger cache to clear the cache diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 40a24f988380b..a1622e1fd5317 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -70,6 +70,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; @@ -343,9 +344,10 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws versionField.set(cnx, 3); // (1) send non-batch message: consumer should be able to consume + MessageId lastNonBatchedMessageId = null; for (int i = 0; i < numMessagesPerBatch; i++) { String message = "my-message-" + i; - producer.send(message.getBytes()); + lastNonBatchedMessageId = producer.send(message.getBytes()); } Set messageSet = Sets.newHashSet(); Message msg = null; @@ -362,7 +364,7 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws consumer1.setClientCnx(null); // (2) send batch-message which should not be able to consume: as broker will disconnect the consumer for (int i = 0; i < numMessagesPerBatch; i++) { - String message = "my-message-" + i; + String message = "my-batch-message-" + i; batchProducer.sendAsync(message.getBytes()); } batchProducer.flush(); @@ -378,13 +380,14 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws .subscriptionName(subscriptionName) .subscriptionType(subType) .subscribe(); + consumer2.seek(lastNonBatchedMessageId); messageSet.clear(); for (int i = 0; i < numMessagesPerBatch; i++) { msg = consumer2.receive(1, TimeUnit.SECONDS); String receivedMessage = new String(msg.getData()); log.debug("Received message: [{}]", receivedMessage); - String expectedMessage = "my-message-" + i; + String expectedMessage = "my-batch-message-" + i; testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); consumer2.acknowledge(msg); } diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc index 25addb97a8ba5..c1ee3e945753b 100644 --- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc +++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc @@ -256,7 +256,6 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback callback) { for (ConsumerMap::const_iterator consumer = consumers_.begin(); consumer != consumers_.end(); consumer++) { - LOG_DEBUG("Unsubcribing Consumer - " << consumer->first); (consumer->second) ->unsubscribeAsync(std::bind(&MultiTopicsConsumerImpl::handleUnsubscribedAsync, shared_from_this(), std::placeholders::_1, consumerUnsubed, diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index 2794a1ce8d3e1..1477515969ec5 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -98,7 +98,6 @@ def send_callback(res, msg): client.close() """ -import logging import _pulsar @@ -336,7 +335,6 @@ def __init__(self, service_url, message_listener_threads=1, concurrent_lookup_requests=50000, log_conf_file_path=None, - logger=None, use_tls=False, tls_trust_certs_file_path=None, tls_allow_insecure_connection=False, @@ -370,8 +368,6 @@ def __init__(self, service_url, to prevent overload on the broker. * `log_conf_file_path`: Initialize log4cxx from a configuration file. - * `logger`: - Set a Python logger for this Pulsar client. * `use_tls`: Configure whether to use TLS encryption on the connection. This setting is deprecated. TLS will be automatically enabled if the `serviceUrl` is @@ -394,7 +390,6 @@ def __init__(self, service_url, _check_type(int, message_listener_threads, 'message_listener_threads') _check_type(int, concurrent_lookup_requests, 'concurrent_lookup_requests') _check_type_or_none(str, log_conf_file_path, 'log_conf_file_path') - _check_type_or_none(logging.Logger, logger, 'logger') _check_type(bool, use_tls, 'use_tls') _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') @@ -409,8 +404,6 @@ def __init__(self, service_url, conf.concurrent_lookup_requests(concurrent_lookup_requests) if log_conf_file_path: conf.log_conf_file_path(log_conf_file_path) - if logger: - conf.set_logger(logger) if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): conf.use_tls(True) if tls_trust_certs_file_path: diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 44d981d552bd6..306886e1bea9e 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -18,102 +18,6 @@ */ #include "utils.h" -class LoggerWrapper: public Logger { - std::string _logger; - PyObject* _pyLogger; - int _currentPythonLogLevel = 10 + (Logger::LEVEL_INFO*10); - - void _updateCurrentPythonLogLevel() { - PyGILState_STATE state = PyGILState_Ensure(); - - try { - _currentPythonLogLevel = py::call_method(_pyLogger, "getEffectiveLevel"); - } catch (py::error_already_set e) { - PyErr_Print(); - } - - PyGILState_Release(state); - }; - -public: - - LoggerWrapper(const std::string &logger, PyObject* pyLogger) : _logger(logger) { - _pyLogger = pyLogger; - Py_XINCREF(_pyLogger); - - _updateCurrentPythonLogLevel(); - } - - LoggerWrapper(const LoggerWrapper& other) { - _pyLogger = other._pyLogger; - Py_XINCREF(_pyLogger); - } - - LoggerWrapper& operator=(const LoggerWrapper& other) { - _pyLogger = other._pyLogger; - Py_XINCREF(_pyLogger); - return *this; - } - - virtual ~LoggerWrapper() { - Py_XDECREF(_pyLogger); - } - - bool isEnabled(Level level) { - return 10 + (level*10) >= _currentPythonLogLevel; - } - - void log(Level level, int line, const std::string& message) { - PyGILState_STATE state = PyGILState_Ensure(); - - try { - switch (level) { - case Logger::LEVEL_DEBUG: - py::call_method(_pyLogger, "debug", message.c_str()); - break; - case Logger::LEVEL_INFO: - py::call_method(_pyLogger, "info", message.c_str()); - break; - case Logger::LEVEL_WARN: - py::call_method(_pyLogger, "warning", message.c_str()); - break; - case Logger::LEVEL_ERROR: - py::call_method(_pyLogger, "error", message.c_str()); - break; - } - - } catch (py::error_already_set e) { - PyErr_Print(); - } - - PyGILState_Release(state); - } -}; - -class LoggerWrapperFactory : public LoggerFactory { - static LoggerWrapperFactory* _instance; - PyObject* _pyLogger; - - LoggerWrapperFactory(py::object pyLogger) { - _pyLogger = pyLogger.ptr(); - Py_XINCREF(_pyLogger); - } - -public: - virtual ~LoggerWrapperFactory() { - Py_XDECREF(_pyLogger); - } - - Logger* getLogger(const std::string &fileName) { - return new LoggerWrapper(fileName, _pyLogger); - } - - static LoggerFactoryPtr create(py::object pyLogger) { - return LoggerFactoryPtr(new LoggerWrapperFactory(pyLogger)); - } -}; - - template struct ListenerWrapper { PyObject* _pyListener; @@ -170,12 +74,6 @@ static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfigur return conf; } -static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& conf, - py::object logger) { - conf.setLogger(LoggerWrapperFactory::create(logger)); - return conf; -} - void export_config() { using namespace boost::python; @@ -191,7 +89,6 @@ void export_config() { .def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>()) .def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, return_value_policy()) .def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>()) - .def("set_logger", &ClientConfiguration_setLogger, return_self<>()) .def("use_tls", &ClientConfiguration::isUseTls) .def("use_tls", &ClientConfiguration::setUseTls, return_self<>()) .def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath) diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 54077eaa055dd..c1af51cdbbf95 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -2108,9 +2108,12 @@ TEST(BasicEndToEndTest, testPatternEmptyUnsubscribe) { ASSERT_EQ(consumer.getSubscriptionName(), subName); LOG_INFO("created topics consumer on a pattern that match 0 topics"); - ASSERT_EQ(ResultOk, consumer.unsubscribe()); + result = consumer.unsubscribe(); + LOG_INFO("unsubscribed topics consumer : " << result); + ASSERT_EQ(ResultOk, result) << "expected " << ResultOk << " but found " << result; - client.shutdown(); + // TODO: flaky test + // client.shutdown(); } // create a pattern consumer, which contains no match topics at beginning. diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 9865111572a5f..90c134d902c94 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -41,6 +41,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Paths; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -1031,10 +1033,12 @@ public void testUpdateFunctionSuccess() throws Exception { } @Test - public void testUpdateFunctionWithUrl() { + public void testUpdateFunctionWithUrl() throws Exception { Configurator.setRootLevel(Level.DEBUG); - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String fileLocation = file.getAbsolutePath(); String filePackageUrl = "file://" + fileLocation; FunctionConfig functionConfig = new FunctionConfig(); @@ -1447,7 +1451,9 @@ public void testDownloadFunctionHttpUrl() throws Exception { @Test public void testDownloadFunctionFile() throws Exception { - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String fileLocation = file.getAbsolutePath(); String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService); StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation, null).getEntity(); @@ -1461,10 +1467,12 @@ public void testDownloadFunctionFile() throws Exception { } @Test - public void testRegisterFunctionFileUrlWithValidSinkClass() { + public void testRegisterFunctionFileUrlWithValidSinkClass() throws Exception { Configurator.setRootLevel(Level.DEBUG); - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String fileLocation = file.getAbsolutePath(); String filePackageUrl = "file://" + fileLocation; when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); @@ -1492,14 +1500,16 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() { } @Test - public void testRegisterFunctionWithConflictingFields() { + public void testRegisterFunctionWithConflictingFields() throws Exception { Configurator.setRootLevel(Level.DEBUG); String actualTenant = "DIFFERENT_TENANT"; String actualNamespace = "DIFFERENT_NAMESPACE"; String actualName = "DIFFERENT_NAME"; this.namespaceList.add(actualTenant + "/" + actualNamespace); - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String fileLocation = file.getAbsolutePath(); String filePackageUrl = "file://" + fileLocation; when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java index b65d5402f7b35..16e5e3077f72e 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java @@ -37,6 +37,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.net.URL; +import java.nio.file.Paths; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -1488,7 +1490,9 @@ public void testDownloadFunctionHttpUrl() throws Exception { @Test public void testDownloadFunctionFile() throws Exception { - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String fileLocation = file.getAbsolutePath(); String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath(); WorkerService worker = mock(WorkerService.class); doReturn(true).when(worker).isInitialized(); @@ -1507,10 +1511,12 @@ public void testDownloadFunctionFile() throws Exception { } @Test - public void testRegisterFunctionFileUrlWithValidSinkClass() { + public void testRegisterFunctionFileUrlWithValidSinkClass() throws Exception { Configurator.setRootLevel(Level.DEBUG); - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String fileLocation = file.getAbsolutePath(); String filePackageUrl = "file://" + fileLocation; when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); @@ -1533,14 +1539,16 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() { } @Test - public void testRegisterFunctionWithConflictingFields() { + public void testRegisterFunctionWithConflictingFields() throws Exception { Configurator.setRootLevel(Level.DEBUG); String actualTenant = "DIFFERENT_TENANT"; String actualNamespace = "DIFFERENT_NAMESPACE"; String actualName = "DIFFERENT_NAME"; this.namespaceList.add(actualTenant + "/" + actualNamespace); - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String fileLocation = file.getAbsolutePath(); String filePackageUrl = "file://" + fileLocation; when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true); when(mockedManager.containsFunction(eq(actualTenant), eq(actualNamespace), eq(actualName))).thenReturn(false); @@ -1563,10 +1571,12 @@ public void testRegisterFunctionWithConflictingFields() { } @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function language runtime is either not set or cannot be determined") - public void testCreateFunctionWithoutSettingRuntime() { + public void testCreateFunctionWithoutSettingRuntime() throws Exception { Configurator.setRootLevel(Level.DEBUG); - String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + URL fileUrl = getClass().getClassLoader().getResource("test_worker_config.yml"); + File file = Paths.get(fileUrl.toURI()).toFile(); + String fileLocation = file.getAbsolutePath(); String filePackageUrl = "file://" + fileLocation; when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false); diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java index f9e4e11203cc3..eb5a32e32720e 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ClientCnx; import org.apache.pulsar.client.impl.ConnectionPool; @@ -112,22 +113,27 @@ protected void cleanup() throws Exception { @Test public void testProducer() throws Exception { + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) .build(); - Producer producer = client.newProducer(Schema.BYTES).topic("persistent://sample/test/local/producer-topic") - .create(); + + @Cleanup + Producer producer = client.newProducer() + .topic("persistent://sample/test/local/producer-topic") + .create(); for (int i = 0; i < 10; i++) { producer.send("test".getBytes()); } - - client.close(); } @Test public void testProducerConsumer() throws Exception { + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) .build(); + + @Cleanup Producer producer = client.newProducer(Schema.BYTES) .topic("persistent://sample/test/local/producer-consumer-topic") .enableBatching(false) @@ -135,6 +141,7 @@ public void testProducerConsumer() throws Exception { .create(); // Create a consumer directly attached to broker + @Cleanup Consumer consumer = pulsarClient.newConsumer() .topic("persistent://sample/test/local/producer-consumer-topic").subscriptionName("my-sub").subscribe(); @@ -150,24 +157,24 @@ public void testProducerConsumer() throws Exception { Message msg = consumer.receive(0, TimeUnit.SECONDS); checkArgument(msg == null); - - consumer.close(); - client.close(); } @Test public void testPartitions() throws Exception { admin.tenants().createTenant("sample", new TenantInfo()); + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) .build(); admin.topics().createPartitionedTopic("persistent://sample/test/local/partitioned-topic", 2); + @Cleanup Producer producer = client.newProducer(Schema.BYTES) .topic("persistent://sample/test/local/partitioned-topic") .enableBatching(false) .messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create(); // Create a consumer directly attached to broker + @Cleanup Consumer consumer = pulsarClient.newConsumer().topic("persistent://sample/test/local/partitioned-topic") .subscriptionName("my-sub").subscribe(); @@ -179,8 +186,6 @@ public void testPartitions() throws Exception { Message msg = consumer.receive(1, TimeUnit.SECONDS); checkNotNull(msg); } - - client.close(); } @Test @@ -191,12 +196,12 @@ public void testRegexSubscription() throws Exception { // create two topics by subscribing to a topic and closing it try (Consumer ignored = client.newConsumer() - .topic("persistent://sample/test/local/topic1") + .topic("persistent://sample/test/local/regex-sub-topic1") .subscriptionName("proxy-ignored") .subscribe()) { } try (Consumer ignored = client.newConsumer() - .topic("persistent://sample/test/local/topic2") + .topic("persistent://sample/test/local/regex-sub-topic2") .subscriptionName("proxy-ignored") .subscribe()) { } @@ -204,11 +209,12 @@ public void testRegexSubscription() throws Exception { String subName = "regex-sub-proxy-test-" + System.currentTimeMillis(); // make sure regex subscription - String regexSubscriptionPattern = "persistent://sample/test/local/topic.*"; + String regexSubscriptionPattern = "persistent://sample/test/local/regex-sub-topic.*"; log.info("Regex subscribe to topics {}", regexSubscriptionPattern); try (Consumer consumer = client.newConsumer() .topicsPattern(regexSubscriptionPattern) .subscriptionName(subName) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe()) { log.info("Successfully subscribe to topics using regex {}", regexSubscriptionPattern); @@ -231,13 +237,14 @@ public void testRegexSubscription() throws Exception { @Test public void testGetSchema() throws Exception { + @Cleanup PulsarClient client = PulsarClient.builder().serviceUrl(proxyService.getServiceUrl()) .build(); - Producer producer; Schema schema = Schema.AVRO(Foo.class); try { - producer = client.newProducer(schema).topic("persistent://sample/test/local/get-schema") - .create(); + try (Producer ignored = client.newProducer(schema).topic("persistent://sample/test/local/get-schema") + .create()) { + } } catch (Exception ex) { Assert.fail("Should not have failed since can acquire LookupRequestSemaphore"); } @@ -246,10 +253,9 @@ public void testGetSchema() throws Exception { for (int i = 0; i<8; i++){ schemaVersion[i] = b; } - SchemaInfo schemaInfo = ((PulsarClientImpl)client).getLookup() + SchemaInfo schemaInfo = ((PulsarClientImpl) client).getLookup() .getSchema(TopicName.get("persistent://sample/test/local/get-schema"), schemaVersion).get().orElse(null); Assert.assertEquals(new String(schemaInfo.getSchema()), new String(schema.getSchemaInfo().getSchema())); - client.close(); } @Test @@ -259,9 +265,14 @@ public void testProtocolVersionAdvertisement() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl(proxyService.getServiceUrl()); + + @Cleanup PulsarClient client = getClientActiveConsumerChangeNotSupported(conf); + @Cleanup Producer producer = client.newProducer().topic(topic).create(); + + @Cleanup Consumer consumer = client.newConsumer().topic(topic).subscriptionName(sub) .subscriptionType(SubscriptionType.Failover).subscribe(); @@ -274,10 +285,6 @@ public void testProtocolVersionAdvertisement() throws Exception { checkNotNull(msg); consumer.acknowledge(msg); } - - producer.close(); - consumer.close(); - client.close(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/FunctionsCLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/FunctionsCLITest.java index 181876911974a..74503b5306519 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/FunctionsCLITest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/FunctionsCLITest.java @@ -58,7 +58,8 @@ public String uploadFunction() throws Exception { return bkPkgPath; } - @Test + // Flaky Test: https://github.com/apache/pulsar/issues/6179 + // @Test public void testUploadDownload() throws Exception { String bkPkgPath = uploadFunction(); String localPkgFile = "/tmp/checkdownload-" + randomName(16);