Skip to content

Commit

Permalink
Issue apache#1117: handle race in concurrent bundle split
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher authored May 2, 2018
1 parent 5d14788 commit dd5c425
Show file tree
Hide file tree
Showing 8 changed files with 306 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;

import com.google.common.collect.Maps;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

Expand All @@ -38,6 +40,7 @@
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -79,20 +82,28 @@ public LocalPolicies deserialize(String path, byte[] content) throws Exception {

@Override
public CompletableFuture<Optional<LocalPolicies>> getAsync(String path) {
CompletableFuture<Optional<LocalPolicies>> future = new CompletableFuture<>();
return getWithStatAsync(path).thenApply(entry -> entry.map(e -> e.getKey()));
}

@Override
public CompletableFuture<Optional<Entry<LocalPolicies, Stat>>> getWithStatAsync(String path) {
CompletableFuture<Optional<Entry<LocalPolicies, Stat>>> future = new CompletableFuture<>();

// First check in local-zk cache
super.getAsync(path).thenAccept(localPolicies -> {
super.getWithStatAsync(path).thenAccept(result -> {
Optional<LocalPolicies> localPolicies = result.map(Entry::getKey);
if (localPolicies.isPresent()) {
future.complete(localPolicies);
future.complete(result);
} else {
// create new policies node under Local ZK by coping it from Global ZK
createPolicies(path, true).thenAccept(p -> {
LOG.info("Successfully created local policies for {} -- {}", path, p);
// local-policies have been created but it's not part of policiesCache. so, call
// super.getAsync() which will load it and set the watch on local-policies path
super.getAsync(path);
future.complete(p);
super.getWithStatAsync(path);
Stat stat = new Stat();
stat.setVersion(-1);
future.complete(Optional.of(Maps.immutableEntry(p.orElse(null), stat)));
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
Expand All @@ -37,12 +36,12 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
Expand All @@ -52,6 +51,7 @@
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.lookup.data.LookupData;
Expand All @@ -72,6 +72,7 @@
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -111,6 +112,8 @@ public enum AddressType {

private final String host;

private static final int BUNDLE_SPLIT_RETRY_LIMIT = 7;

public static final String SLA_NAMESPACE_PROPERTY = "sla-monitor";
public static final Pattern HEARTBEAT_NAMESPACE_PATTERN = Pattern.compile("pulsar/[^/]+/([^:]+:\\d+)");
public static final Pattern SLA_NAMESPACE_PATTERN = Pattern.compile(SLA_NAMESPACE_PROPERTY + "/[^/]+/([^:]+:\\d+)");
Expand Down Expand Up @@ -546,87 +549,141 @@ public boolean isNamespaceBundleDisabled(NamespaceBundle bundle) throws Exceptio

/**
* 1. split the given bundle into two bundles 2. assign ownership of both the bundles to current broker 3. update
* policies with newly created bundles into LocalZK 4. disable original bundle and refresh the cache
* policies with newly created bundles into LocalZK 4. disable original bundle and refresh the cache.
*
* It will call splitAndOwnBundleOnceAndRetry to do the real retry work, which will retry "retryTimes".
*
* @param bundle
* @return
* @throws Exception
*/
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, final boolean unload) throws Exception {
public CompletableFuture<Void> splitAndOwnBundle(NamespaceBundle bundle, boolean unload)
throws Exception {

final CompletableFuture<Void> unloadFuture = new CompletableFuture<>();
final AtomicInteger counter = new AtomicInteger(BUNDLE_SPLIT_RETRY_LIMIT);

splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture);

return unloadFuture;
}

void splitAndOwnBundleOnceAndRetry(NamespaceBundle bundle,
boolean unload,
AtomicInteger counter,
CompletableFuture<Void> unloadFuture) {
CompletableFuture<NamespaceBundles> updateFuture = new CompletableFuture<>();

final Pair<NamespaceBundles, List<NamespaceBundle>> splittedBundles = bundleFactory.splitBundles(bundle,
2 /* by default split into 2 */);
2 /* by default split into 2 */);

// Split and updateNamespaceBundles. Update may fail because of concurrent write to Zookeeper.
if (splittedBundles != null) {
checkNotNull(splittedBundles.getLeft());
checkNotNull(splittedBundles.getRight());
checkArgument(splittedBundles.getRight().size() == 2, "bundle has to be split in two bundles");
NamespaceName nsname = bundle.getNamespaceObject();
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] splitAndOwnBundleOnce: {}, counter: {}, 2 bundles: {}, {}",
nsname.toString(), bundle.getBundleRange(), counter.get(),
splittedBundles != null ? splittedBundles.getRight().get(0).getBundleRange() : "null splittedBundles",
splittedBundles != null ? splittedBundles.getRight().get(1).getBundleRange() : "null splittedBundles");
}
try {
// take ownership of newly split bundles
for (NamespaceBundle sBundle : splittedBundles.getRight()) {
checkNotNull(ownershipCache.tryAcquiringOwnership(sBundle));
}
updateNamespaceBundles(nsname, splittedBundles.getLeft(),
(rc, path, zkCtx, stat) -> pulsar.getOrderedExecutor().submit(safeRun(() -> {
if (rc == KeeperException.Code.OK.intValue()) {
try {
// disable old bundle in memory
getOwnershipCache().updateBundleState(bundle, false);
// invalidate cache as zookeeper has new split
// namespace bundle
bundleFactory.invalidateBundleCache(nsname);
// update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
loadManager.get().setLoadReportForceUpdateFlag();
unloadFuture.complete(null);
} catch (Exception e) {
String msg1 = format(
"failed to disable bundle %s under namespace [%s] with error %s",
nsname.toString(), bundle.toString(), e.getMessage());
LOG.warn(msg1, e);
unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1));
}
} else {
String msg2 = format("failed to update namespace [%s] policies due to %s",
nsname.toString(),
KeeperException.create(KeeperException.Code.get(rc)).getMessage());
LOG.warn(msg2);
unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
}
})));
(rc, path, zkCtx, stat) -> {
if (rc == Code.OK.intValue()) {
// invalidate cache as zookeeper has new split
// namespace bundle
bundleFactory.invalidateBundleCache(bundle.getNamespaceObject());

updateFuture.complete(splittedBundles.getLeft());
} else if (rc == Code.BADVERSION.intValue()) {
KeeperException keeperException = KeeperException.create(KeeperException.Code.get(rc));
String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s " +
"due to %s, counter: %d",
nsname.toString(), bundle.getBundleRange(),
keeperException.getMessage(), counter.get());
LOG.warn(msg);
updateFuture.completeExceptionally(new ServerMetadataException(keeperException));
} else {
String msg = format("failed to update namespace policies [%s], NamespaceBundle: %s due to %s",
nsname.toString(), bundle.getBundleRange(),
KeeperException.create(KeeperException.Code.get(rc)).getMessage());
LOG.warn(msg);
updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}
});
} catch (Exception e) {
String msg = format("failed to aquire ownership of split bundle for namespace [%s], %s",
nsname.toString(), e.getMessage());
String msg = format("failed to acquire ownership of split bundle for namespace [%s], %s",
nsname.toString(), e.getMessage());
LOG.warn(msg, e);
unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}

} else {
String msg = format("bundle %s not found under namespace", bundle.toString());
unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
LOG.warn(msg);
updateFuture.completeExceptionally(new ServiceUnitNotReadyException(msg));
}

return unloadFuture.thenApply(res -> {
if (!unload) {
return null;
// If success updateNamespaceBundles, then do invalidateBundleCache and unload.
// Else retry splitAndOwnBundleOnceAndRetry.
updateFuture.whenCompleteAsync((r, t)-> {
if (t != null) {
// retry several times on BadVersion
if ((t instanceof ServerMetadataException) && (counter.decrementAndGet() >= 0)) {
pulsar.getOrderedExecutor().submit(
() -> splitAndOwnBundleOnceAndRetry(bundle, unload, counter, unloadFuture));
} else {
// Retry enough, or meet other exception
String msg2 = format(" %s not success update nsBundles, counter %d, reason %s",
bundle.toString(), counter.get(), t.getMessage());
LOG.warn(msg2);
unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg2));
}
return;
}
// unload new split bundles
splittedBundles.getRight().forEach(splitBundle -> {
try {
unloadNamespaceBundle(splitBundle);
} catch (Exception e) {
LOG.warn("Failed to unload split bundle {}", splitBundle, e);
throw new RuntimeException("Failed to unload split bundle " + splitBundle, e);

// success updateNamespaceBundles
try {
// disable old bundle in memory
getOwnershipCache().updateBundleState(bundle, false);

// update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
loadManager.get().setLoadReportForceUpdateFlag();

if (unload) {
// unload new split bundles
r.getBundles().forEach(splitBundle -> {
try {
unloadNamespaceBundle(splitBundle);
} catch (Exception e) {
LOG.warn("Failed to unload split bundle {}", splitBundle, e);
throw new RuntimeException("Failed to unload split bundle " + splitBundle, e);
}
});
}
});
return null;
});

unloadFuture.complete(null);
} catch (Exception e) {
String msg1 = format(
"failed to disable bundle %s under namespace [%s] with error %s",
bundle.getNamespaceObject().toString(), bundle.toString(), e.getMessage());
LOG.warn(msg1, e);
unloadFuture.completeExceptionally(new ServiceUnitNotReadyException(msg1));
}
return;
}, pulsar.getOrderedExecutor());
}

/**
* update new bundle-range to LocalZk (create a new node if not present)
* Update new bundle-range to LocalZk (create a new node if not present).
* Update may fail because of concurrent write to Zookeeper.
*
* @param nsname
* @param nsBundles
Expand All @@ -643,12 +700,16 @@ private void updateNamespaceBundles(NamespaceName nsname, NamespaceBundles nsBun
if (!policies.isPresent()) {
// if policies is not present into localZk then create new policies
this.pulsar.getLocalZkCacheService().createPolicies(path, false).get(cacheTimeOutInSec, SECONDS);
policies = this.pulsar.getLocalZkCacheService().policiesCache().get(path);
}

policies.get().bundles = getBundlesData(nsBundles);
this.pulsar.getLocalZkCache().getZooKeeper().setData(path,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies.get()), -1, callback, null);
long version = nsBundles.getVersion();
LocalPolicies local = new LocalPolicies();
local.bundles = getBundlesData(nsBundles);
byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(local);

this.pulsar.getLocalZkCache().getZooKeeper()
.setData(path, data, Math.toIntExact(version), callback, null);

// invalidate namespace's local-policies
this.pulsar.getLocalZkCacheService().policiesCache().invalidate(path);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -1010,7 +1011,17 @@ public AuthenticationService getAuthenticationService() {
}

public List<Topic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) {
return multiLayerTopicsMap.get(namespace).get(bundle).values();
ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> map1 = multiLayerTopicsMap.get(namespace);
if (map1 == null) {
return Collections.emptyList();
}

ConcurrentOpenHashMap<String, Topic> map2 = map1.get(bundle);
if (map2 == null) {
return Collections.emptyList();
}

return map2.values();
}

public ZooKeeperDataCache<Map<String, String>> getDynamicConfigurationCache() {
Expand Down
Loading

0 comments on commit dd5c425

Please sign in to comment.