Skip to content

Commit

Permalink
[C++/Python] Fix bugs that were not exposed by broken C++ CI before (a…
Browse files Browse the repository at this point in the history
…pache#11557)

Fixes apache#11551 

### Motivation

Currently there're some bugs of C++ client and some tests cannot pass:

1. Introduced from apache#10601 because it changed the behavior of the admin API to get partition metadata while the C++ implementation relies on the original behavior to create topics automatically. So any test that uses HTTP lookup will fail.
    - AuthPluginTest.testTlsDetectHttps
    - AuthPluginToken.testTokenWithHttpUrl
    - BasicEndToEndTest.testHandlerReconnectionLogic
    - BasicEndToEndTest.testV2TopicHttp
    - ClientDeduplicationTest.testProducerDeduplication
2. Introduced from apache#11029 and apache#11486 , the implementation will iterate more than once even there's only one valid resolved IP address.
    - ClientTest.testConnectTimeout

In addition, there's an existed flaky test from very early time: ClientTest.testLookupThrottling.

Python tests are also broken. Because it must run after all C++ tests passed, they're also not exposed.
1. Some tests in `pulsar_test.py` might encounter `Timeout` error when creating producers or consumers.
2. Some tests in `schema_test.py` failed because some comparisons between two `ComplexRecord`s failed. 

Since the CI test of C++ client would never fail after apache#10309 (will be fixed by apache#11575), all PRs about C++ or Python client are not verified even if CI passed. Before apache#11575 is merged, we need to fix all existed bugs of C++ client.

### Modifications

Corresponding to the above tests group, this PR adds following modifications:
1. Add the `?checkAllowAutoCreation=true` URL suffix to allow HTTP lookup to create topics automatically.
2. When iterating through a resolved IP list, increase the iterator first, then run the connection timer and try to connect the next IP.

Regarding to the flaky `testLookupThrottling`, this PR adds a `client.close()` at the end of test and fix the `ClientImpl::close` implementation. Before this PR, if there're no producers or consumers in a client, the `close()` method wouldn't call `shutdown()` to close connection poll and executors. Only after the `Client` instance was destructed would the `shutdown()` method be called. In this case, this PR calls `handleClose` instead of invoking callback directly. In addition, change the log level of this test to debug.

This PR also fixes the failed timeout Python tests, some are caused by incorrect import of classes, some are caused by `client` was not closed.

Regarding to Python schema tests, in Python2, `self.__ne__(other)` is not equivalent to `not self.__eq__(other)` when the default `__eq__` implementation is overwritten. If a `Record` object has a field whose type is also `Record`, the `Record.__ne__` method will be called, see

https://github.com/apache/pulsar/blob/ddb5fb0e062c2fe0967efce2a443a31f9cd12c07/pulsar-client-cpp/python/pulsar/schema/definition.py#L138-L139

but it just uses the default implementation to check whether they're not equal. The custom `__eq__` method won't be called. Therefore, this PR implement `Record.__ne__` explicitly to call `Record.__eq__` so that the comparison will work for Python2.

### Verifying this change

We can only check the workflow output to verify this change.
  • Loading branch information
BewareMyPower authored Aug 8, 2021
1 parent c4a2572 commit 4919a82
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 28 deletions.
39 changes: 27 additions & 12 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
consumerStatsRequestTimer_(executor_->createDeadlineTimer()),
numOfPendingLookupRequest_(0),
isTlsAllowInsecureConnection_(false) {
LOG_INFO(cnxString_ << "Create ClientConnection, timeout=" << clientConfiguration.getConnectionTimeout());
if (clientConfiguration.isUseTls()) {
#if BOOST_VERSION >= 105400
boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
Expand Down Expand Up @@ -433,21 +434,28 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
handleHandshake(boost::system::errc::make_error_code(boost::system::errc::success));
}
} else if (endpointIterator != tcp::resolver::iterator()) {
LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
// The connection failed. Try the next endpoint in the list.
boost::system::error_code err;
socket_->close(err); // ignore the error of close
if (err) {
boost::system::error_code closeError;
socket_->close(closeError); // ignore the error of close
if (closeError) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
connectTimeoutTask_->stop();
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
socket_->async_connect(endpoint, std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
++endpointIterator;
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
tcp::endpoint endpoint = *endpointIterator;
socket_->async_connect(endpoint,
std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
} else {
close();
}
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
close();
return;
}
}

Expand Down Expand Up @@ -512,7 +520,7 @@ void ClientConnection::tcpConnectAsync() {
return;
}

LOG_DEBUG(cnxString_ << "Connecting to " << service_url.host() << ":" << service_url.port());
LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
resolver_->async_resolve(query, std::bind(&ClientConnection::handleResolve, shared_from_this(),
std::placeholders::_1, std::placeholders::_2));
Expand All @@ -531,12 +539,16 @@ void ClientConnection::handleResolve(const boost::system::error_code& err,
if (state_ != TcpConnected) {
LOG_ERROR(cnxString_ << "Connection was not established in " << connectTimeoutTask_->getPeriodMs()
<< " ms, close the socket");
PeriodicTask::ErrorCode ignoredError;
socket_->close(ignoredError);
PeriodicTask::ErrorCode err;
socket_->close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
}
}
connectTimeoutTask_->stop();
});

LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
connectTimeoutTask_->start();
if (endpointIterator != tcp::resolver::iterator()) {
LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
Expand Down Expand Up @@ -1455,7 +1467,10 @@ void ClientConnection::close() {
}

if (tlsSocket_) {
tlsSocket_->lowest_layer().close();
tlsSocket_->lowest_layer().close(err);
if (err) {
LOG_WARN(cnxString_ << "Failed to close TLS socket: " << err.message());
}
}

if (executor_) {
Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ void ClientImpl::closeAsync(CloseCallback callback) {
}

if (*numberOfOpenHandlers == 0 && callback) {
callback(ResultOk);
handleClose(ResultOk, numberOfOpenHandlers, callback);
}
}

Expand Down
2 changes: 1 addition & 1 deletion pulsar-client-cpp/lib/ConnectionPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void ConnectionPool::close() {
if (poolConnections_) {
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
ClientConnectionPtr cnx = cnxIt->second.lock();
if (cnx && !cnx->isClosed()) {
if (cnx) {
cnx->close();
}
}
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/lib/HTTPLookupService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync
<< '/' << PARTITION_METHOD_NAME;
}

completeUrlStream << "?checkAllowAutoCreation=true";
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
shared_from_this(), promise, completeUrlStream.str(),
PartitionMetaData));
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/python/pulsar/schema/definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ def __eq__(self, other):
return False
return True

def __ne__(self, other):
return not self.__eq__(other)

def __str__(self):
return str(self.__dict__)

Expand Down
34 changes: 22 additions & 12 deletions pulsar-client-cpp/python/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@
from unittest import TestCase, main
import time
import os
import pulsar
import uuid
from datetime import timedelta
from pulsar import Client, MessageId, \
CompressionType, ConsumerType, PartitionsRoutingMode, \
AuthenticationTLS, Authentication, AuthenticationToken, InitialPosition, \
CryptoKeyReader

from _pulsar import ProducerConfiguration, ConsumerConfiguration, ConnectError
from _pulsar import ProducerConfiguration, ConsumerConfiguration

from schema_test import *

Expand Down Expand Up @@ -155,6 +156,7 @@ def test_producer_send(self):
consumer.acknowledge(msg)
print('receive from {}'.format(msg.message_id()))
self.assertEqual(msg_id, msg.message_id())
client.close()

def test_producer_consumer(self):
client = Client(self.serviceUrl)
Expand Down Expand Up @@ -292,7 +294,7 @@ def test_message_properties(self):
subscription_name='my-subscription',
schema=pulsar.schema.StringSchema())
producer = client.create_producer(topic=topic,
schema=StringSchema())
schema=pulsar.schema.StringSchema())
producer.send('hello',
properties={
'a': '1',
Expand All @@ -319,10 +321,11 @@ def test_tls_auth(self):
tls_allow_insecure_connection=False,
authentication=AuthenticationTLS(certs_dir + 'client-cert.pem', certs_dir + 'client-key.pem'))

consumer = client.subscribe('my-python-topic-tls-auth',
topic = 'my-python-topic-tls-auth-' + str(time.time())
consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-tls-auth')
producer = client.create_producer(topic)
producer.send(b'hello')

msg = consumer.receive(TM)
Expand All @@ -346,10 +349,11 @@ def test_tls_auth2(self):
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))

consumer = client.subscribe('my-python-topic-tls-auth-2',
topic = 'my-python-topic-tls-auth-2-' + str(time.time())
consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-tls-auth-2')
producer = client.create_producer(topic)
producer.send(b'hello')

msg = consumer.receive(TM)
Expand Down Expand Up @@ -392,10 +396,11 @@ def test_tls_auth3(self):
tls_allow_insecure_connection=False,
authentication=Authentication(authPlugin, authParams))

consumer = client.subscribe('my-python-topic-tls-auth-3',
topic = 'my-python-topic-tls-auth-3-' + str(time.time())
consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-tls-auth-3')
producer = client.create_producer(topic)
producer.send(b'hello')

msg = consumer.receive(TM)
Expand Down Expand Up @@ -583,6 +588,8 @@ def test_producer_sequence_after_reconnection(self):
producer.send(b'hello-%d' % i)
self.assertEqual(producer.last_sequence_id(), i)

client.close()

doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
'false')

Expand Down Expand Up @@ -630,6 +637,8 @@ def test_producer_deduplication(self):
with self.assertRaises(pulsar.Timeout):
consumer.receive(100)

client.close()

doHttpPost(self.adminUrl + '/admin/v2/namespaces/public/default/deduplication',
'false')

Expand Down Expand Up @@ -820,10 +829,11 @@ def test_reader_has_message_available(self):

def test_seek(self):
client = Client(self.serviceUrl)
consumer = client.subscribe('my-python-topic-seek',
topic = 'my-python-topic-seek-' + str(time.time())
consumer = client.subscribe(topic,
'my-sub',
consumer_type=ConsumerType.Shared)
producer = client.create_producer('my-python-topic-seek')
producer = client.create_producer(topic)

for i in range(100):
if i > 0:
Expand Down Expand Up @@ -858,7 +868,7 @@ def test_seek(self):
self.assertEqual(msg.data(), b'hello-42')

# repeat with reader
reader = client.create_reader('my-python-topic-seek', MessageId.latest)
reader = client.create_reader(topic, MessageId.latest)
with self.assertRaises(pulsar.Timeout):
reader.read_next(100)

Expand Down Expand Up @@ -1157,7 +1167,7 @@ def test_connect_timeout(self):
try:
producer = client.create_producer('test_connect_timeout')
self.fail('create_producer should not succeed')
except ConnectError as expected:
except pulsar.ConnectError as expected:
print('expected error: {} when create producer'.format(expected))
t2 = time.time()
self.assertGreater(t2 - t1, 1.0)
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/tests/BasicEndToEndTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
std::string topicName = "testLookupThrottling";
ClientConfiguration config;
config.setConcurrentLookupRequest(0);
config.setLogger(new ConsoleLoggerFactory(Logger::LEVEL_DEBUG));
Client client(lookupUrl, config);

Producer producer;
Expand All @@ -307,6 +308,8 @@ TEST(BasicEndToEndTest, testLookupThrottling) {
Consumer consumer1;
result = client.subscribe(topicName, "my-sub-name", consumer1);
ASSERT_EQ(ResultTooManyLookupRequestException, result);

client.close();
}

TEST(BasicEndToEndTest, testNonExistingTopic) {
Expand Down
4 changes: 2 additions & 2 deletions pulsar-client-cpp/tests/CustomLoggerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
// reset to previous log factory
Client client("pulsar://localhost:6650", clientConfig);
client.close();
ASSERT_EQ(logLines.size(), 2);
ASSERT_EQ(logLines.size(), 3);
LogUtils::resetLoggerFactory();
});
testThread.join();
Expand All @@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
Client client("pulsar://localhost:6650", clientConfig);
client.close();
// custom logger didn't get any new lines
ASSERT_EQ(logLines.size(), 2);
ASSERT_EQ(logLines.size(), 3);
}

TEST(CustomLoggerTest, testConsoleLoggerFactory) {
Expand Down

0 comments on commit 4919a82

Please sign in to comment.