PIP-91: Separate lookup timeout from operation timeout (apache#11627)
* PIP-91: Separate lookup timeout from operation timeout

This patch contains a number of changes.

TooManyRequests responses are now retried for partition metadata requests and lookups, instead of failing the call immediately.
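
Roughly, the new behaviour amounts to treating TooManyRequests as transient and retrying with backoff until the time budget is exhausted. The sketch below is illustrative only, not the client's actual code; the attempt supplier, scheduler, and backoff doubling are assumptions made for the example:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;
    import org.apache.pulsar.client.api.PulsarClientException;

    public class RetryOnTooManyRequestsSketch {
        // attempt = one lookup or partition-metadata request; deadlineNanos = absolute give-up time.
        public static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> attempt,
                                                         ScheduledExecutorService scheduler,
                                                         long deadlineNanos,
                                                         long backoffMs) {
            CompletableFuture<T> result = new CompletableFuture<>();
            attempt.get().whenComplete((value, error) -> {
                boolean tooManyRequests =
                        error instanceof PulsarClientException.TooManyRequestsException
                        || (error != null
                            && error.getCause() instanceof PulsarClientException.TooManyRequestsException);
                boolean budgetLeft =
                        System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(backoffMs) < deadlineNanos;
                if (error == null) {
                    result.complete(value);
                } else if (tooManyRequests && budgetLeft) {
                    // TooManyRequests is transient: back off and retry instead of failing the call.
                    scheduler.schedule(() -> {
                        withRetry(attempt, scheduler, deadlineNanos, backoffMs * 2)
                                .whenComplete((v, e) -> {
                                    if (e == null) {
                                        result.complete(v);
                                    } else {
                                        result.completeExceptionally(e);
                                    }
                                });
                    }, backoffMs, TimeUnit.MILLISECONDS);
                } else {
                    result.completeExceptionally(error);
                }
            });
            return result;
        }
    }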

A lookup timeout configuration option has been added. By default it
matches the operation timeout.
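
For client code, the new knob sits next to the existing operation timeout on the client builder. A minimal sketch, assuming the option is exposed as lookupTimeout(...) as described in PIP-91; the service URL and timeout values are placeholders:

    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.PulsarClient;

    public class LookupTimeoutExample {
        public static void main(String[] args) throws Exception {
            try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")      // placeholder broker URL
                    .operationTimeout(30, TimeUnit.SECONDS)
                    // New in this patch: lookups (which may now be retried on TooManyRequests)
                    // get their own budget; if unset it defaults to the operation timeout.
                    .lookupTimeout(60, TimeUnit.SECONDS)
                    .build()) {
                // ... create producers/consumers here ...
            }
        }
    }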

Partition metadata timeout calculation has been fixed to calculate
the elapsed time correctly.
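
The essence of that fix is deadline bookkeeping: measure elapsed time from the point the operation started, so retries never reset the clock. A minimal sketch of the idea (not the client's actual code):

    import java.util.concurrent.TimeUnit;

    public class OperationDeadline {
        private final long startNanos = System.nanoTime();
        private final long budgetNanos;

        public OperationDeadline(long timeout, TimeUnit unit) {
            this.budgetNanos = unit.toNanos(timeout);
        }

        // Remaining budget in milliseconds, computed against the original start time.
        public long remainingMs() {
            long elapsedNanos = System.nanoTime() - startNanos;
            return TimeUnit.NANOSECONDS.toMillis(budgetNanos - elapsedNanos);
        }

        public boolean expired() {
            return remainingMs() <= 0;
        }
    }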

A small refactor of broker construction allows a mocked ServerCnx
implementation to be plugged in for testing. Unfortunately, the test
takes over 50 seconds, but this is unavoidable because we are working
with timeouts here.
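
The hooks appear in the diff below (PulsarService#newBrokerService, BrokerService#setPulsarChannelInitializerFactory, PulsarChannelInitializer#newServerCnx). The following is a hedged sketch of how a test might chain them to end up with a spied ServerCnx; the test class name, the base class (my guess at the file being changed), the public visibility of the constructors involved, and the use of a plain Mockito spy are all assumptions for illustration, not the actual test in this patch:

    import org.apache.pulsar.broker.PulsarService;
    import org.apache.pulsar.broker.ServiceConfiguration;
    import org.apache.pulsar.broker.service.BrokerService;
    import org.apache.pulsar.broker.service.PulsarChannelInitializer;
    import org.apache.pulsar.broker.service.ServerCnx;
    import org.mockito.Mockito;

    public class MockedServerCnxTest extends MockedPulsarServiceBaseTest {   // hypothetical test class
        @Override
        protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
            return new PulsarService(conf) {
                @Override
                protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
                    BrokerService broker = new BrokerService(pulsar, ioEventLoopGroup);
                    // Plug in an initializer whose ServerCnx is a spy, so individual
                    // request handlers can be stubbed (e.g. to answer TooManyRequests).
                    broker.setPulsarChannelInitializerFactory(
                            (svc, tls) -> new PulsarChannelInitializer(svc, tls) {
                                @Override
                                protected ServerCnx newServerCnx(PulsarService p) throws Exception {
                                    return Mockito.spy(new ServerCnx(p));
                                }
                            });
                    return broker;
                }
            };
        }
    }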

PulsarClientExceptions have been reworked to contain more
context (remote/local address and request id) and any previous
exceptions that occurred and triggered retries. The previous exceptions
must be recorded manually, so for now this only applies to lookups on
the consumer side.
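
From an application's point of view this mostly surfaces through the exception message and, where retries happened, a collection of earlier failures attached to the final exception. A hedged sketch of inspecting that context; getPreviousExceptions() is my reading of the new accessor, and the helper itself is hypothetical:

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;

    public class SubscribeWithContext {
        public static Consumer<byte[]> subscribeOrExplain(PulsarClient client, String topic)
                throws PulsarClientException {
            try {
                return client.newConsumer().topic(topic).subscriptionName("sub").subscribe();
            } catch (PulsarClientException e) {
                // The message now carries remote/local address and request-id context.
                System.err.println("Subscribe failed: " + e.getMessage());
                // Earlier attempts that triggered retries (e.g. TooManyRequests) are recorded
                // on the exception; the accessor name here is an assumption.
                for (Throwable previous : e.getPreviousExceptions()) {
                    System.err.println("  earlier attempt: " + previous);
                }
                throw e;
            }
        }
    }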

* Fixup for test failures

BrokerClientIntegrationTest#testCloseConnectionOnBrokerRejected was
depending on TooManyRequests previously being fatal for partition
metadata requests. Now that the request is retried, that test was
failing. It was a brittle test anyhow, depending on thread
interactions. I've rewritten it to use the ServerCnx mock, so it now
actually tests what it should: that clients close the connection after
the maximum number of rejects.

The schema tests were failing because they expected a particular
exception message, which has now been extended. I changed endsWith to
contains.

I also added Producer retries similar to the Consumer ones. I was
going to do that as a follow-on PR, but decided to include it in this one.

Co-authored-by: Ivan Kelly <[email protected]>
ivankelly and Ivan Kelly authored Aug 17, 2021
1 parent 80171a7 commit b557e24
Showing 20 changed files with 801 additions and 212 deletions.
@@ -24,6 +24,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.resourcegroup.ResourceUsageTransportManager.DISABLE_RESOURCE_USAGE_TRANSPORT_MANAGER;
import static org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl.TRANSACTION_LOG_PREFIX;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -241,7 +242,7 @@ public class PulsarService implements AutoCloseable {
private ProtocolHandlers protocolHandlers = null;

private final ShutdownService shutdownService;
private final EventLoopGroup ioEventLoopGroup;
protected final EventLoopGroup ioEventLoopGroup;

private MetricsGenerator metricsGenerator;

@@ -658,7 +659,7 @@ public void start() throws PulsarServerException {
config, localMetadataStore, getZkClient(), bkClientFactory, ioEventLoopGroup
);

this.brokerService = new BrokerService(this, ioEventLoopGroup);
this.brokerService = newBrokerService(this);

// Start load management service (even if load balancing is disabled)
this.loadManager.set(LoadManager.create(this));
@@ -1678,4 +1679,8 @@ private static boolean isTransactionSystemTopic(TopicName topicName) {
|| topic.endsWith(MLPendingAckStore.PENDING_ACK_STORE_SUFFIX);
}

@VisibleForTesting
protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
return new BrokerService(pulsar, ioEventLoopGroup);
}
}
@@ -26,6 +26,7 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
@@ -254,6 +255,7 @@ public class BrokerService implements Closeable {
@Getter
private final BundlesQuotas bundlesQuotas;

private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY;
private Channel listenChannel;
private Channel listenChannelTls;

@@ -410,7 +412,8 @@ public void start() throws Exception {

ServiceConfiguration serviceConfig = pulsar.getConfiguration();

bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
bootstrap.childHandler(
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, false));

Optional<Integer> port = serviceConfig.getBrokerServicePort();
if (port.isPresent()) {
@@ -427,7 +430,8 @@ public void start() throws Exception {
Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
if (tlsPort.isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true));
tlsBootstrap.childHandler(
pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, true));
try {
listenChannelTls = tlsBootstrap.bind(new InetSocketAddress(
pulsar.getBindAddress(), tlsPort.get())).sync()
@@ -2647,4 +2651,9 @@ public void resumedConnections(int numberOfConnections) {
public long getPausedConnections() {
return pausedConnections.longValue();
}

@VisibleForTesting
public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) {
this.pulsarChannelInitFactory = factory;
}
}
@@ -21,6 +21,7 @@
import static org.apache.bookkeeper.util.SafeRunnable.safeRun;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -129,7 +130,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
// ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling
// auto-read.
ch.pipeline().addLast("flowController", new FlowControlHandler());
ServerCnx cnx = new ServerCnx(pulsar);
ServerCnx cnx = newServerCnx(pulsar);
ch.pipeline().addLast("handler", cnx);

connections.put(ch.remoteAddress(), cnx);
@@ -144,4 +145,16 @@ private void refreshAuthenticationCredentials() {
}
});
}

@VisibleForTesting
protected ServerCnx newServerCnx(PulsarService pulsar) throws Exception {
return new ServerCnx(pulsar);
}

public interface Factory {
PulsarChannelInitializer newPulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws Exception;
}

public static final Factory DEFAULT_FACTORY =
(pulsar, tls) -> new PulsarChannelInitializer(pulsar, tls);
}
@@ -280,13 +280,12 @@ protected void customizeNewPulsarAdminBuilder(PulsarAdminBuilder pulsarAdminBuil
}

protected PulsarService startBroker(ServiceConfiguration conf) throws Exception {

return startBrokerWithoutAuthorization(conf);
}

protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration conf) throws Exception {
conf.setBrokerShutdownTimeoutMs(0L);
PulsarService pulsar = spy(new PulsarService(conf));
PulsarService pulsar = spy(newPulsarService(conf));
setupBrokerMocks(pulsar);
beforePulsarStartMocks(pulsar);
pulsar.start();
@@ -295,6 +294,10 @@ protected PulsarService startBrokerWithoutAuthorization(ServiceConfiguration con
return pulsar;
}

protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
return new PulsarService(conf);
}

protected void setupBrokerMocks(PulsarService pulsar) throws Exception {
// Override default providers with mocked ones
doReturn(mockZooKeeperClientFactory).when(pulsar).getZooKeeperClientFactory();
@@ -129,7 +129,9 @@ private void checkLookupException(String tenant, String namespace, PulsarClient
.create();
} catch (PulsarClientException t) {
Assert.assertTrue(t instanceof PulsarClientException.LookupException);
Assert.assertEquals(t.getMessage(), "java.lang.IllegalStateException: The leader election has not yet been completed!");
Assert.assertTrue(
t.getMessage().contains(
"java.lang.IllegalStateException: The leader election has not yet been completed!"));
}
}

@@ -33,6 +33,9 @@
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -79,13 +82,18 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -879,42 +887,114 @@ public void testTlsAuthUseTrustCert() throws Exception {
*/
@Test
public void testLookupThrottlingForClientByClient() throws Exception {
// This test looks like it could be flakey, if the broker responds
// quickly enough, there may never be concurrency in requests
final String topicName = "persistent://prop/ns-abc/newTopic";

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.maxConcurrentLookupRequests(1)
.maxLookupRequests(2)
.build();
PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
resolver.updateServiceUrl(pulsar.getBrokerServiceUrl());
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConcurrentLookupRequest(1);
conf.setMaxLookupRequest(2);

EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
long reqId = 0xdeadbeef;
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
// for PMR
// 2 lookup will succeed
long reqId1 = reqId++;
ByteBuf request1 = Commands.newPartitionMetadataRequest(topicName, reqId1);
CompletableFuture<?> f1 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request1, reqId1));

long reqId2 = reqId++;
ByteBuf request2 = Commands.newPartitionMetadataRequest(topicName, reqId2);
CompletableFuture<?> f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request2, reqId2));

f1.get();
f2.get();

// 3 lookup will fail
long reqId3 = reqId++;
ByteBuf request3 = Commands.newPartitionMetadataRequest(topicName, reqId3);
f1 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request3, reqId3));

long reqId4 = reqId++;
ByteBuf request4 = Commands.newPartitionMetadataRequest(topicName, reqId4);
f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request4, reqId4));

long reqId5 = reqId++;
ByteBuf request5 = Commands.newPartitionMetadataRequest(topicName, reqId5);
CompletableFuture<?> f3 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request5, reqId5));

// 2 lookup will success.
try {
CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub1").subscribeAsync();
CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub2").subscribeAsync();
try {
f1.get();
f2.get();
f3.get();
fail("At least one should fail");
} catch (ExecutionException e) {
Throwable rootCause = e;
while (rootCause instanceof ExecutionException) {
rootCause = rootCause.getCause();
}
if (!(rootCause instanceof
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
throw e;
}
}

consumer1.get().close();
consumer2.get().close();
} catch (Exception e) {
fail("Subscribe should success with 2 requests");
}
// for Lookup
// 2 lookup will succeed
long reqId6 = reqId++;
ByteBuf request6 = Commands.newLookup(topicName, true, reqId6);
f1 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request6, reqId6));

long reqId7 = reqId++;
ByteBuf request7 = Commands.newLookup(topicName, true, reqId7);
f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request7, reqId7));

f1.get();
f2.get();

// 3 lookup will fail
long reqId8 = reqId++;
ByteBuf request8 = Commands.newLookup(topicName, true, reqId8);
f1 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request8, reqId8));

long reqId9 = reqId++;
ByteBuf request9 = Commands.newLookup(topicName, true, reqId9);
f2 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request9, reqId9));

long reqId10 = reqId++;
ByteBuf request10 = Commands.newLookup(topicName, true, reqId10);
f3 = pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request10, reqId10));

// 3 lookup will fail
try {
CompletableFuture<Consumer<byte[]>> consumer1 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub11").subscribeAsync();
CompletableFuture<Consumer<byte[]>> consumer2 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub22").subscribeAsync();
CompletableFuture<Consumer<byte[]>> consumer3 = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub33").subscribeAsync();

consumer1.get().close();
consumer2.get().close();
consumer3.get().close();
fail("It should fail as throttling should only receive 2 requests");
} catch (Exception e) {
if (!(e.getCause() instanceof
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
fail("Subscribe should fail with TooManyRequestsException");
try {
f1.get();
f2.get();
f3.get();
fail("At least one should fail");
} catch (ExecutionException e) {
Throwable rootCause = e;
while (rootCause instanceof ExecutionException) {
rootCause = rootCause.getCause();
}
if (!(rootCause instanceof
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException)) {
throw e;
}
}

}
}
