Skip to content

Commit

Permalink
[pulsar-broker] Authorization service uses metadata-store api (apache…
Browse files Browse the repository at this point in the history
…#9586)

fix discovery test

fix test

add timeout
  • Loading branch information
rdhabalia authored Feb 20, 2021
1 parent ec59e93 commit 1fab5aa
Show file tree
Hide file tree
Showing 59 changed files with 531 additions and 344 deletions.
6 changes: 6 additions & 0 deletions pulsar-broker-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-metadata</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,18 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.common.util.ObjectMapperFactory.getThreadLocal;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import javax.ws.rs.core.Response;
import com.google.common.base.Joiner;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
Expand All @@ -46,16 +44,11 @@
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.Stat;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.Response;

/**
* Default authorization provider that stores authorization policies under local-zookeeper.
*
Expand All @@ -64,8 +57,9 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
private static final Logger log = LoggerFactory.getLogger(PulsarAuthorizationProvider.class);

public ServiceConfiguration conf;
public ConfigurationCacheService configCache;
private PulsarResources pulsarResources;
private static final String POLICY_ROOT = "/admin/policies/";
public static final String POLICIES = "policies";
private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";

public PulsarAuthorizationProvider() {
Expand All @@ -81,7 +75,7 @@ public void initialize(ServiceConfiguration conf, ConfigurationCacheService conf
checkNotNull(conf, "ServiceConfiguration can't be null");
checkNotNull(configCache, "ConfigurationCacheService can't be null");
this.conf = conf;
this.configCache = configCache;
this.pulsarResources = configCache.getPulsarResources();

}

Expand Down Expand Up @@ -115,7 +109,7 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
AuthenticationDataSource authenticationData, String subscription) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
Expand Down Expand Up @@ -153,6 +147,11 @@ public CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String ro
// check namespace and topic level consume-permissions
checkAuthorization(topicName, role, AuthAction.consume).thenAccept(isAuthorized -> {
permissionFuture.complete(isAuthorized);
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
Expand Down Expand Up @@ -237,7 +236,7 @@ private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName
AuthAction authAction) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
Expand Down Expand Up @@ -292,27 +291,19 @@ public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespaceName,
result.completeExceptionally(e);
}

ZooKeeper globalZk = configCache.getZooKeeper();
final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespaceName.toString());

try {
Stat nodeStat = new Stat();
byte[] content = globalZk.getData(policiesPath, null, nodeStat);
Policies policies = getThreadLocal().readValue(content, Policies.class);
policies.auth_policies.namespace_auth.put(role, actions);

// Write back the new policies into zookeeper
globalZk.setData(policiesPath, getThreadLocal().writeValueAsBytes(policies), nodeStat.getVersion());

configCache.policiesCache().invalidate(policiesPath);

pulsarResources.getNamespaceResources().set(policiesPath, (policies)->{
policies.auth_policies.namespace_auth.put(role, actions);
return policies;
});
log.info("[{}] Successfully granted access for role {}: {} - namespace {}", role, role, actions,
namespaceName);
result.complete(null);
} catch (KeeperException.NoNodeException e) {
} catch (NotFoundException e) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", role, namespaceName);
result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespaceName));
} catch (KeeperException.BadVersionException e) {
} catch (BadVersionException e) {
log.warn("[{}] Failed to set permissions for namespace {}: concurrent modification", role, namespaceName);
result.completeExceptionally(new IllegalStateException(
"Concurrent modification on zk path: " + policiesPath + ", " + e.getMessage()));
Expand Down Expand Up @@ -347,13 +338,11 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
result.completeExceptionally(e);
}

ZooKeeper globalZk = configCache.getZooKeeper();
final String policiesPath = String.format("/%s/%s/%s", "admin", POLICIES, namespace.toString());

try {
Stat nodeStat = new Stat();
byte[] content = globalZk.getData(policiesPath, null, nodeStat);
Policies policies = getThreadLocal().readValue(content, Policies.class);
Policies policies = pulsarResources.getNamespaceResources().get(policiesPath)
.orElseThrow(() -> new NotFoundException(policiesPath + " not found"));
if (remove) {
if (policies.auth_policies.subscription_auth_roles.get(subscriptionName) != null) {
policies.auth_policies.subscription_auth_roles.get(subscriptionName).removeAll(roles);
Expand All @@ -365,18 +354,14 @@ private CompletableFuture<Void> updateSubscriptionPermissionAsync(NamespaceName
} else {
policies.auth_policies.subscription_auth_roles.put(subscriptionName, roles);
}

// Write back the new policies into zookeeper
globalZk.setData(policiesPath, getThreadLocal().writeValueAsBytes(policies), nodeStat.getVersion());

configCache.policiesCache().invalidate(policiesPath);
pulsarResources.getNamespaceResources().set(policiesPath, (data)->policies);

log.info("[{}] Successfully granted access for role {} for sub = {}", namespace, subscriptionName, roles);
result.complete(null);
} catch (KeeperException.NoNodeException e) {
} catch (NotFoundException e) {
log.warn("[{}] Failed to set permissions for namespace {}: does not exist", subscriptionName, namespace);
result.completeExceptionally(new IllegalArgumentException("Namespace does not exist" + namespace));
} catch (KeeperException.BadVersionException e) {
} catch (BadVersionException e) {
log.warn("[{}] Failed to set permissions for {} on namespace {}: concurrent modification", subscriptionName, roles, namespace);
result.completeExceptionally(new IllegalStateException(
"Concurrent modification on zk path: " + policiesPath + ", " + e.getMessage()));
Expand Down Expand Up @@ -409,7 +394,7 @@ private boolean checkCluster(TopicName topicName) {
public CompletableFuture<Boolean> checkPermission(TopicName topicName, String role, AuthAction action) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> {
pulsarResources.getNamespaceResources().getAsync(POLICY_ROOT + topicName.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for topic : {}", topicName);
Expand All @@ -425,7 +410,7 @@ public CompletableFuture<Boolean> checkPermission(TopicName topicName, String ro

Map<String, Set<AuthAction>> topicRoles = policies.get().auth_policies.destination_auth
.get(topicName.toString());
if (topicRoles != null) {
if (topicRoles != null && role != null) {
// Topic has custom policy
Set<AuthAction> topicActions = topicRoles.get(role);
if (topicActions != null && topicActions.contains(action)) {
Expand Down Expand Up @@ -497,10 +482,9 @@ public void close() throws IOException {

private void validatePoliciesReadOnlyAccess() {
boolean arePoliciesReadOnly = true;
ZooKeeperCache globalZkCache = configCache.cache();

try {
arePoliciesReadOnly = globalZkCache.exists(POLICIES_READONLY_FLAG_PATH);
arePoliciesReadOnly = pulsarResources.getNamespaceResources().exists(POLICIES_READONLY_FLAG_PATH);
} catch (Exception e) {
log.warn("Unable to fetch contents of [{}] from global zookeeper", POLICIES_READONLY_FLAG_PATH, e);
throw new IllegalStateException("Unable to fetch content from global zk");
Expand All @@ -511,17 +495,6 @@ private void validatePoliciesReadOnlyAccess() {
log.debug("Policies are read-only. Broker cannot do read-write operations");
}
throw new IllegalStateException("policies are in readonly mode");
} else {
// Make sure the broker is connected to the global zookeeper before writing. If not, throw an exception.
if (globalZkCache.getZooKeeper().getState() != States.CONNECTED) {
if (log.isDebugEnabled()) {
log.debug("Broker is not connected to the global zookeeper");
}
throw new IllegalStateException("not connected woith global zookeeper");
} else {
// Do nothing, just log the message.
log.debug("Broker is allowed to make read-write operations");
}
}
}

Expand Down Expand Up @@ -606,7 +579,7 @@ private CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName,
String role,
AuthenticationDataSource authData) {
try {
TenantInfo tenantInfo = configCache.propertiesCache()
TenantInfo tenantInfo = pulsarResources.getTenatResources()
.get(path(POLICIES, tenantName))
.orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Tenant does not exist"));

Expand All @@ -616,7 +589,7 @@ private CompletableFuture<Boolean> validateTenantAdminAccess(String tenantName,
return isRoleSuperUserFuture
.thenCombine(isRoleTenantAdminFuture, (isRoleSuperUser, isRoleTenantAdmin) ->
isRoleSuperUser || isRoleTenantAdmin);
} catch (KeeperException.NoNodeException e) {
} catch (NotFoundException e) {
log.warn("Failed to get tenant info data for non existing tenant {}", tenantName);
throw new RestException(Response.Status.NOT_FOUND, "Tenant does not exist");
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.bookkeeper.util.ZkUtils;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
Expand All @@ -42,6 +43,8 @@

import com.fasterxml.jackson.core.type.TypeReference;

import lombok.Getter;

/**
* ConfigurationCacheService maintains a local in-memory cache of all the configurations and policies stored in
* zookeeper.
Expand All @@ -58,6 +61,8 @@ public class ConfigurationCacheService {
private ZooKeeperChildrenCache failureDomainListCache;
private ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache;
private ZooKeeperDataCache<FailureDomain> failureDomainCache;
@Getter
private PulsarResources pulsarResources;

public static final String POLICIES = "policies";
public static final String FAILURE_DOMAIN = "failureDomain";
Expand All @@ -67,12 +72,18 @@ public class ConfigurationCacheService {

public static final String PARTITIONED_TOPICS_ROOT = "/admin/partitioned-topics";

public ConfigurationCacheService(ZooKeeperCache cache) throws PulsarServerException {
this(cache, null);
public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterName) throws PulsarServerException {
this(cache, configuredClusterName, null);
}

public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterName) throws PulsarServerException {
public ConfigurationCacheService(ZooKeeperCache cache, String configuredClusterName,
PulsarResources pulsarResources) throws PulsarServerException {
this.cache = cache;
this.pulsarResources = pulsarResources;
this.CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN;
if (cache == null) {
return;
}

initZK();

Expand All @@ -99,7 +110,6 @@ public ClusterData deserialize(String path, byte[] content) throws Exception {

this.clustersListCache = new ZooKeeperChildrenCache(cache, CLUSTERS_ROOT);

CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN;
if (isNotBlank(configuredClusterName)) {
createFailureDomainRoot(cache.getZooKeeper(), CLUSTER_FAILURE_DOMAIN_ROOT);
this.failureDomainListCache = new ZooKeeperChildrenCache(cache, CLUSTER_FAILURE_DOMAIN_ROOT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.cache;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.zookeeper.ZooKeeper;

/**
* Configuration Cache service serves pulsar metadata.
*/
public class ConfigurationMetadataCacheService extends ConfigurationCacheService {

public ConfigurationMetadataCacheService(PulsarResources pulsarResources, String configuredClusterName) throws PulsarServerException {
super(null, configuredClusterName, pulsarResources);
}

@Override
public ZooKeeperCache cache() {
throw new UnsupportedOperationException();
}

public ZooKeeperDataCache<TenantInfo> propertiesCache() {
throw new UnsupportedOperationException();
}

public ZooKeeperDataCache<Policies> policiesCache() {
throw new UnsupportedOperationException();
}

public ZooKeeperDataCache<ClusterData> clustersCache() {
throw new UnsupportedOperationException();
}

public ZooKeeperChildrenCache clustersListCache() {
throw new UnsupportedOperationException();
}

public ZooKeeperChildrenCache failureDomainListCache() {
throw new UnsupportedOperationException();
}

public ZooKeeper getZooKeeper() {
throw new UnsupportedOperationException();
}

public ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache() {
throw new UnsupportedOperationException();
}

public ZooKeeperDataCache<FailureDomain> failureDomainCache() {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
package org.apache.pulsar.broker.resources;

import com.fasterxml.jackson.core.type.TypeReference;
import java.util.List;
Expand Down
Loading

0 comments on commit 1fab5aa

Please sign in to comment.