Skip to content

Commit

Permalink
[Reopen][Issue 5597]retry when getPartitionedTopicMetadata failed (ap…
Browse files Browse the repository at this point in the history
…ache#5734)

* [Issue 5597][pulsar-client-java]retry when getPartitionedTopicMetadata failed

Signed-off-by: xiaolong.ran <[email protected]>

* remove unuse code

Signed-off-by: xiaolong.ran <[email protected]>

* fix ci error
  • Loading branch information
wolfstudy authored Dec 3, 2019
1 parent 078ba44 commit e121108
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.slf4j.Logger;
Expand All @@ -47,6 +49,7 @@
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {

Expand Down Expand Up @@ -132,5 +135,63 @@ public void testTopicInternalStats() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testGetPartitionedTopicMetaData() throws Exception {
log.info("-- Starting {} test --", methodName);

final String topicName = "persistent://my-property/my-ns/my-topic1";
final String subscriptionName = "my-subscriber-name";

try {
String url = "http://localhost:" + BROKER_WEBSERVICE_PORT;
if (isTcpLookup) {
url = "pulsar://localhost:" + BROKER_PORT;
}
PulsarClient client = newPulsarClient(url, 0);

Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Producer<byte[]> producer = client.newProducer().topic(topicName).create();

consumer.close();
producer.close();
client.close();
} catch (PulsarClientException pce) {
log.error("create producer or consumer error: ", pce);
fail();
}

log.info("-- Exiting {} test --", methodName);
}

@Test (timeOut = 4000)
public void testGetPartitionedTopicDataTimeout() {
log.info("-- Starting {} test --", methodName);

final String topicName = "persistent://my-property/my-ns/my-topic1";

String url = "http://localhost:51000,localhost:51001";
if (isTcpLookup) {
url = "pulsar://localhost:51000,localhost:51001";
}

PulsarClient client;
try {
client = PulsarClient.builder()
.serviceUrl(url)
.statsInterval(0, TimeUnit.SECONDS)
.operationTimeout(3, TimeUnit.SECONDS)
.build();

Producer<byte[]> producer = client.newProducer().topic(topicName).create();

fail();
} catch (PulsarClientException pce) {
log.error("create producer error: ", pce);
}

log.info("-- Exiting {} test --", methodName);
}

private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -127,8 +128,9 @@ public void close() throws IOException {
public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
final CompletableFuture<T> future = new CompletableFuture<>();
try {
String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
String remoteHostName = serviceNameResolver.resolveHostUri().getHost();
URI hostUri = serviceNameResolver.resolveHostUri();
String requestUrl = new URL(hostUri.toURL(), path).toString();
String remoteHostName = hostUri.getHost();
AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);

CompletableFuture<Map<String, String>> authFuture = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,17 +651,42 @@ public CompletableFuture<Integer> getNumberOfPartitions(String topic) {

public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {

CompletableFuture<PartitionedTopicMetadata> metadataFuture;
CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();

try {
TopicName topicName = TopicName.get(topic);
metadataFuture = lookup.getPartitionedTopicMetadata(topicName);
AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.NANOSECONDS)
.setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMax(0, TimeUnit.NANOSECONDS)
.create();
getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture);
} catch (IllegalArgumentException e) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
}
return metadataFuture;
}

private void getPartitionedTopicMetadata(TopicName topicName,
Backoff backoff,
AtomicLong remainingTime,
CompletableFuture<PartitionedTopicMetadata> future) {
lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(e);
return null;
}

timer.newTimeout( task -> {
remainingTime.addAndGet(-nextDelay);
getPartitionedTopicMetadata(topicName, backoff, remainingTime, future);
}, nextDelay, TimeUnit.MILLISECONDS);
return null;
});
}

@Override
public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
return getPartitionedTopicMetadata(topic).thenApply(metadata -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.pulsar.client.api.PulsarClient;

import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -48,9 +50,7 @@ public void testLowTimeout() throws Exception {
Assert.fail("Shouldn't be able to connect to anything");
} catch (Exception e) {
Assert.assertFalse(defaultFuture.isDone());
Assert.assertEquals(e.getCause().getCause().getCause().getClass(),
ConnectTimeoutException.class);
Assert.assertTrue((System.nanoTime() - startNanos) < TimeUnit.SECONDS.toNanos(3));
Assert.assertEquals(e.getCause().getCause().getCause().getClass(), ConnectTimeoutException.class);
}
}
}
Expand Down

0 comments on commit e121108

Please sign in to comment.