Skip to content

Commit

Permalink
Fix broker lookup return wrong url when specify advertised listener (a…
Browse files Browse the repository at this point in the history
…pache#7737)

get advertised listener name from lookup options

add test case

Co-authored-by: wangjialing <[email protected]>
  • Loading branch information
wangjialing218 and wangjialing authored Aug 5, 2020
1 parent 9f687d3 commit 109c8bf
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -497,7 +498,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

// Now setting the redirect url
createLookupResult(candidateBroker, authoritativeRedirect)
createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
.thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
.exceptionally(ex -> {
lookupFuture.completeExceptionally(ex);
Expand All @@ -511,7 +512,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}
}

protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect)
protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect, final String advertisedListenerName)
throws Exception {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
Expand All @@ -522,10 +523,23 @@ protected CompletableFuture<LookupResult> createLookupResult(String candidateBro
uri.getPort());
pulsar.getLocalZkCache().getDataAsync(path, pulsar.getLoadManager().get().getLoadReportDeserializer()).thenAccept(reportData -> {
if (reportData.isPresent()) {
ServiceLookupData lookupData = reportData.get();
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
lookupData.getWebServiceUrlTls(), lookupData.getPulsarServiceUrl(),
lookupData.getPulsarServiceUrlTls(), authoritativeRedirect));
LocalBrokerData lookupData = (LocalBrokerData) reportData.get();
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = lookupData.getAdvertisedListeners().get(advertisedListenerName);
if (listener == null) {
lookupFuture.completeExceptionally(
new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
} else {
URI urlTls = listener.getBrokerServiceUrlTls();
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
lookupData.getWebServiceUrlTls(), listener.getBrokerServiceUrl().toString(),
urlTls == null ? null : urlTls.toString(), authoritativeRedirect));
}
} else {
lookupFuture.complete(new LookupResult(lookupData.getWebServiceUrl(),
lookupData.getWebServiceUrlTls(), lookupData.getPulsarServiceUrl(),
lookupData.getPulsarServiceUrlTls(), authoritativeRedirect));
}
} else {
lookupFuture.completeExceptionally(new KeeperException.NoNodeException(path));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@

import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -68,6 +70,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -355,18 +358,43 @@ public void testLoadReportDeserialize() throws Exception {
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path2,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1, false).get();
LookupResult result1 = pulsar.getNamespaceService().createLookupResult(candidateBroker1, false, null).get();

// update to new load manager
LoadManager oldLoadManager = pulsar.getLoadManager()
.getAndSet(new ModularLoadManagerWrapper(new ModularLoadManagerImpl()));
oldLoadManager.stop();
LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2, false).get();
LookupResult result2 = pulsar.getNamespaceService().createLookupResult(candidateBroker2, false, null).get();
Assert.assertEquals(result1.getLookupData().getBrokerUrl(), candidateBroker1);
Assert.assertEquals(result2.getLookupData().getBrokerUrl(), candidateBroker2);
System.out.println(result2);
}

@Test
public void testCreateLookupResult() throws Exception {

final String candidateBroker = "pulsar://localhost:6650";
final String listenerUrl = "pulsar://localhost:7000";
final String listenerUrlTls = "pulsar://localhost:8000";
final String listener = "listenerName";
Map<String, AdvertisedListener> advertisedListeners = Maps.newHashMap();
advertisedListeners.put(listener, AdvertisedListener.builder().brokerServiceUrl(new URI(listenerUrl)).brokerServiceUrlTls(new URI(listenerUrlTls)).build());
LocalBrokerData ld = new LocalBrokerData(null, null, candidateBroker, null, advertisedListeners);
URI uri = new URI(candidateBroker);
String path = String.format("%s/%s:%s", LoadManager.LOADBALANCE_BROKERS_ROOT, uri.getHost(), uri.getPort());
ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), path,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);

LookupResult noListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, null).get();
LookupResult withListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, listener).get();

Assert.assertEquals(noListener.getLookupData().getBrokerUrl(), candidateBroker);
Assert.assertEquals(withListener.getLookupData().getBrokerUrl(), listenerUrl);
Assert.assertEquals(withListener.getLookupData().getBrokerUrlTls(), listenerUrlTls);
System.out.println(withListener);
}

@Test
public void testCreateNamespaceWithDefaultNumberOfBundles() throws Exception {
OwnershipCache MockOwnershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
Expand Down

0 comments on commit 109c8bf

Please sign in to comment.