Skip to content

Commit

Permalink
Add subscription auth mode by prefix (apache#899)
Browse files Browse the repository at this point in the history
* Add subscription auth mode by prefix

* Fix tests

* Throws exceptions when auth role prefix is missed
  • Loading branch information
yush1ga authored and massakam committed Jan 15, 2018
1 parent 8aa1f6f commit 36226c7
Show file tree
Hide file tree
Showing 15 changed files with 246 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.zookeeper.ZooKeeperCache.cacheTimeOutInSec;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.common.naming.DestinationName;
Expand Down Expand Up @@ -78,14 +80,52 @@ public boolean canProduce(DestinationName destination, String role) throws Excep
* the fully qualified destination name associated with the destination.
* @param role
* the app id used to receive messages from the destination.
* @param subscription
* the subscription name defined by the client
*/
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role) {
return checkAuthorization(destination, role, AuthAction.consume);
public CompletableFuture<Boolean> canConsumeAsync(DestinationName destination, String role, String subscription) {
CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
try {
configCache.policiesCache().getAsync(POLICY_ROOT + destination.getNamespace()).thenAccept(policies -> {
if (!policies.isPresent()) {
if (log.isDebugEnabled()) {
log.debug("Policies node couldn't be found for destination : {}", destination);
}
} else {
if (isNotBlank(subscription)) {
switch (policies.get().subscription_auth_mode) {
case Prefix:
if (!subscription.startsWith(role)) {
PulsarServerException ex = new PulsarServerException(
String.format("Failed to create consumer - The subscription name needs to be prefixed by the authentication role, like %s-xxxx for destination: %s", role, destination));
permissionFuture.completeExceptionally(ex);
return;
}
break;
default:
break;
}
}
}
checkAuthorization(destination, role, AuthAction.consume).thenAccept(isAuthorized -> {
permissionFuture.complete(isAuthorized);
});
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination,
ex);
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for destination - {}", role, destination, e);
permissionFuture.completeExceptionally(e);
}
return permissionFuture;
}

public boolean canConsume(DestinationName destination, String role) throws Exception {
public boolean canConsume(DestinationName destination, String role, String subscription) throws Exception {
try {
return canConsumeAsync(destination, role).get(cacheTimeOutInSec, SECONDS);
return canConsumeAsync(destination, role, subscription).get(cacheTimeOutInSec, SECONDS);
} catch (InterruptedException e) {
log.warn("Time-out {} sec while checking authorization on {} ", cacheTimeOutInSec, destination);
throw e;
Expand All @@ -107,7 +147,7 @@ public boolean canConsume(DestinationName destination, String role) throws Excep
* @throws Exception
*/
public boolean canLookup(DestinationName destination, String role) throws Exception {
return canProduce(destination, role) || canConsume(destination, role);
return canProduce(destination, role) || canConsume(destination, role, null);
}

private CompletableFuture<Boolean> checkAuthorization(DestinationName destination, String role,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
Expand Down Expand Up @@ -1443,6 +1444,49 @@ public void unsubscribeNamespaceBundle(@PathParam("property") String property, @
nsName.toString(), bundleRange);
}

@POST
@Path("/{property}/{cluster}/{namespace}/subscriptionAuthMode")
@ApiOperation(value = " Set a subscription auth mode for all the destinations on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void setSubscriptionAuthMode(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, SubscriptionAuthMode subscriptionAuthMode) {
validateAdminAccessOnProperty(property);
validatePoliciesReadOnlyAccess();

if (subscriptionAuthMode == null) {
subscriptionAuthMode = SubscriptionAuthMode.None;
}

try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, property, cluster, namespace);
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
policies.subscription_auth_mode = subscriptionAuthMode;
globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, property, cluster, namespace));
log.info("[{}] Successfully updated subscription auth mode: namespace={}/{}/{}, map={}", clientAppId(), property,
cluster, namespace, jsonMapper().writeValueAsString(policies.backlog_quota_map));

} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update subscription auth mode for namespace {}/{}/{}: does not exist", clientAppId(),
property, cluster, namespace);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update subscription auth mode for namespace {}/{}/{}: concurrent modification",
clientAppId(), property, cluster, namespace);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update subscription auth mode for namespace {}/{}/{}", clientAppId(), property,
cluster, namespace, e);
throw new RestException(e);
}
}

private void clearBacklog(NamespaceName nsName, String bundleRange, String subscription) {
try {
List<Topic> topicList = pulsar().getBrokerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ public void checkPermissions() {
DestinationName destination = DestinationName.get(subscription.getDestination());
if (cnx.getBrokerService().getAuthorizationManager() != null) {
try {
if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId)) {
if (cnx.getBrokerService().getAuthorizationManager().canConsume(destination, appId, subscription.getName())) {
return;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand Down Expand Up @@ -349,7 +350,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (service.isAuthorizationEnabled()) {
authorizationFuture = service.getAuthorizationManager().canConsumeAsync(
DestinationName.get(subscribe.getTopic()),
originalPrincipal != null ? originalPrincipal : authRole);
originalPrincipal != null ? originalPrincipal : authRole,
subscribe.getSubscription());
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
Expand Down Expand Up @@ -460,6 +462,15 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
}
return null;
}).exceptionally(ex -> {
String msg = String.format("[%s] %s with role %s", remoteAddress, ex.getMessage(), authRole);
if (ex.getCause() instanceof PulsarServerException) {
log.info(msg);
} else {
log.warn(msg);
}
ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, ex.getMessage()));
return null;
});
}

Expand Down
Loading

0 comments on commit 36226c7

Please sign in to comment.