Skip to content

Commit

Permalink
Proxy roles enforcement (apache#1168)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher authored Feb 6, 2018
1 parent d0a4ff3 commit 15e6655
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
// do all admin operations and publish/consume from all topics
private Set<String> superUserRoles = Sets.newTreeSet();

// Role names that are treated as "proxy roles". If the broker sees a request with
// role as proxyRoles - it will demand to see the original client role or certificate.
private Set<String> proxyRoles = Sets.newTreeSet();

// Allow wildcard matching in authorization
// (wildcard matching only applicable if wildcard-char:
// * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
Expand Down Expand Up @@ -794,7 +798,15 @@ public void setAuthorizationEnabled(boolean authorizationEnabled) {
public Set<String> getSuperUserRoles() {
return superUserRoles;
}


public Set<String> getProxyRoles() {
return proxyRoles;
}

public void setProxyRoles(Set<String> proxyRoles) {
this.proxyRoles = proxyRoles;
}

public boolean getAuthorizationAllowWildcardsMatching() {
return authorizationAllowWildcardsMatching;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,28 @@ public CompletableFuture<Boolean> canLookupAsync(DestinationName destination, St
finalResult.complete(produceAuthorized);
return;
}
} else if (log.isDebugEnabled()) {
log.debug("Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
destination.toString(), role, ex.getMessage());
} else {
if (log.isDebugEnabled()) {
log.debug(
"Destination [{}] Role [{}] exception occured while trying to check Produce permissions. {}",
destination.toString(), role, ex.getMessage());
}
}
canConsumeAsync(destination, role, null).whenComplete((consumeAuthorized, e) -> {
if (e == null) {
if (consumeAuthorized) {
finalResult.complete(consumeAuthorized);
return;
}
} else if (log.isDebugEnabled()) {
log.debug(
"Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
destination.toString(), role, e.getMessage());
} else {
if (log.isDebugEnabled()) {
log.debug(
"Destination [{}] Role [{}] exception occured while trying to check Consume permissions. {}",
destination.toString(), role, e.getMessage());

}
finalResult.completeExceptionally(e);
return;
}
finalResult.complete(false);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand All @@ -37,6 +38,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -77,6 +79,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Sets;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -89,7 +93,7 @@ public class ServerCnx extends PulsarHandler {
private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
private State state;
private volatile boolean isActive = true;
private String authRole = null;
String authRole = null;

// Max number of pending requests per connections. If multiple producers are sharing the same connection the flow
// control done by a single producer might not be enough to prevent write spikes on the broker.
Expand All @@ -101,7 +105,8 @@ public class ServerCnx extends PulsarHandler {
private int nonPersistentPendingMessages = 0;
private final int MaxNonPersistentPendingMessages;
private String originalPrincipal = null;

private Set<String> proxyRoles = Sets.newHashSet();

enum State {
Start, Connected, Failed
}
Expand All @@ -117,6 +122,7 @@ public ServerCnx(BrokerService service) {
this.replicatorPrefix = service.pulsar().getConfiguration().getReplicatorPrefix();
this.MaxNonPersistentPendingMessages = service.pulsar().getConfiguration()
.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
}

@Override
Expand Down Expand Up @@ -180,6 +186,19 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.close();
}

private boolean validateOriginalPrincipal(String originalPrincipal, ByteBuf errorResponse, String topicName,
String msg) {
if (service.isAuthenticationEnabled() && service.isAuthorizationEnabled() && proxyRoles.contains(authRole)
&& (StringUtils.isBlank(originalPrincipal) || proxyRoles.contains(originalPrincipal))) {
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
originalPrincipal, topicName);
ctx.writeAndFlush(errorResponse);
return false;
}

return true;
}

// ////
// // Incoming commands handling
// ////
Expand All @@ -196,6 +215,13 @@ protected void handleLookup(CommandLookupTopic lookup) {
if (lookupSemaphore.tryAcquire()) {
final String originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal()
: this.originalPrincipal;
if (!validateOriginalPrincipal(originalPrincipal,
newLookupErrorResponse(ServerError.AuthorizationError,
"Valid Proxy Client role should be provided for lookup ", requestId),
topicName, "Valid Proxy Client role should be provided for lookup ")) {
lookupSemaphore.release();
return;
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationManager()
Expand Down Expand Up @@ -253,9 +279,15 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
}
final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {

final String originalPrincipal = partitionMetadata.hasOriginalPrincipal()
? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
if (!validateOriginalPrincipal(originalPrincipal,
Commands.newPartitionMetadataResponse(ServerError.AuthorizationError,
"Valid Proxy Client role should be provided for getPartitionMetadataRequest ", requestId),
topicName, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ")) {
lookupSemaphore.release();
return;
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationManager()
Expand Down Expand Up @@ -417,7 +449,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final String topicName = subscribe.getTopic();
final long requestId = subscribe.getRequestId();
final long consumerId = subscribe.getConsumerId();

if (!validateOriginalPrincipal(originalPrincipal,
Commands.newError(requestId, ServerError.AuthorizationError,
"Valid Proxy Client role should be provided while subscribing "),
topicName, "Valid Proxy Client role should be provided while subscribing ")) {
return;
}
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationManager().canConsumeAsync(DestinationName.get(topicName),
Expand Down Expand Up @@ -570,6 +607,13 @@ protected void handleProducer(final CommandProducer cmdProducer) {
final long producerId = cmdProducer.getProducerId();
final long requestId = cmdProducer.getRequestId();

if (!validateOriginalPrincipal(originalPrincipal,
Commands.newError(requestId, ServerError.AuthorizationError,
"Valid Proxy Client role should be provided while creating producer "),
topicName, "Valid Proxy Client role should be provided while creating producer ")) {
return;
}

CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
isProxyAuthorizedFuture = service.getAuthorizationManager().canProduceAsync(DestinationName.get(topicName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1341,6 +1341,7 @@ private void resetChannel() throws Exception {
channel.close().get();
}
serverCnx = new ServerCnx(brokerService);
serverCnx.authRole = "";
channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4), serverCnx);
}

Expand Down
Loading

0 comments on commit 15e6655

Please sign in to comment.