Skip to content

Commit

Permalink
Fix topic getting recreated immediately after deletion (apache#7524)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Jul 14, 2020
1 parent 1de5d38 commit dba0b65
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
Expand Down Expand Up @@ -79,7 +80,7 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati
}

CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
.getBrokerServiceUrlAsync(topicName, authoritative);
.getBrokerServiceUrlAsync(topicName, LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build());

lookupFuture.thenAccept(optionalResult -> {
if (optionalResult == null || !optionalResult.isPresent()) {
Expand Down Expand Up @@ -251,7 +252,12 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
if (validationFailureResponse != null) {
lookupfuture.complete(validationFailureResponse);
} else {
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, authoritative, advertisedListenerName)
LookupOptions options = LookupOptions.builder()
.authoritative(authoritative)
.advertisedListenerName(advertisedListenerName)
.loadTopicsInBundle(true)
.build();
pulsarService.getNamespaceService().getBrokerServiceUrlAsync(topicName, options)
.thenAccept(lookupResult -> {

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.namespace;

import lombok.Builder;
import lombok.Data;

import org.apache.commons.lang3.StringUtils;

@Data
@Builder
public class LookupOptions {
/**
* If authoritative, it means the lookup had already been redirected here by a different broker
*/
private final boolean authoritative;

/**
* If read-only, do not attempt to acquire ownership
*/
private final boolean readOnly;

/**
* After acquiring the ownership, load all the topics
*/
private final boolean loadTopicsInBundle;

/**
* The lookup request was made through HTTPs
*/
private final boolean requestHttps;

private final String advertisedListenerName;

public boolean hasAdvertisedListenerName() {
return StringUtils.isNotBlank(advertisedListenerName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,9 @@ public void initialize() {
}
}

public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic,
boolean authoritative) {
return getBrokerServiceUrlAsync(topic, authoritative, null);
}

public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, boolean authoritative,
final String advertisedListenerName) {
public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic, LookupOptions options) {
return getBundleAsync(topic)
.thenCompose(bundle -> findBrokerServiceUrl(bundle, authoritative, false /* read-only */, advertisedListenerName));
.thenCompose(bundle -> findBrokerServiceUrl(bundle, options));
}

public CompletableFuture<NamespaceBundle> getBundleAsync(TopicName topic) {
Expand Down Expand Up @@ -210,38 +204,36 @@ private NamespaceBundle getFullBundle(NamespaceName fqnn) throws Exception {
*
* If the service unit is not owned, return an empty optional
*/
public Optional<URL> getWebServiceUrl(ServiceUnitId suName, boolean authoritative, boolean isRequestHttps,
boolean readOnly) throws Exception {
public Optional<URL> getWebServiceUrl(ServiceUnitId suName, LookupOptions options) throws Exception {
if (suName instanceof TopicName) {
TopicName name = (TopicName) suName;
if (LOG.isDebugEnabled()) {
LOG.debug("Getting web service URL of topic: {} - auth: {}", name, authoritative);
LOG.debug("Getting web service URL of topic: {} - options: {}", name, options);
}
return this.internalGetWebServiceUrl(getBundle(name), authoritative, isRequestHttps, readOnly)
return this.internalGetWebServiceUrl(getBundle(name), options)
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}

if (suName instanceof NamespaceName) {
return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), authoritative, isRequestHttps,
readOnly).get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
return this.internalGetWebServiceUrl(getFullBundle((NamespaceName) suName), options)
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}

if (suName instanceof NamespaceBundle) {
return this.internalGetWebServiceUrl((NamespaceBundle) suName, authoritative, isRequestHttps, readOnly)
return this.internalGetWebServiceUrl((NamespaceBundle) suName, options)
.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
}

throw new IllegalArgumentException("Unrecognized class of NamespaceBundle: " + suName.getClass().getName());
}

private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, boolean authoritative,
boolean isRequestHttps, boolean readOnly) {
private CompletableFuture<Optional<URL>> internalGetWebServiceUrl(NamespaceBundle bundle, LookupOptions options) {

return findBrokerServiceUrl(bundle, authoritative, readOnly).thenApply(lookupResult -> {
return findBrokerServiceUrl(bundle, options).thenApply(lookupResult -> {
if (lookupResult.isPresent()) {
try {
LookupData lookupData = lookupResult.get().getLookupData();
final String redirectUrl = isRequestHttps ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
final String redirectUrl = options.isRequestHttps() ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl();
return Optional.of(new URL(redirectUrl));
} catch (Exception e) {
// just log the exception, nothing else to do
Expand Down Expand Up @@ -328,19 +320,6 @@ public boolean registerNamespace(String namespace, boolean ensureOwned) throws P
private final ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> findingBundlesNotAuthoritative
= new ConcurrentOpenHashMap<>();

/**
* Main internal method to lookup and setup ownership of service unit to a broker.
*
* @param bundle
* @param authoritative
* @param readOnly
* @return
*/
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
boolean readOnly) {
return findBrokerServiceUrl(bundle, authoritative, readOnly, null);
}

/**
* Main internal method to lookup and setup ownership of service unit to a broker
*
Expand All @@ -351,14 +330,13 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
* @return
* @throws PulsarServerException
*/
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, boolean authoritative,
boolean readOnly, final String advertisedListenerName) {
private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(NamespaceBundle bundle, LookupOptions options) {
if (LOG.isDebugEnabled()) {
LOG.debug("findBrokerServiceUrl: {} - read-only: {}", bundle, readOnly);
LOG.debug("findBrokerServiceUrl: {} - options: {}", bundle, options);
}

ConcurrentOpenHashMap<NamespaceBundle, CompletableFuture<Optional<LookupResult>>> targetMap;
if (authoritative) {
if (options.isAuthoritative()) {
targetMap = findingBundlesAuthoritative;
} else {
targetMap = findingBundlesNotAuthoritative;
Expand All @@ -372,13 +350,13 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
if (!nsData.isPresent()) {
// No one owns this bundle

if (readOnly) {
if (options.isReadOnly()) {
// Do not attempt to acquire ownership
future.complete(Optional.empty());
} else {
// Now, no one owns the namespace yet. Hence, we will try to dynamically assign it
pulsar.getExecutor().execute(() -> {
searchForCandidateBroker(bundle, future, authoritative, advertisedListenerName);
searchForCandidateBroker(bundle, future, options);
});
}
} else if (nsData.get().isDisabled()) {
Expand All @@ -389,11 +367,11 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
LOG.debug("Namespace bundle {} already owned by {} ", bundle, nsData);
}
// find the target
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = nsData.get().getAdvertisedListeners().get(advertisedListenerName);
if (options.hasAdvertisedListenerName()) {
AdvertisedListener listener = nsData.get().getAdvertisedListeners().get(options.getAdvertisedListenerName());
if (listener == null) {
future.completeExceptionally(
new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener"));
} else {
future.complete(Optional.of(new LookupResult(nsData.get(),
listener.getBrokerServiceUrl().toString(), listener.getBrokerServiceUrlTls().toString())));
Expand All @@ -418,13 +396,8 @@ private CompletableFuture<Optional<LookupResult>> findBrokerServiceUrl(Namespace
}

private void searchForCandidateBroker(NamespaceBundle bundle,
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative) {
searchForCandidateBroker(bundle, lookupFuture, authoritative, null);
}

private void searchForCandidateBroker(NamespaceBundle bundle,
CompletableFuture<Optional<LookupResult>> lookupFuture, boolean authoritative,
final String advertisedListenerName) {
CompletableFuture<Optional<LookupResult>> lookupFuture,
LookupOptions options) {
String candidateBroker = null;
boolean authoritativeRedirect = pulsar.getLeaderElectionService().isLeader();

Expand All @@ -440,7 +413,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}

if (candidateBroker == null) {
if (authoritative) {
if (options.isAuthoritative()) {
// leader broker already assigned the current broker as owner
candidateBroker = pulsar.getSafeWebServiceAddress();
} else if (!this.loadManager.get().isCentralized()
Expand Down Expand Up @@ -486,14 +459,16 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
} else {
// Found owner for the namespace bundle

// Schedule the task to pre-load topics
pulsar.loadNamespaceTopics(bundle);
if (options.isLoadTopicsInBundle()) {
// Schedule the task to pre-load topics
pulsar.loadNamespaceTopics(bundle);
}
// find the target
if (StringUtils.isNotBlank(advertisedListenerName)) {
AdvertisedListener listener = ownerInfo.getAdvertisedListeners().get(advertisedListenerName);
if (options.hasAdvertisedListenerName()) {
AdvertisedListener listener = ownerInfo.getAdvertisedListeners().get(options.getAdvertisedListenerName());
if (listener == null) {
lookupFuture.completeExceptionally(
new PulsarServerException("the broker do not have " + advertisedListenerName + " listener"));
new PulsarServerException("the broker do not have " + options.getAdvertisedListenerName() + " listener"));
return;
} else {
lookupFuture.complete(Optional.of(new LookupResult(ownerInfo, listener.getBrokerServiceUrl().toString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -488,8 +489,14 @@ protected boolean isBundleOwnedByAnyBroker(NamespaceName fqnn, BundlesData bundl
String bundleRange) {
NamespaceBundle nsBundle = validateNamespaceBundleRange(fqnn, bundles, bundleRange);
NamespaceService nsService = pulsar().getNamespaceService();

LookupOptions options = LookupOptions.builder()
.authoritative(false)
.requestHttps(isRequestHttps())
.readOnly(true)
.loadTopicsInBundle(false).build();
try {
return nsService.getWebServiceUrl(nsBundle, /*authoritative */ false, isRequestHttps(), /* read-only */ true).isPresent();
return nsService.getWebServiceUrl(nsBundle, options).isPresent();
} catch (Exception e) {
log.error("[{}] Failed to check whether namespace bundle is owned {}/{}", clientAppId(), fqnn.toString(), bundleRange, e);
throw new RestException(e);
Expand Down Expand Up @@ -525,7 +532,12 @@ public void validateBundleOwnership(NamespaceBundle bundle, boolean authoritativ
// - If authoritative is false and this broker is not leader, forward to leader
// - If authoritative is false and this broker is leader, determine owner and forward w/ authoritative=true
// - If authoritative is true, own the namespace and continue
Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, authoritative, isRequestHttps(), readOnly);
LookupOptions options = LookupOptions.builder()
.authoritative(authoritative)
.requestHttps(isRequestHttps())
.readOnly(readOnly)
.loadTopicsInBundle(false).build();
Optional<URL> webUrl = nsService.getWebServiceUrl(bundle, options);
// Ensure we get a url
if (webUrl == null || !webUrl.isPresent()) {
log.warn("Unable to get web service url");
Expand Down Expand Up @@ -581,7 +593,12 @@ protected void validateTopicOwnership(TopicName topicName, boolean authoritative

try {
// per function name, this is trying to acquire the whole namespace ownership
Optional<URL> webUrl = nsService.getWebServiceUrl(topicName, authoritative, isRequestHttps(), false);
LookupOptions options = LookupOptions.builder()
.authoritative(authoritative)
.requestHttps(isRequestHttps())
.readOnly(false)
.loadTopicsInBundle(false).build();
Optional<URL> webUrl = nsService.getWebServiceUrl(topicName, options);
// Ensure we get a url
if (webUrl == null || !webUrl.isPresent()) {
log.info("Unable to get web service url");
Expand Down
Loading

0 comments on commit dba0b65

Please sign in to comment.