Skip to content

Commit

Permalink
Proxy forward auth data (apache#1169)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher authored Feb 8, 2018
1 parent 85cce73 commit b429931
Show file tree
Hide file tree
Showing 15 changed files with 933 additions and 39 deletions.
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ enableRunBookieTogether=false
enableRunBookieAutoRecoveryTogether=false

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
proxyRoles=

# If this flag is set then the broker authenticates the original Auth data
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false

# Enable TLS
tlsEnabled=false
Expand Down
4 changes: 4 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ brokerClientAuthenticationParameters=
# operations and publish/consume from all topics (comma-separated)
superUserRoles=

# Forward client authorization Credentials to Broker for re authorization
# make sure authentication is enabled for this to take effect
forwardAuthorizationCredentials=false

##### --- TLS --- #####

# Enable TLS in the proxy
Expand Down
7 changes: 7 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ enablePersistentTopics=true
enableNonPersistentTopics=true

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
proxyRoles=

# If this flag is set then the broker authenticates the original Auth data
# else it just accepts the originalPrincipal and authorizes it (if required).
authenticateOriginalAuthData=false

# Enable authentication
authenticationEnabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ public class ServiceConfiguration implements PulsarConfiguration {
// role as proxyRoles - it will demand to see the original client role or certificate.
private Set<String> proxyRoles = Sets.newTreeSet();

// If this flag is set then the broker authenticates the original Auth data
// else it just accepts the originalPrincipal and authorizes it (if required).
private boolean authenticateOriginalAuthData = false;

// Allow wildcard matching in authorization
// (wildcard matching only applicable if wildcard-char:
// * presents at first or last position eg: *.pulsar.service, pulsar.service.*)
Expand Down Expand Up @@ -1377,4 +1381,12 @@ public boolean exposeTopicLevelMetricsInPrometheus() {
public void setExposeTopicLevelMetricsInPrometheus(boolean exposeTopicLevelMetricsInPrometheus) {
this.exposeTopicLevelMetricsInPrometheus = exposeTopicLevelMetricsInPrometheus;
}

public boolean authenticateOriginalAuthData() {
return authenticateOriginalAuthData;
}

public void setAuthenticateOriginalAuthData(boolean authenticateOriginalAuthData) {
this.authenticateOriginalAuthData = authenticateOriginalAuthData;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ public class ServerCnx extends PulsarHandler {
private int nonPersistentPendingMessages = 0;
private final int MaxNonPersistentPendingMessages;
private String originalPrincipal = null;
private Set<String> proxyRoles = Sets.newHashSet();

private Set<String> proxyRoles;
private boolean authenticateOriginalAuthData;

enum State {
Start, Connected, Failed
}
Expand All @@ -124,6 +125,7 @@ public ServerCnx(BrokerService service) {
this.MaxNonPersistentPendingMessages = service.pulsar().getConfiguration()
.getMaxConcurrentNonPersistentMessagePerConnection();
this.proxyRoles = service.pulsar().getConfiguration().getProxyRoles();
this.authenticateOriginalAuthData = service.pulsar().getConfiguration().authenticateOriginalAuthData();
}

@Override
Expand Down Expand Up @@ -214,11 +216,24 @@ protected void handleLookup(CommandLookupTopic lookup) {
if (topicName == null) {
return;
}


String originalPrincipal = null;
if (authenticateOriginalAuthData && lookup.hasOriginalAuthData()) {
originalPrincipal = validateOriginalPrincipal(
lookup.hasOriginalAuthData() ? lookup.getOriginalAuthData() : null,
lookup.hasOriginalAuthMethod() ? lookup.getOriginalAuthMethod() : null,
lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal, requestId,
lookup);

if (originalPrincipal == null) {
return;
}
} else {
originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal() : this.originalPrincipal;
}

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
final String originalPrincipal = lookup.hasOriginalPrincipal() ? lookup.getOriginalPrincipal()
: this.originalPrincipal;
if (lookupSemaphore.tryAcquire()) {
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for lookup ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
Expand All @@ -234,11 +249,11 @@ protected void handleLookup(CommandLookupTopic lookup) {
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}

String finalOriginalPrincipal = originalPrincipal;
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
if (isProxyAuthorized) {
lookupDestinationAsync(getBrokerService().pulsar(), topicName,
lookup.getAuthoritative(), originalPrincipal != null ? originalPrincipal : authRole,
lookup.getAuthoritative(), finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole,
lookup.getRequestId()).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
Expand Down Expand Up @@ -287,11 +302,24 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
if (topicName == null) {
return;
}

String originalPrincipal = null;
if (authenticateOriginalAuthData && partitionMetadata.hasOriginalAuthData()) {
originalPrincipal = validateOriginalPrincipal(
partitionMetadata.hasOriginalAuthData() ? partitionMetadata.getOriginalAuthData() : null,
partitionMetadata.hasOriginalAuthMethod() ? partitionMetadata.getOriginalAuthMethod() : null,
partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal()
: this.originalPrincipal,
requestId, partitionMetadata);

if (originalPrincipal == null) {
return;
}
} else {
originalPrincipal = partitionMetadata.hasOriginalPrincipal() ? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
}

final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
if (lookupSemaphore.tryAcquire()) {
final String originalPrincipal = partitionMetadata.hasOriginalPrincipal()
? partitionMetadata.getOriginalPrincipal() : this.originalPrincipal;
if (invalidOriginalPrincipal(originalPrincipal)) {
final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest ";
log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", remoteAddress, msg, authRole,
Expand All @@ -308,10 +336,11 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa
} else {
isProxyAuthorizedFuture = CompletableFuture.completedFuture(true);
}
String finalOriginalPrincipal = originalPrincipal;
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
if (isProxyAuthorized) {
getPartitionedTopicMetadata(getBrokerService().pulsar(),
originalPrincipal != null ? originalPrincipal : authRole, topicName)
finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, topicName)
.handle((metadata, ex) -> {
if (ex == null) {
int partitions = metadata.partitions;
Expand Down Expand Up @@ -410,6 +439,39 @@ CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consum

return commandConsumerStatsResponseBuilder;
}

private String validateOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, Long requestId, GeneratedMessageLite request) {
ChannelHandler sslHandler = ctx.channel().pipeline().get(PulsarChannelInitializer.TLS_HANDLER);
SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}
try {
return getOriginalPrincipal(originalAuthData, originalAuthMethod, originalPrincipal, sslSession);
} catch (AuthenticationException e) {
String msg = "Unable to authenticate original authdata ";
log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage());
if (request instanceof CommandLookupTopic) {
ctx.writeAndFlush(newLookupErrorResponse(ServerError.AuthenticationError, msg, requestId));
} else if (request instanceof CommandPartitionedTopicMetadata) {
ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthenticationError, msg, requestId));
}
return null;
}
}

private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal,
SSLSession sslSession) throws AuthenticationException {
if (authenticateOriginalAuthData) {
if (originalAuthData != null) {
originalPrincipal = getBrokerService().getAuthenticationService().authenticate(
new AuthenticationDataCommand(originalAuthData, remoteAddress, sslSession), originalAuthMethod);
} else {
originalPrincipal = null;
}
}
return originalPrincipal;
}

@Override
protected void handleConnect(CommandConnect connect) {
Expand All @@ -430,8 +492,11 @@ protected void handleConnect(CommandConnect connect) {
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
}

originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
originalPrincipal = getOriginalPrincipal(
connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null,
connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null,
connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null,
sslSession);
authRole = getBrokerService().getAuthenticationService()
.authenticate(new AuthenticationDataCommand(authData, remoteAddress, sslSession), authMethod);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testKeepAliveNotEnforcedWithOlderClients() throws Exception {
assertEquals(serverCnx.getState(), State.Start);

// test server response to CONNECT
ByteBuf clientCommand = Commands.newConnect("none", "", ProtocolVersion.v0.getNumber(), null, null, null);
ByteBuf clientCommand = Commands.newConnect("none", "", ProtocolVersion.v0.getNumber(), null, null, null, null, null);
channel.writeInbound(clientCommand);

assertEquals(serverCnx.getState(), State.Connected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,23 @@ public class Commands {
private static final int checksumSize = 4;

public static ByteBuf newConnect(String authMethodName, String authData, String libVersion) {
return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, null /* target broker */, null /* originalPrincipal */);
return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, null /* target broker */,
null /* originalPrincipal */, null /* Client Auth Data */, null /* Client Auth Method */);
}

public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker) {
return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker, null);
return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker, null, null, null);
}

public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker, String originalPrincipal) {
return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker, originalPrincipal);
public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker,
String originalPrincipal, String clientAuthData, String clientAuthMethod) {
return newConnect(authMethodName, authData, getCurrentProtocolVersion(), libVersion, targetBroker,
originalPrincipal, clientAuthData, clientAuthMethod);
}

public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion,
String targetBroker, String originalPrincipal) {
String targetBroker, String originalPrincipal, String originalAuthData,
String originalAuthMethod) {
CommandConnect.Builder connectBuilder = CommandConnect.newBuilder();
connectBuilder.setClientVersion(libVersion != null ? libVersion : "Pulsar Client");
connectBuilder.setAuthMethodName(authMethodName);
Expand All @@ -116,6 +120,13 @@ public static ByteBuf newConnect(String authMethodName, String authData, int pro
connectBuilder.setOriginalPrincipal(originalPrincipal);
}

if (originalAuthData != null) {
connectBuilder.setOriginalAuthData(originalAuthData);
}

if (originalAuthMethod != null) {
connectBuilder.setOriginalAuthMethod(originalAuthMethod);
}
connectBuilder.setProtocolVersion(protocolVersion);
CommandConnect connect = connectBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect));
Expand Down Expand Up @@ -450,7 +461,7 @@ public static ByteBuf newPartitionMetadataResponse(ServerError error, String err
}

public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
return Commands.newPartitionMetadataRequest(topic, requestId, null);
return Commands.newPartitionMetadataRequest(topic, requestId, null, null, null);
}

public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
Expand All @@ -469,7 +480,7 @@ public static ByteBuf newPartitionMetadataResponse(int partitions, long requestI
}

public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
return Commands.newLookup(topic, authoritative, null, requestId);
return Commands.newLookup(topic, authoritative, null, null, null, requestId);
}

public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
Expand Down Expand Up @@ -875,12 +886,20 @@ public static enum ChecksumType {
None;
}

public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String clientAuthRole) {
public static ByteBuf newPartitionMetadataRequest(String topic, long requestId, String originalAuthRole,
String originalAuthData, String originalAuthMethod) {
CommandPartitionedTopicMetadata.Builder partitionMetadataBuilder = CommandPartitionedTopicMetadata.newBuilder();
partitionMetadataBuilder.setTopic(topic);
partitionMetadataBuilder.setRequestId(requestId);
if (clientAuthRole != null) {
partitionMetadataBuilder.setOriginalPrincipal(clientAuthRole);
if (originalAuthRole != null) {
partitionMetadataBuilder.setOriginalPrincipal(originalAuthRole);
}
if (originalAuthData != null) {
partitionMetadataBuilder.setOriginalAuthData(originalAuthData);
}

if (originalAuthMethod != null) {
partitionMetadataBuilder.setOriginalAuthMethod(originalAuthMethod);
}
CommandPartitionedTopicMetadata partitionMetadata = partitionMetadataBuilder.build();
ByteBuf res = serializeWithSize(
Expand All @@ -890,13 +909,21 @@ public static ByteBuf newPartitionMetadataRequest(String topic, long requestId,
return res;
}

public static ByteBuf newLookup(String topic, boolean authoritative, String clientAuthRole, long requestId) {
public static ByteBuf newLookup(String topic, boolean authoritative, String originalAuthRole,
String originalAuthData, String originalAuthMethod, long requestId) {
CommandLookupTopic.Builder lookupTopicBuilder = CommandLookupTopic.newBuilder();
lookupTopicBuilder.setTopic(topic);
lookupTopicBuilder.setRequestId(requestId);
lookupTopicBuilder.setAuthoritative(authoritative);
if (clientAuthRole != null) {
lookupTopicBuilder.setOriginalPrincipal(clientAuthRole);
if (originalAuthRole != null) {
lookupTopicBuilder.setOriginalPrincipal(originalAuthRole);
}
if (originalAuthData != null) {
lookupTopicBuilder.setOriginalAuthData(originalAuthData);
}

if (originalAuthMethod != null) {
lookupTopicBuilder.setOriginalAuthMethod(originalAuthMethod);
}
CommandLookupTopic lookupBroker = lookupTopicBuilder.build();
ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.LOOKUP).setLookupTopic(lookupBroker));
Expand Down
Loading

0 comments on commit b429931

Please sign in to comment.