Skip to content

Commit

Permalink
Converted and duplicated AdminApiTest to v2 topics (apache#1574)
Browse files Browse the repository at this point in the history
* Converted and duplicated AdminApiTest to v2 topics

* Fixed CreateSubscriptionTest
  • Loading branch information
merlimat authored Apr 14, 2018
1 parent aa9e306 commit 2267ed4
Show file tree
Hide file tree
Showing 41 changed files with 4,770 additions and 521 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,7 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName,
}

ZooKeeper globalZk = configCache.getZooKeeper();
final String property = namespaceName.getProperty();
final String cluster = namespaceName.getCluster();
final String namespace = namespaceName.getLocalName();
final String policiesPath = String.format("/%s/%s/%s/%s/%s", "admin", POLICIES, property, cluster, namespace);
final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespaceName.toString());

try {
Stat nodeStat = new Stat();
Expand All @@ -225,22 +222,20 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName,

configCache.policiesCache().invalidate(policiesPath);

log.info("[{}] Successfully granted access for role {}: {} - namespace {}/{}/{}", role, role, actions,
property, cluster, namespace);
log.info("[{}] Successfully granted access for role {}: {} - namespace {}", role, role, actions,
namespaceName);
result.complete(null);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: does not exist", role, property, cluster,
namespace);
result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespace));
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", role, namespaceName);
result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespaceName));
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to set permissions for namespace {}/{}/{}: concurrent modification", role, property,
cluster, namespace);
log.warn("[{}] Failed to set permissions for namespace {}: concurrent modification", role, namespaceName);
result.completeExceptionally(new IllegalStateException(
"Concurrent modification on zk path: " + policiesPath + ", " + e.getMessage()));
} catch (Exception e) {
log.error("[{}] Failed to get permissions for namespace {}/{}/{}", role, property, cluster, namespace, e);
log.error("[{}] Failed to get permissions for namespace {}", role, namespaceName, e);
result.completeExceptionally(
new IllegalStateException("Failed to get permissions for namespace " + namespace));
new IllegalStateException("Failed to get permissions for namespace " + namespaceName));
}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ public void close() throws PulsarServerException {

configurationCacheService = null;
localZkCacheService = null;
localZkCache = null;
if (localZkCache != null) {
localZkCache.stop();
localZkCache = null;
}

if (adminClient != null) {
adminClient.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,37 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
Expand All @@ -50,11 +67,11 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
Expand All @@ -71,10 +88,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;

public abstract class NamespacesBase extends AdminResource {

private static final long MAX_BUNDLES = ((long) 1) << 32;
Expand Down Expand Up @@ -1090,6 +1103,115 @@ protected void internalModifyEncryptionRequired(boolean encryptionRequired) {
}
}


protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
validateAdminAccessOnProperty(namespaceName.getProperty());
validatePoliciesReadOnlyAccess();

log.info("[{}] Setting anti-affinity group {} for {}", clientAppId(), antiAffinityGroup, namespaceName);

if (isBlank(antiAffinityGroup)) {
throw new RestException(Status.PRECONDITION_FAILED, "antiAffinityGroup can't be empty");
}

Map.Entry<Policies, Stat> policiesNode = null;

try {
// Force to read the data s.t. the watch to the cache content is setup.
policiesNode = policiesCache().getWithStat(path(POLICIES, namespaceName.toString())).orElseThrow(
() -> new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
policiesNode.getKey().antiAffinityGroup = antiAffinityGroup;

// Write back the new policies into zookeeper
globalZk().setData(path(POLICIES, namespaceName.toString()),
jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));

log.info("[{}] Successfully updated the antiAffinityGroup {} on namespace {}", clientAppId(),
antiAffinityGroup, namespaceName);
} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update the antiAffinityGroup for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
"[{}] Failed to update the antiAffinityGroup on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());

throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to update the antiAffinityGroup on namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}

protected String internalGetNamespaceAntiAffinityGroup() {
validateAdminAccessOnProperty(namespaceName.getProperty());
return getNamespacePolicies(namespaceName).antiAffinityGroup;
}

protected void internalRemoveNamespaceAntiAffinityGroup() {
validateAdminAccessOnProperty(namespaceName.getProperty());
validatePoliciesReadOnlyAccess();

log.info("[{}] Deleting anti-affinity group for {}", clientAppId(), namespaceName);

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
policies.antiAffinityGroup = null;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully removed anti-affinity group for a namespace={}", clientAppId(), namespaceName);

} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to remove anti-affinity group for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to remove anti-affinity group for namespace {}: concurrent modification",
clientAppId(), namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
log.error("[{}] Failed to remove anti-affinity group for namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}

protected List<String> internalGetAntiAffinityNamespaces(String cluster, String antiAffinityGroup,
String property) {
validateAdminAccessOnProperty(property);

log.info("[{}]-{} Finding namespaces for {} in {}", clientAppId(), property, antiAffinityGroup, cluster);

if (isBlank(antiAffinityGroup)) {
throw new RestException(Status.PRECONDITION_FAILED, "anti-affinity group can't be empty.");
}
validateClusterExists(cluster);

try {
List<String> namespaces = getListOfNamespaces(property);

return namespaces.stream().filter(ns -> {
Optional<Policies> policies;
try {
policies = policiesCache().get(AdminResource.path(POLICIES, ns.toString()));
} catch (Exception e) {
throw new RuntimeException(e);
}

String storedAntiAffinityGroup = policies.orElse(new Policies()).antiAffinityGroup;
return antiAffinityGroup.equalsIgnoreCase(storedAntiAffinityGroup);
}).collect(Collectors.toList());

} catch (Exception e) {
log.warn("Failed to list of properties/namespace from global-zk", e);
throw new RestException(e);
}
}

private void validatePersistencePolicies(PersistencePolicies persistence) {
try {
checkNotNull(persistence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ protected void internalUpdatePartitionedTopic(int numPartitions) {
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e.getCause());
throw new RestException(e.getCause());
log.error("[{}] Failed to update partitioned topic {}", clientAppId(), topicName, e);
throw new RestException(e);
}
}

Expand Down Expand Up @@ -1213,8 +1213,7 @@ private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int
* : number partitions for the topics
*/
private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getProperty(), topicName.getCluster(),
topicName.getNamespacePortion(), domain(), topicName.getEncodedLocalName());
String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getPersistenceNamingEncoding());
CompletableFuture<Void> result = new CompletableFuture<>();
fetchPartitionedTopicMetadataAsync(pulsar(), path).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions <= 1) {
Expand Down
Loading

0 comments on commit 2267ed4

Please sign in to comment.