Skip to content

Commit

Permalink
[fix][client] fix producer/consumer perform lookup for migrated topic (
Browse files Browse the repository at this point in the history
…apache#21356)

Co-authored-by: Rajan Dhabalia <[email protected]>
  • Loading branch information
rdhabalia and Rajan Dhabalia authored Oct 14, 2023
1 parent a5f4c1e commit d09642c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,16 @@ protected void grabCnx() {
try {
CompletableFuture<ClientCnx> cnxFuture;
if (state.redirectedClusterURI != null) {
InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
state.redirectedClusterURI.getPort());
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
if (state.topic == null) {
InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(),
state.redirectedClusterURI.getPort());
cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection);
} else {
// once, client receives redirection url, client has to perform lookup on migrated
// cluster to find the broker that owns the topic and then create connection.
// below method, performs the lookup for a given topic and then creates connection
cnxFuture = state.client.getConnection(state.topic, (state.redirectedClusterURI.toString()));
}
} else if (state.topic == null) {
cnxFuture = state.client.getConnectionToServiceUrl();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,4 @@ public interface LookupService extends AutoCloseable {
*/
CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
String topicPattern, String topicsHash);

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -104,6 +105,7 @@ public class PulsarClientImpl implements PulsarClient {

private final boolean createdScheduledProviders;
private LookupService lookup;
private Map<String, LookupService> urlLookupMap = new ConcurrentHashMap<>();
private final ConnectionPool cnxPool;
@Getter
private final Timer timer;
Expand Down Expand Up @@ -960,6 +962,23 @@ public CompletableFuture<ClientCnx> getConnection(final String topic) {
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
}

public CompletableFuture<ClientCnx> getConnection(final String topic, final String url) {
TopicName topicName = TopicName.get(topic);
return getLookup(url).getBroker(topicName)
.thenCompose(pair -> getConnection(pair.getLeft(), pair.getRight(), cnxPool.genRandomKeyToSelectCon()));
}

public LookupService getLookup(String serviceUrl) {
return urlLookupMap.computeIfAbsent(serviceUrl, url -> {
try {
return createLookup(serviceUrl);
} catch (PulsarClientException e) {
log.warn("Failed to update url to lookup service {}, {}", url, e.getMessage());
throw new IllegalStateException("Failed to update url " + url);
}
});
}

public CompletableFuture<ClientCnx> getConnectionToServiceUrl() {
if (!(lookup instanceof BinaryProtoLookupService)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidServiceURL(
Expand Down Expand Up @@ -1018,10 +1037,14 @@ public LookupService getLookup() {
}

public void reloadLookUp() throws PulsarClientException {
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
lookup = createLookup(conf.getServiceUrl());
}

public LookupService createLookup(String url) throws PulsarClientException {
if (url.startsWith("http")) {
return new HttpLookupService(conf, eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), conf.isUseTls(),
return new BinaryProtoLookupService(this, url, conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
}
}
Expand Down

0 comments on commit d09642c

Please sign in to comment.