Skip to content

Commit

Permalink
[transaction-coordinator] Ownership change listeners (apache#5457)
Browse files Browse the repository at this point in the history
### Motivation

This is the first part to implement transaction coordinator startup, this PR added ownership change listeners, using the ownership change listeners can get namespace bundle/topics is owned or unload by this broker.  transaction coordinator can add listeners to create  transaction coordinator instance, also some other component can use it to get the change event of ownership.

### Modifications

Add namespace bundle ownership listener
Add topic ownership listener
  • Loading branch information
codelipenghui authored and sijie committed Oct 29, 2019
1 parent df19f26 commit 025f99a
Show file tree
Hide file tree
Showing 5 changed files with 313 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.namespace;

import org.apache.pulsar.common.naming.NamespaceBundle;

import java.util.function.Predicate;

/**
* Listener for <code>NamespaceBundle</code> ownership changes
*/
public interface NamespaceBundleOwnershipListener extends Predicate<NamespaceBundle> {

/**
* Will be call after a <code>NamespaceBundle</code> owned by broker
* @param bundle owned bundle
*/
void onLoad(NamespaceBundle bundle);

/**
* Will be call after a <code>NamespaceBundle</code> unloaded from broker
* @param bundle owned bundle
*/
void unLoad(NamespaceBundle bundle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.hash.Hashing;
import io.netty.channel.EventLoopGroup;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -32,7 +33,6 @@
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
Expand All @@ -50,6 +50,7 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.ServiceUnitId;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.NamespaceIsolationPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
Expand All @@ -59,6 +60,7 @@
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.policies.data.loadbalancer.ServiceLookupData;
Expand All @@ -77,9 +79,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
Expand All @@ -91,10 +92,12 @@
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.admin.AdminResource.PARTITIONED_TOPIC_PATH_ZNODE;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import static org.apache.pulsar.common.naming.NamespaceBundleFactory.getBundlesData;
import static org.apache.pulsar.common.util.Codec.decode;

/**
* The <code>NamespaceService</code> provides resource ownership lookup as well as resource ownership claiming services
Expand Down Expand Up @@ -141,6 +144,8 @@ public enum AddressType {

private final ConcurrentOpenHashMap<ClusterData, PulsarClientImpl> namespaceClients;

private final List<NamespaceBundleOwnershipListener> bundleOwnershipListeners;

/**
* Default constructor.
*
Expand All @@ -153,8 +158,9 @@ public NamespaceService(PulsarService pulsar) {
this.loadManager = pulsar.getLoadManager();
ServiceUnitZkUtils.initZK(pulsar.getLocalZkCache().getZooKeeper(), pulsar.getSafeBrokerServiceUrl());
this.bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory);
this.ownershipCache = new OwnershipCache(pulsar, bundleFactory, this);
this.namespaceClients = new ConcurrentOpenHashMap<>();
this.bundleOwnershipListeners = new CopyOnWriteArrayList<>();
}

public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicName topic,
Expand Down Expand Up @@ -858,6 +864,49 @@ public void removeOwnedServiceUnits(NamespaceName nsName, BundlesData bundleData
bundleFactory.invalidateBundleCache(nsName);
}

protected void onNamespaceBundleOwned(NamespaceBundle bundle) {
for (NamespaceBundleOwnershipListener bundleOwnedListener : bundleOwnershipListeners) {
notifyNamespaceBundleOwnershipListener(bundle, bundleOwnedListener);
}
}

protected void onNamespaceBundleUnload(NamespaceBundle bundle) {
for (NamespaceBundleOwnershipListener bundleOwnedListener : bundleOwnershipListeners) {
try {
if (bundleOwnedListener.test(bundle)) {
bundleOwnedListener.unLoad(bundle);
}
} catch (Throwable t) {
LOG.error("Call bundle {} ownership lister error", bundle, t);
}
}
}

public void addNamespaceBundleOwnershipListener(NamespaceBundleOwnershipListener... listeners) {
checkNotNull(listeners);
for (NamespaceBundleOwnershipListener listener : listeners) {
if (listener != null) {
bundleOwnershipListeners.add(listener);
}
}
getOwnedServiceUnits().forEach(bundle -> notifyNamespaceBundleOwnershipListener(bundle, listeners));
}

private void notifyNamespaceBundleOwnershipListener(NamespaceBundle bundle,
NamespaceBundleOwnershipListener... listeners) {
if (listeners != null) {
for (NamespaceBundleOwnershipListener listener : listeners) {
try {
if (listener.test(bundle)) {
listener.onLoad(bundle);
}
} catch (Throwable t) {
LOG.error("Call bundle {} ownership lister error", bundle, t);
}
}
}
}

public NamespaceBundleFactory getNamespaceBundleFactory() {
return bundleFactory;
}
Expand All @@ -874,6 +923,25 @@ public CompletableFuture<List<String>> getFullListOfTopics(NamespaceName namespa
});
}

public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(NamespaceBundle bundle) {
return getFullListOfTopics(bundle.getNamespaceObject()).thenCompose(topics ->
CompletableFuture.completedFuture(
topics.stream()
.filter(topic -> bundle.includes(TopicName.get(topic)))
.collect(Collectors.toList())))
.thenCombine(getAllPartitions(bundle.getNamespaceObject()).thenCompose(topics ->
CompletableFuture.completedFuture(
topics.stream().filter(topic -> bundle.includes(TopicName.get(topic)))
.collect(Collectors.toList()))), (left, right) -> {
for (String topic : right) {
if (!left.contains(topic)) {
left.add(topic);
}
}
return left;
});
}

public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
if (topic.isPersistent()) {
return pulsar.getLocalZkCacheService()
Expand All @@ -897,6 +965,57 @@ public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceNa
}
}

public CompletableFuture<List<String>> getAllPartitions(NamespaceName namespaceName) {
return getPartitions(namespaceName, TopicDomain.persistent)
.thenCombine(getPartitions(namespaceName, TopicDomain.non_persistent),
ListUtils::union);
}


public CompletableFuture<List<String>> getPartitions(NamespaceName namespaceName, TopicDomain topicDomain) {
String path = PulsarWebResource.path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(),
topicDomain.toString());

if (LOG.isDebugEnabled()) {
LOG.debug("Getting children from partitioned-topics now: {}", path);
}

return pulsar.getLocalZkCache().getChildrenAsync(path, null).thenCompose(topics -> {
CompletableFuture<List<String>> result = new CompletableFuture<>();
List<String> resultPartitions = Collections.synchronizedList(Lists.newArrayList());
if (CollectionUtils.isNotEmpty(topics)) {
List<CompletableFuture<List<String>>> futures = Lists.newArrayList();
for (String topic : topics) {
String partitionedTopicName = String.format("%s://%s/%s", topicDomain.value(),
namespaceName.toString(), decode(topic));
CompletableFuture<List<String>> future = getPartitionsForTopic(TopicName.get(partitionedTopicName));
futures.add(future);
future.thenAccept(resultPartitions::addAll);
}
FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
} else {
result.complete(resultPartitions);
}
});
} else {
result.complete(resultPartitions);
}
return result;
});
}

private CompletableFuture<List<String>> getPartitionsForTopic(TopicName topicName) {
return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenCompose(meta -> {
List<String> result = Lists.newArrayList();
for (int i = 0; i < meta.partitions; i++) {
result.add(topicName.getPartition(i).toString());
}
return CompletableFuture.completedFuture(result);
});
}

public CompletableFuture<List<String>> getListOfPersistentTopics(NamespaceName namespaceName) {
// For every topic there will be a managed ledger created.
String path = String.format("/managed-ledgers/%s/persistent", namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public class OwnershipCache {
*/
private final NamespaceBundleFactory bundleFactory;

/**
* The <code>NamespaceService</code> which using <code>OwnershipCache</code>
*/
private NamespaceService namespaceService;

private class OwnedServiceUnitCacheLoader implements AsyncCacheLoader<String, OwnedBundle> {

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -149,7 +154,8 @@ public CompletableFuture<OwnedBundle> asyncLoad(String namespaceBundleZNode, Exe
* @param ownerUrl
* the local broker URL that will be set as owner for the <code>ServiceUnit</code>
*/
public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory) {
public OwnershipCache(PulsarService pulsar, NamespaceBundleFactory bundleFactory, NamespaceService namespaceService) {
this.namespaceService = namespaceService;
this.ownerBrokerUrl = pulsar.getSafeBrokerServiceUrl();
this.ownerBrokerUrlTls = pulsar.getBrokerServiceUrlTls();
this.selfOwnerInfo = new NamespaceEphemeralData(ownerBrokerUrl, ownerBrokerUrlTls,
Expand Down Expand Up @@ -211,6 +217,9 @@ public CompletableFuture<NamespaceEphemeralData> tryAcquiringOwnership(Namespace
// service unit
ownedBundlesCache.get(path).thenAccept(namespaceBundle -> {
LOG.info("Successfully acquired ownership of {}", path);
if (namespaceService != null) {
namespaceService.onNamespaceBundleOwned(bundle);
}
future.complete(selfOwnerInfo);
}).exceptionally(exception -> {
// Failed to acquire ownership
Expand Down Expand Up @@ -260,6 +269,9 @@ public CompletableFuture<Void> removeOwnership(NamespaceBundle bundle) {
LOG.info("[{}] Removed zk lock for service unit: {}", key, KeeperException.Code.get(rc));
ownedBundlesCache.synchronous().invalidate(key);
ownershipReadOnlyCache.invalidate(key);
if (namespaceService != null) {
namespaceService.onNamespaceBundleUnload(bundle);
}
result.complete(null);
} else {
LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key,
Expand Down
Loading

0 comments on commit 025f99a

Please sign in to comment.