Skip to content

Commit

Permalink
[fix][broker] fix MetadataStoreException$NotFoundException while doin…
Browse files Browse the repository at this point in the history
…g topic lookup (apache#15633)
  • Loading branch information
codelipenghui authored May 19, 2022
1 parent e97d846 commit 70551a6
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@
ResourceUnit represents any machine/unit which has resources that broker can use to serve its service units
*/
public interface ResourceUnit extends Comparable<ResourceUnit> {

String PROPERTY_KEY_BROKER_ZNODE_NAME = "__advertised_addr";

String getResourceId();

ResourceDescription getAvailableResource();

boolean canFit(ResourceDescription resourceDescription);

Object getProperty(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.loadbalance.impl;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -65,8 +66,12 @@ public LoadManagerReport generateLoadReport() {
@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
return leastLoadedBroker.map(s -> new SimpleResourceUnit(getBrokerWebServiceUrl(s),
new PulsarResourceDescription()));
return leastLoadedBroker.map(s -> {
String webServiceUrl = getBrokerWebServiceUrl(s);
String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
});
}

private String getBrokerWebServiceUrl(String broker) {
Expand All @@ -78,6 +83,11 @@ private String getBrokerWebServiceUrl(String broker) {
return String.format("http://%s", broker);
}

private String getBrokerZnodeName(String broker, String webServiceUrl) {
String scheme = webServiceUrl.substring(0, webServiceUrl.indexOf("://"));
return String.format("%s://%s", scheme, broker);
}

@Override
public List<Metrics> getLoadBalancingMetrics() {
return loadManager.getLoadBalancingMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,32 @@
package org.apache.pulsar.broker.loadbalance.impl;

import com.google.common.base.MoreObjects;
import java.util.Collections;
import java.util.Map;
import org.apache.pulsar.broker.loadbalance.ResourceDescription;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;

public class SimpleResourceUnit implements ResourceUnit {

private String resourceId;
private ResourceDescription resourceDescription;
private final String resourceId;
private final ResourceDescription resourceDescription;

private final Map<String, Object> properties;

public SimpleResourceUnit(String resourceId, ResourceDescription resourceDescription) {
this.resourceId = resourceId;
this.resourceDescription = resourceDescription;
this.properties = Collections.emptyMap();
}

public SimpleResourceUnit(String resourceId, ResourceDescription resourceDescription,
Map<String, Object> properties) {
this.resourceId = resourceId;
this.resourceDescription = resourceDescription;
this.properties = properties == null ? Collections.emptyMap() : properties;
}


@Override
public String getResourceId() {
// TODO Auto-generated method stub
Expand All @@ -50,6 +63,11 @@ public boolean canFit(ResourceDescription resourceDescription) {
return this.resourceDescription.compareTo(resourceDescription) > 0;
}

@Override
public Object getProperty(String key) {
return properties.get(key);
}

@Override
public int compareTo(ResourceUnit o) {
return resourceId.compareTo(o.getResourceId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -457,6 +458,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
return;
}
String candidateBroker = null;
String candidateBrokerAdvertisedAddr = null;

LeaderElectionService les = pulsar.getLeaderElectionService();
if (les == null) {
Expand Down Expand Up @@ -517,15 +519,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}
}
if (makeLoadManagerDecisionOnThisBroker) {
Optional<String> availableBroker = getLeastLoadedFromLoadManager(bundle);
Optional<Pair<String, String>> availableBroker = getLeastLoadedFromLoadManager(bundle);
if (!availableBroker.isPresent()) {
LOG.warn("Load manager didn't return any available broker. "
+ "Returning empty result to lookup. NamespaceBundle[{}]",
bundle);
lookupFuture.complete(Optional.empty());
return;
}
candidateBroker = availableBroker.get();
candidateBroker = availableBroker.get().getLeft();
candidateBrokerAdvertisedAddr = availableBroker.get().getRight();
authoritativeRedirect = true;
} else {
// forward to leader broker to make assignment
Expand Down Expand Up @@ -596,7 +599,8 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

// Now setting the redirect url
createLookupResult(candidateBroker, authoritativeRedirect, options.getAdvertisedListenerName())
createLookupResult(candidateBrokerAdvertisedAddr == null ? candidateBroker
: candidateBrokerAdvertisedAddr, authoritativeRedirect, options.getAdvertisedListenerName())
.thenAccept(lookupResult -> lookupFuture.complete(Optional.of(lookupResult)))
.exceptionally(ex -> {
lookupFuture.completeExceptionally(ex);
Expand Down Expand Up @@ -691,20 +695,22 @@ private Set<String> getAvailableBrokers() {
* @return
* @throws Exception
*/
private Optional<String> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnitId serviceUnit) throws Exception {
Optional<ResourceUnit> leastLoadedBroker = loadManager.get().getLeastLoaded(serviceUnit);
if (!leastLoadedBroker.isPresent()) {
LOG.warn("No broker is available for {}", serviceUnit);
return Optional.empty();
}

String lookupAddress = leastLoadedBroker.get().getResourceId();
String advertisedAddr = (String) leastLoadedBroker.get()
.getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
pulsar.getSafeWebServiceAddress(),
lookupAddress);
}
return Optional.of(lookupAddress);
return Optional.of(Pair.of(lookupAddress, advertisedAddr));
}

public CompletableFuture<Void> unloadNamespaceBundle(NamespaceBundle bundle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.loadbalance;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;

import java.net.URI;
import java.util.Optional;
import lombok.Cleanup;
Expand Down Expand Up @@ -69,14 +71,14 @@ private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
int httpsPort = PortManager.nextFreePort();

// Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
this.conf.setAdvertisedAddress(advertisedAddress);
this.conf.setAdvertisedListeners(
conf.setAdvertisedAddress(advertisedAddress);
conf.setAdvertisedListeners(
"public:pulsar://localhost:" + pulsarPort +
",public_http:http://localhost:" + httpPort +
",public_https:https://localhost:" + httpsPort);
this.conf.setBrokerServicePort(Optional.of(pulsarPort));
this.conf.setWebServicePort(Optional.of(httpPort));
this.conf.setWebServicePortTls(Optional.of(httpsPort));
conf.setBrokerServicePort(Optional.of(pulsarPort));
conf.setWebServicePort(Optional.of(httpPort));
conf.setWebServicePortTls(Optional.of(httpsPort));
}

@Test
Expand All @@ -85,6 +87,7 @@ public void testLookup() throws Exception {
new HttpGet(pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/my-topic");
request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
request.addHeader(HttpHeaders.ACCEPT, "application/json");
final String topic = "my-topic";

@Cleanup
CloseableHttpClient httpClient = HttpClients.createDefault();
Expand All @@ -104,14 +107,15 @@ public void testLookup() throws Exception {
// Produce data
@Cleanup
Producer<String> p = pulsarClient.newProducer(Schema.STRING)
.topic("my-topic")
.topic(topic)
.create();

p.send("hello");

// Verify we can get the correct HTTP redirect to the advertised listener
for (PulsarAdmin a : getAllAdmins()) {
TopicStats s = a.topics().getStats("my-topic");
TopicStats s = a.topics().getStats(topic);
assertNotNull(a.lookups().lookupTopic(topic));
assertEquals(s.getPublishers().size(), 1);
}
}
Expand Down

0 comments on commit 70551a6

Please sign in to comment.