diff --git a/conf/broker.conf b/conf/broker.conf index 47eb3357a3507..6fcd02cdf5051 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -381,6 +381,9 @@ authenticationEnabled=false # Autentication provider name list, which is comma separated list of class names authenticationProviders= +# Interval of time for checking for expired authentication credentials +authenticationRefreshCheckSeconds=60 + # Enforce authorization authorizationEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 52c226a2b1b50..54cbe621b9675 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -679,6 +679,12 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private Set authenticationProviders = Sets.newTreeSet(); + @FieldContext( + category = CATEGORY_AUTHENTICATION, + doc = "Interval of time for checking for expired authentication credentials" + ) + private int authenticationRefreshCheckSeconds = 60; + @FieldContext( category = CATEGORY_AUTHORIZATION, doc = "Enforce authorization" diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java index 7ad39fc716ca1..faf868ff7b9e6 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java @@ -18,21 +18,26 @@ */ package org.apache.pulsar.broker.authentication; -import io.jsonwebtoken.Claims; -import io.jsonwebtoken.Jwt; -import io.jsonwebtoken.JwtException; -import io.jsonwebtoken.Jwts; -import io.jsonwebtoken.SignatureAlgorithm; +import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; +import java.net.SocketAddress; import java.security.Key; import javax.naming.AuthenticationException; +import javax.net.ssl.SSLSession; -import io.jsonwebtoken.security.SignatureException; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.common.api.AuthData; + +import io.jsonwebtoken.Claims; +import io.jsonwebtoken.Jwt; +import io.jsonwebtoken.JwtException; +import io.jsonwebtoken.Jwts; +import io.jsonwebtoken.SignatureAlgorithm; +import io.jsonwebtoken.security.SignatureException; public class AuthenticationProviderToken implements AuthenticationProvider { @@ -81,7 +86,13 @@ public String authenticate(AuthenticationDataSource authData) throws Authenticat String token = getToken(authData); // Parse Token by validating - return parseToken(token); + return getPrincipal(authenticateToken(token)); + } + + @Override + public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession) + throws AuthenticationException { + return new TokenAuthenticationState(this, authData, remoteAddress, sslSession); } public static String getToken(AuthenticationDataSource authData) throws AuthenticationException { @@ -112,19 +123,21 @@ private static String validateToken(final String token) throws AuthenticationExc } } - private String parseToken(final String token) throws AuthenticationException { + @SuppressWarnings("unchecked") + private Jwt authenticateToken(final String token) throws AuthenticationException { try { - @SuppressWarnings("unchecked") - Jwt jwt = Jwts.parser() + return Jwts.parser() .setSigningKey(validationKey) .parse(token); - - return jwt.getBody().get(roleClaim, String.class); } catch (JwtException e) { throw new AuthenticationException("Failed to authentication token: " + e.getMessage()); } } + private String getPrincipal(Jwt jwt) { + return jwt.getBody().get(roleClaim, String.class); + } + /** * Try to get the validation key for tokens from several possible config options. */ @@ -166,4 +179,62 @@ private SignatureAlgorithm getPublicKeyAlgType(ServiceConfiguration conf) throws return SignatureAlgorithm.RS256; } } + + private static final class TokenAuthenticationState implements AuthenticationState { + private final AuthenticationProviderToken provider; + private AuthenticationDataSource authenticationDataSource; + private Jwt jwt; + private final SocketAddress remoteAddress; + private final SSLSession sslSession; + private long expiration; + + TokenAuthenticationState( + AuthenticationProviderToken provider, + AuthData authData, + SocketAddress remoteAddress, + SSLSession sslSession) throws AuthenticationException { + this.provider = provider; + this.remoteAddress = remoteAddress; + this.sslSession = sslSession; + this.authenticate(authData); + } + + @Override + public String getAuthRole() throws AuthenticationException { + return provider.getPrincipal(jwt); + } + + @Override + public AuthData authenticate(AuthData authData) throws AuthenticationException { + String token = new String(authData.getBytes(), UTF_8); + + this.jwt = provider.authenticateToken(token); + this.authenticationDataSource = new AuthenticationDataCommand(token, remoteAddress, sslSession); + if (jwt.getBody().getExpiration() != null) { + this.expiration = jwt.getBody().getExpiration().getTime(); + } else { + // Disable expiration + this.expiration = Long.MAX_VALUE; + } + + // There's no additional auth stage required + return null; + } + + @Override + public AuthenticationDataSource getAuthDataSource() { + return authenticationDataSource; + } + + @Override + public boolean isComplete() { + // The authentication of tokens is always done in one single stage + return true; + } + + @Override + public boolean isExpired() { + return expiration < System.currentTimeMillis(); + } + } } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java index 5042ef881d098..ac881ac6aa222 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationState.java @@ -20,7 +20,6 @@ package org.apache.pulsar.broker.authentication; import javax.naming.AuthenticationException; -import javax.servlet.http.HttpServletRequest; import org.apache.pulsar.common.api.AuthData; @@ -59,4 +58,25 @@ public interface AuthenticationState { default long getStateId() { return -1L; } + + /** + * If the authentication state is expired, it will force the connection to be re-authenticated. + */ + default boolean isExpired() { + return false; + } + + /** + * If the authentication state supports refreshing and the credentials are expired, + * the auth provider will call this method ot initiate the refresh process. + *

+ * The auth state here will return the broker side data that will be used to send + * a challenge to the client. + * + * @return the {@link AuthData} for the broker challenge to client + * @throws AuthenticationException + */ + default AuthData refreshAuthentication() throws AuthenticationException { + return null; + } } diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java index b5183e34357a8..f26777683489b 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTokenTest.java @@ -19,7 +19,10 @@ package org.apache.pulsar.broker.authentication; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.jsonwebtoken.Claims; @@ -28,6 +31,7 @@ import io.jsonwebtoken.SignatureAlgorithm; import io.jsonwebtoken.io.Decoders; import io.jsonwebtoken.security.Keys; +import lombok.Cleanup; import java.io.File; import java.io.IOException; @@ -46,6 +50,7 @@ import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils; +import org.apache.pulsar.common.api.AuthData; import org.testng.annotations.Test; public class AuthenticationProviderTokenTest { @@ -533,4 +538,36 @@ public void testValidationWhenPublicKeyAlgIsInvalid() throws IOException { new AuthenticationProviderToken().initialize(conf); } + + + @Test + public void testExpiringToken() throws Exception { + SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256); + + @Cleanup + AuthenticationProviderToken provider = new AuthenticationProviderToken(); + + Properties properties = new Properties(); + properties.setProperty(AuthenticationProviderToken.CONF_TOKEN_SECRET_KEY, + AuthTokenUtils.encodeKeyBase64(secretKey)); + + ServiceConfiguration conf = new ServiceConfiguration(); + conf.setProperties(properties); + provider.initialize(conf); + + // Create a token that will expire in 3 seconds + String expiringToken = AuthTokenUtils.createToken(secretKey, SUBJECT, + Optional.of(new Date(System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(3)))); + + AuthenticationState authState = provider.newAuthState(AuthData.of(expiringToken.getBytes()), null, null); + assertTrue(authState.isComplete()); + assertFalse(authState.isExpired()); + + Thread.sleep(TimeUnit.SECONDS.toMillis(6)); + assertTrue(authState.isExpired()); + assertTrue(authState.isComplete()); + + AuthData brokerData = authState.refreshAuthentication(); + assertNull(brokerData); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java index 9fb81ec2b1631..ce16a7ea99f42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java @@ -18,17 +18,27 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.bookkeeper.util.SafeRunnable.safeRun; + +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.common.protocol.ByteBufPair; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.NettySslContextBuilder; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.flow.FlowControlHandler; +import lombok.extern.slf4j.Slf4j; +@Slf4j public class PulsarChannelInitializer extends ChannelInitializer { public static final String TLS_HANDLER = "tls"; @@ -38,6 +48,14 @@ public class PulsarChannelInitializer extends ChannelInitializer private final NettySslContextBuilder sslCtxRefresher; private final ServiceConfiguration brokerConf; + // This cache is used to maintain a list of active connections to iterate over them + // We keep weak references to have the cache to be auto cleaned up when the connections + // objects are GCed. + private final Cache connections = Caffeine.newBuilder() + .weakKeys() + .weakValues() + .build(); + /** * @param pulsar * An instance of {@link PulsarService} @@ -59,6 +77,10 @@ public PulsarChannelInitializer(PulsarService pulsar, boolean enableTLS) throws this.sslCtxRefresher = null; } this.brokerConf = pulsar.getConfiguration(); + + pulsar.getExecutor().scheduleAtFixedRate(safeRun(this::refreshAuthenticationCredentials), + pulsar.getConfig().getAuthenticationRefreshCheckSeconds(), + pulsar.getConfig().getAuthenticationRefreshCheckSeconds(), TimeUnit.SECONDS); } @Override @@ -78,6 +100,19 @@ protected void initChannel(SocketChannel ch) throws Exception { // ServerCnx ends up reading higher number of messages and broker can not throttle the messages by disabling // auto-read. ch.pipeline().addLast("flowController", new FlowControlHandler()); - ch.pipeline().addLast("handler", new ServerCnx(pulsar)); + ServerCnx cnx = new ServerCnx(pulsar); + ch.pipeline().addLast("handler", cnx); + + connections.put(ch.remoteAddress(), cnx); + } + + private void refreshAuthenticationCredentials() { + connections.asMap().values().forEach(cnx -> { + try { + cnx.refreshAuthenticationCredentials(); + } catch (Throwable t) { + log.warn("[{}] Failed to refresh auth credentials", cnx.getRemoteAddress()); + } + }); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 2a78508ed0304..d79f41bacbaa3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -51,7 +51,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; -import org.apache.pulsar.broker.authentication.AuthenticationDataCommand; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; @@ -94,6 +93,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; +import org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; @@ -127,6 +127,10 @@ public class ServerCnx extends PulsarHandler { AuthenticationDataSource authenticationData; AuthenticationProvider authenticationProvider; AuthenticationState authState; + // In case of proxy, if the authentication credentials are forwardable, + // it will hold the credentials of the original client + AuthenticationState originalAuthState; + private boolean pendingAuthChallengeResponse = false; // Max number of pending requests per connections. If multiple producers are sharing the same connection the flow // control done by a single producer might not be enough to prevent write spikes on the broker. @@ -143,9 +147,10 @@ public class ServerCnx extends PulsarHandler { private final boolean schemaValidationEnforced; private String authMethod = "none"; private final int maxMessageSize; - + // Flag to manage throttling-rate by atomically enable/disable read-channel. private volatile boolean autoReadDisabledRateLimiting = false; + private FeatureFlags features; enum State { Start, Connected, Failed, Connecting @@ -452,19 +457,6 @@ CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consum return commandConsumerStatsResponseBuilder; } - private String getOriginalPrincipal(String originalAuthData, String originalAuthMethod, String originalPrincipal, - SSLSession sslSession) throws AuthenticationException { - if (authenticateOriginalAuthData) { - if (originalAuthData != null) { - originalPrincipal = getBrokerService().getAuthenticationService().authenticate( - new AuthenticationDataCommand(originalAuthData, remoteAddress, sslSession), originalAuthMethod); - } else { - originalPrincipal = null; - } - } - return originalPrincipal; - } - // complete the connect and sent newConnected command private void completeConnect(int clientProtoVersion, String clientVersion) { ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, maxMessageSize)); @@ -476,19 +468,53 @@ private void completeConnect(int clientProtoVersion, String clientVersion) { } // According to auth result, send newConnected or newAuthChallenge command. - private void doAuthentication(AuthData clientData, - int clientProtocolVersion, - String clientVersion) throws Exception { + private State doAuthentication(AuthData clientData, + int clientProtocolVersion, + String clientVersion) throws Exception { + + // The original auth state can only be set on subsequent auth attempts (and only + // in presence of a proxy and if the proxy is forwarding the credentials). + // In this case, the re-validation needs to be done against the original client + // credentials. + boolean useOriginalAuthState = (originalAuthState != null); + AuthenticationState authState = useOriginalAuthState ? originalAuthState : this.authState; + String authRole = useOriginalAuthState ? originalPrincipal : this.authRole; AuthData brokerData = authState.authenticate(clientData); - // authentication has completed, will send newConnected command. + + if (authState.isComplete()) { - authRole = authState.getAuthRole(); + // Authentication has completed. It was either: + // 1. the 1st time the authentication process was done, in which case we'll + // a `CommandConnected` response + // 2. an authentication refresh, in which case we don't need to do anything else + + String newAuthRole = authState.getAuthRole(); + + if (!useOriginalAuthState) { + this.authRole = newAuthRole; + } + if (log.isDebugEnabled()) { log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", - remoteAddress, authMethod, authRole, originalPrincipal); + remoteAddress, authMethod, authRole, originalPrincipal); } - completeConnect(clientProtocolVersion, clientVersion); - return; + + if (state != State.Connected) { + // First time authentication is done + completeConnect(clientProtocolVersion, clientVersion); + } else { + // If the connection was already ready, it means we're doing a refresh + if (!StringUtils.isEmpty(authRole)) { + if (!authRole.equals(newAuthRole)) { + log.warn("[{}] Principal cannot be changed during an authentication refresh", remoteAddress); + ctx.close(); + } else { + log.info("[{}] Refreshed authentication credentials", remoteAddress); + } + } + } + + return State.Connected; } // auth not complete, continue auth with client side. @@ -497,8 +523,60 @@ private void doAuthentication(AuthData clientData, log.debug("[{}] Authentication in progress client by method {}.", remoteAddress, authMethod); } - state = State.Connecting; - return; + return State.Connecting; + } + + public void refreshAuthenticationCredentials() { + AuthenticationState authState = this.originalAuthState != null ? originalAuthState : this.authState; + + if (authState == null) { + // Authentication is disabled or there's no local state to refresh + return; + } else if (getState() != State.Connected || !isActive) { + // Connection is either still being established or already closed. + return; + } else if (authState != null && !authState.isExpired()) { + // Credentials are still valid. Nothing to do at this point + return; + } else if (originalPrincipal != null && originalAuthState == null) { + log.info( + "[{}] Cannot revalidate user credential when using proxy and not forwarding the credentials. Closing connection", + remoteAddress); + return; + } + + ctx.executor().execute(SafeRun.safeRun(() -> { + log.info("[{}] Refreshing authentication credentials", remoteAddress); + + if (!supportsAuthenticationRefresh()) { + log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", remoteAddress); + ctx.close(); + return; + } + + if (pendingAuthChallengeResponse) { + log.warn("[{}] Closing connection after timeout on refreshing auth credentials", remoteAddress); + ctx.close(); + return; + } + + try { + AuthData brokerData = authState.refreshAuthentication(); + + ctx.writeAndFlush(Commands.newAuthChallenge(authMethod, brokerData, remoteEndpointProtocolVersion)); + if (log.isDebugEnabled()) { + log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", + remoteAddress, authMethod); + } + + pendingAuthChallengeResponse = true; + + } catch (AuthenticationException e) { + log.warn("[{}] Failed to refresh authentication: ", + remoteAddress, e.getMessage()); + ctx.close(); + } + })); } @Override @@ -512,6 +590,7 @@ protected void handleConnect(CommandConnect connect) { String clientVersion = connect.getClientVersion(); int clientProtocolVersion = connect.getProtocolVersion(); + features = connect.getFeatureFlags(); if (!service.isAuthenticationEnabled()) { completeConnect(clientProtocolVersion, clientVersion); @@ -551,18 +630,34 @@ protected void handleConnect(CommandConnect connect) { if (sslHandler != null) { sslSession = ((SslHandler) sslHandler).engine().getSession(); } - originalPrincipal = getOriginalPrincipal( - connect.hasOriginalAuthData() ? connect.getOriginalAuthData() : null, - connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : null, - connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null, - sslSession); authState = authenticationProvider.newAuthState(clientData, remoteAddress, sslSession); authenticationData = authState.getAuthDataSource(); - doAuthentication(clientData, clientProtocolVersion, clientVersion); + state = doAuthentication(clientData, clientProtocolVersion, clientVersion); + + // This will fail the check if: + // 1. client is coming through a proxy + // 2. we require to validate the original credentials + // 3. no credentials were passed + if (connect.hasOriginalPrincipal() && service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) { + AuthenticationProvider originalAuthenticationProvider = getBrokerService() + .getAuthenticationService() + .getAuthenticationProvider(authMethod); + originalAuthState = originalAuthenticationProvider.newAuthState( + AuthData.of(connect.getOriginalAuthData().getBytes()), + remoteAddress, + sslSession); + originalPrincipal = originalAuthState.getAuthRole(); + } else { + originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null; + } } catch (Exception e) { String msg = "Unable to authenticate"; - log.warn("[{}] {} ", remoteAddress, msg, e); + if (e instanceof AuthenticationException) { + log.warn("[{}] {}: {}", remoteAddress, msg, e.getMessage()); + } else { + log.warn("[{}] {}", remoteAddress, msg, e); + } ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)); close(); } @@ -570,10 +665,11 @@ protected void handleConnect(CommandConnect connect) { @Override protected void handleAuthResponse(CommandAuthResponse authResponse) { - checkArgument(state == State.Connecting); checkArgument(authResponse.hasResponse()); checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName()); + pendingAuthChallengeResponse = false; + if (log.isDebugEnabled()) { log.debug("Received AuthResponse from {}, auth method: {}", remoteAddress, authResponse.getResponse().getAuthMethodName()); @@ -582,10 +678,14 @@ protected void handleAuthResponse(CommandAuthResponse authResponse) { try { AuthData clientData = AuthData.of(authResponse.getResponse().getAuthData().toByteArray()); doAuthentication(clientData, authResponse.getProtocolVersion(), authResponse.getClientVersion()); + } catch (AuthenticationException e) { + log.warn("[{}] Authentication failed: {} ", remoteAddress, e.getMessage()); + ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, e.getMessage())); + close(); } catch (Exception e) { String msg = "Unable to handleAuthResponse"; log.warn("[{}] {} ", remoteAddress, msg, e); - ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, msg)); + ctx.writeAndFlush(Commands.newError(-1, ServerError.UnknownError, msg)); close(); } } @@ -1611,7 +1711,7 @@ public void enableCnxAutoRead() { autoReadDisabledRateLimiting = false; } } - + private ServerError getErrorCode(CompletableFuture future) { ServerError error = ServerError.UnknownError; try { @@ -1692,6 +1792,10 @@ public boolean isBatchMessageCompatibleVersion() { return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber(); } + boolean supportsAuthenticationRefresh() { + return features != null && features.getSupportsAuthRefresh(); + } + public String getClientVersion() { return clientVersion; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index e0bba366bb655..dfa60e8294df5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -336,7 +336,10 @@ protected void handleAuthChallenge(CommandAuthChallenge authChallenge) { connectionFuture.completeExceptionally(writeFuture.cause()); } }); - state = State.Connecting; + + if (state == State.SentConnectFrame) { + state = State.Connecting; + } } catch (Exception e) { log.error("{} Error mutual verify: {}", ctx.channel(), e); connectionFuture.completeExceptionally(e); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 1e9ffd608fa6d..e282ddc1b2fe7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -6374,6 +6374,10 @@ public interface CommandConnectOrBuilder // optional string original_auth_method = 9; boolean hasOriginalAuthMethod(); String getOriginalAuthMethod(); + + // optional .pulsar.proto.FeatureFlags feature_flags = 10; + boolean hasFeatureFlags(); + org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags getFeatureFlags(); } public static final class CommandConnect extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -6632,6 +6636,16 @@ private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getOriginal } } + // optional .pulsar.proto.FeatureFlags feature_flags = 10; + public static final int FEATURE_FLAGS_FIELD_NUMBER = 10; + private org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags featureFlags_; + public boolean hasFeatureFlags() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + public org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags getFeatureFlags() { + return featureFlags_; + } + private void initFields() { clientVersion_ = ""; authMethod_ = org.apache.pulsar.common.api.proto.PulsarApi.AuthMethod.AuthMethodNone; @@ -6642,6 +6656,7 @@ private void initFields() { originalPrincipal_ = ""; originalAuthData_ = ""; originalAuthMethod_ = ""; + featureFlags_ = org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -6691,6 +6706,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000100) == 0x00000100)) { output.writeBytes(9, getOriginalAuthMethodBytes()); } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + output.writeMessage(10, featureFlags_); + } } private int memoizedSerializedSize = -1; @@ -6735,6 +6753,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeBytesSize(9, getOriginalAuthMethodBytes()); } + if (((bitField0_ & 0x00000200) == 0x00000200)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeMessageSize(10, featureFlags_); + } memoizedSerializedSize = size; return size; } @@ -6866,6 +6888,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000080); originalAuthMethod_ = ""; bitField0_ = (bitField0_ & ~0x00000100); + featureFlags_ = org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x00000200); return this; } @@ -6935,6 +6959,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect buildPartial( to_bitField0_ |= 0x00000100; } result.originalAuthMethod_ = originalAuthMethod_; + if (((from_bitField0_ & 0x00000200) == 0x00000200)) { + to_bitField0_ |= 0x00000200; + } + result.featureFlags_ = featureFlags_; result.bitField0_ = to_bitField0_; return result; } @@ -6968,6 +6996,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandCon if (other.hasOriginalAuthMethod()) { setOriginalAuthMethod(other.getOriginalAuthMethod()); } + if (other.hasFeatureFlags()) { + mergeFeatureFlags(other.getFeatureFlags()); + } return this; } @@ -7050,6 +7081,16 @@ public Builder mergeFrom( originalAuthMethod_ = input.readBytes(); break; } + case 82: { + org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.newBuilder(); + if (hasFeatureFlags()) { + subBuilder.mergeFrom(getFeatureFlags()); + } + input.readMessage(subBuilder, extensionRegistry); + setFeatureFlags(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } } } } @@ -7341,6 +7382,49 @@ void setOriginalAuthMethod(org.apache.pulsar.shaded.com.google.protobuf.v241.Byt } + // optional .pulsar.proto.FeatureFlags feature_flags = 10; + private org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags featureFlags_ = org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.getDefaultInstance(); + public boolean hasFeatureFlags() { + return ((bitField0_ & 0x00000200) == 0x00000200); + } + public org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags getFeatureFlags() { + return featureFlags_; + } + public Builder setFeatureFlags(org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags value) { + if (value == null) { + throw new NullPointerException(); + } + featureFlags_ = value; + + bitField0_ |= 0x00000200; + return this; + } + public Builder setFeatureFlags( + org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.Builder builderForValue) { + featureFlags_ = builderForValue.build(); + + bitField0_ |= 0x00000200; + return this; + } + public Builder mergeFeatureFlags(org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags value) { + if (((bitField0_ & 0x00000200) == 0x00000200) && + featureFlags_ != org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.getDefaultInstance()) { + featureFlags_ = + org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.newBuilder(featureFlags_).mergeFrom(value).buildPartial(); + } else { + featureFlags_ = value; + } + + bitField0_ |= 0x00000200; + return this; + } + public Builder clearFeatureFlags() { + featureFlags_ = org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x00000200); + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandConnect) } @@ -7352,6 +7436,326 @@ void setOriginalAuthMethod(org.apache.pulsar.shaded.com.google.protobuf.v241.Byt // @@protoc_insertion_point(class_scope:pulsar.proto.CommandConnect) } + public interface FeatureFlagsOrBuilder + extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder { + + // optional bool supports_auth_refresh = 1 [default = false]; + boolean hasSupportsAuthRefresh(); + boolean getSupportsAuthRefresh(); + } + public static final class FeatureFlags extends + org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite + implements FeatureFlagsOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage { + // Use FeatureFlags.newBuilder() to construct. + private io.netty.util.Recycler.Handle handle; + private FeatureFlags(io.netty.util.Recycler.Handle handle) { + this.handle = handle; + } + + private static final io.netty.util.Recycler RECYCLER = new io.netty.util.Recycler() { + protected FeatureFlags newObject(Handle handle) { + return new FeatureFlags(handle); + } + }; + + public void recycle() { + this.initFields(); + this.memoizedIsInitialized = -1; + this.bitField0_ = 0; + this.memoizedSerializedSize = -1; + if (handle != null) { RECYCLER.recycle(this, handle); } + } + + private FeatureFlags(boolean noInit) {} + + private static final FeatureFlags defaultInstance; + public static FeatureFlags getDefaultInstance() { + return defaultInstance; + } + + public FeatureFlags getDefaultInstanceForType() { + return defaultInstance; + } + + private int bitField0_; + // optional bool supports_auth_refresh = 1 [default = false]; + public static final int SUPPORTS_AUTH_REFRESH_FIELD_NUMBER = 1; + private boolean supportsAuthRefresh_; + public boolean hasSupportsAuthRefresh() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public boolean getSupportsAuthRefresh() { + return supportsAuthRefresh_; + } + + private void initFields() { + supportsAuthRefresh_ = false; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output) + throws java.io.IOException { + throw new RuntimeException("Cannot use CodedOutputStream"); + } + + public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBool(1, supportsAuthRefresh_); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeBoolSize(1, supportsAuthRefresh_); + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseFrom(byte[] data) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseFrom( + byte[] data, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseFrom( + java.io.InputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseDelimitedFrom( + java.io.InputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder< + org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags, Builder> + implements org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlagsOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder { + // Construct using org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.newBuilder() + private final io.netty.util.Recycler.Handle handle; + private Builder(io.netty.util.Recycler.Handle handle) { + this.handle = handle; + maybeForceBuilderInitialization(); + } + private final static io.netty.util.Recycler RECYCLER = new io.netty.util.Recycler() { + protected Builder newObject(io.netty.util.Recycler.Handle handle) { + return new Builder(handle); + } + }; + + public void recycle() { + clear(); + if (handle != null) {RECYCLER.recycle(this, handle);} + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return RECYCLER.get(); + } + + public Builder clear() { + super.clear(); + supportsAuthRefresh_ = false; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags getDefaultInstanceForType() { + return org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.getDefaultInstance(); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags build() { + org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags buildParsed() + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags buildPartial() { + org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags result = org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.RECYCLER.get(); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.supportsAuthRefresh_ = supportsAuthRefresh_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags other) { + if (other == org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags.getDefaultInstance()) return this; + if (other.hasSupportsAuthRefresh()) { + setSupportsAuthRefresh(other.getSupportsAuthRefresh()); + } + return this; + } + + public final boolean isInitialized() { + return true; + } + + public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + throw new java.io.IOException("Merge from CodedInputStream is disabled"); + } + public Builder mergeFrom( + org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!input.skipField(tag)) { + + return this; + } + break; + } + case 8: { + bitField0_ |= 0x00000001; + supportsAuthRefresh_ = input.readBool(); + break; + } + } + } + } + + private int bitField0_; + + // optional bool supports_auth_refresh = 1 [default = false]; + private boolean supportsAuthRefresh_ ; + public boolean hasSupportsAuthRefresh() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public boolean getSupportsAuthRefresh() { + return supportsAuthRefresh_; + } + public Builder setSupportsAuthRefresh(boolean value) { + bitField0_ |= 0x00000001; + supportsAuthRefresh_ = value; + + return this; + } + public Builder clearSupportsAuthRefresh() { + bitField0_ = (bitField0_ & ~0x00000001); + supportsAuthRefresh_ = false; + + return this; + } + + // @@protoc_insertion_point(builder_scope:pulsar.proto.FeatureFlags) + } + + static { + defaultInstance = new FeatureFlags(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.proto.FeatureFlags) + } + public interface CommandConnectedOrBuilder extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 5b8c91321279c..4acd7dc8e54cb 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -25,16 +25,20 @@ import static org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8; import com.google.common.annotations.VisibleForTesting; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; + import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; + import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; + import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.KeySharedPolicy; import org.apache.pulsar.client.api.Range; @@ -99,6 +103,7 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess; import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe; +import org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags; import org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue; import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; @@ -146,6 +151,12 @@ public static ByteBuf newConnect(String authMethodName, String authData, String originalPrincipal, clientAuthData, clientAuthMethod); } + public static FeatureFlags getFeatureFlags() { + FeatureFlags.Builder flags = FeatureFlags.newBuilder(); + flags.setSupportsAuthRefresh(true); + return flags.build(); + } + public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion, String targetBroker, String originalPrincipal, String originalAuthData, String originalAuthMethod) { @@ -181,6 +192,8 @@ public static ByteBuf newConnect(String authMethodName, String authData, int pro connectBuilder.setOriginalAuthMethod(originalAuthMethod); } connectBuilder.setProtocolVersion(protocolVersion); + + connectBuilder.setFeatureFlags(getFeatureFlags()); CommandConnect connect = connectBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect)); connect.recycle(); @@ -216,6 +229,7 @@ public static ByteBuf newConnect(String authMethodName, AuthData authData, int p connectBuilder.setOriginalAuthMethod(originalAuthMethod); } connectBuilder.setProtocolVersion(protocolVersion); + connectBuilder.setFeatureFlags(getFeatureFlags()); CommandConnect connect = connectBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.CONNECT).setConnect(connect)); connect.recycle(); @@ -258,9 +272,11 @@ public static ByteBuf newAuthChallenge(String authMethod, AuthData brokerData, i challengeBuilder.setProtocolVersion(versionToAdvertise); + byte[] authData = brokerData != null ? brokerData.getBytes() : new byte[0]; + CommandAuthChallenge challenge = challengeBuilder .setChallenge(PulsarApi.AuthData.newBuilder() - .setAuthData(copyFrom(brokerData.getBytes())) + .setAuthData(copyFrom(authData)) .setAuthMethodName(authMethod) .build()) .build(); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 8a44fca60875a..43a813c3b1710 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -120,7 +120,7 @@ message MessageMetadata { optional bool partition_key_b64_encoded = 17 [ default = false ]; // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. optional bytes ordering_key = 18; - + // Mark the message to be delivered at or after the specified timestamp optional int64 deliver_at_time = 19; @@ -235,6 +235,12 @@ message CommandConnect { optional string original_auth_data = 8; optional string original_auth_method = 9; + // Feature flags + optional FeatureFlags feature_flags = 10; +} + +message FeatureFlags { + optional bool supports_auth_refresh = 1 [default = false]; } message CommandConnected { @@ -290,7 +296,7 @@ message CommandSubscribe { // Signal wether the subscription should be backed by a // durable cursor or not optional bool durable = 8 [default = true]; - + // If specified, the subscription will position the cursor // markd-delete position on the particular message id and // will send messages from that point @@ -314,15 +320,15 @@ message CommandSubscribe { // to periodically sync the state of replicated subscriptions // across different clusters (when using geo-replication). optional bool replicate_subscription_state = 14; - - // If true, the subscribe operation will cause a topic to be + + // If true, the subscribe operation will cause a topic to be // created if it does not exist already (and if topic auto-creation // is allowed by broker. - // If false, the subscribe operation will fail if the topic + // If false, the subscribe operation will fail if the topic // does not exist. optional bool force_topic_creation = 15 [default = true]; - // If specified, the subscription will reset cursor's position back + // If specified, the subscription will reset cursor's position back // to specified seconds and will send messages from that point optional uint64 start_message_rollback_duration_sec = 16 [default = 0]; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java index 118cb3ba252a2..ad0fa83ce6c68 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java @@ -24,11 +24,9 @@ import java.util.Collections; import java.util.EnumSet; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; -import lombok.Cleanup; -import lombok.extern.slf4j.Slf4j; - import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.Consumer; @@ -51,8 +49,12 @@ import org.testng.ITest; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; + @Slf4j public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTestBase implements ITest { @@ -64,24 +66,27 @@ public abstract class PulsarTokenAuthenticationBaseSuite extends PulsarClusterTe protected abstract void configureBroker(BrokerContainer brokerContainer) throws Exception; protected abstract void configureProxy(ProxyContainer proxyContainer) throws Exception; + protected abstract String createClientTokenWithExpiry(long expiryTime, TimeUnit unit) throws Exception; + protected static final String SUPER_USER_ROLE = "super-user"; protected static final String PROXY_ROLE = "proxy"; protected static final String REGULAR_USER_ROLE = "client"; + protected ZKContainer cmdContainer; + @BeforeSuite @Override public void setupCluster() throws Exception { // Before starting the cluster, generate the secret key and the token // Use Zk container to have 1 container available before starting the cluster - try (ZKContainer zkContainer = new ZKContainer<>("cli-setup")) { - zkContainer + this.cmdContainer = new ZKContainer<>("cli-setup"); + cmdContainer .withNetwork(Network.newNetwork()) .withNetworkAliases(ZKContainer.NAME) .withEnv("zkServers", ZKContainer.NAME); - zkContainer.start(); + cmdContainer.start(); - createKeysAndTokens(zkContainer); - } + createKeysAndTokens(cmdContainer); final String clusterName = Stream.of(this.getClass().getSimpleName(), randomName(5)) .filter(s -> s != null && !s.isEmpty()) @@ -108,6 +113,8 @@ public void setupCluster() throws Exception { brokerContainer.withEnv("superUserRoles", SUPER_USER_ROLE + "," + PROXY_ROLE); brokerContainer.withEnv("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName()); brokerContainer.withEnv("brokerClientAuthenticationParameters", "token:" + superUserAuthToken); + brokerContainer.withEnv("authenticationRefreshCheckSeconds", "1"); + brokerContainer.withEnv("authenticateOriginalAuthData", "true"); } ProxyContainer proxyContainer = pulsarCluster.getProxy(); @@ -118,6 +125,7 @@ public void setupCluster() throws Exception { proxyContainer.withEnv("authorizationEnabled", "true"); proxyContainer.withEnv("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName()); proxyContainer.withEnv("brokerClientAuthenticationParameters", "token:" + proxyAuthToken); + proxyContainer.withEnv("forwardAuthorizationCredentials", "true"); pulsarCluster.start(); @@ -128,6 +136,7 @@ public void setupCluster() throws Exception { @Override public void tearDownCluster() { super.tearDownCluster(); + cmdContainer.close(); } @Override @@ -253,4 +262,71 @@ public void testProxyRedirectWithTokenAuth() throws Exception { admin.topics().getStats(topic); } } + + @DataProvider(name = "shouldRefreshToken") + public static Object[][] shouldRefreshToken() { + return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; + } + + @Test(dataProvider = "shouldRefreshToken") + public void testExpiringToken(boolean shouldRefreshToken) throws Exception { + final String tenant = "token-test-tenant" + randomName(4); + final String namespace = tenant + "/ns-1"; + final String topic = "persistent://" + namespace + "/topic-1"; + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(pulsarCluster.getHttpServiceUrl()) + .authentication(AuthenticationFactory.token(superUserAuthToken)) + .build(); + + admin.tenants().createTenant(tenant, + new TenantInfo(Collections.singleton(REGULAR_USER_ROLE), + Collections.singleton(pulsarCluster.getClusterName()))); + + admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName())); + admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE, EnumSet.allOf(AuthAction.class)); + + String initialToken = this.createClientTokenWithExpiry(5, TimeUnit.SECONDS); + + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .authentication(AuthenticationFactory.token(() -> { + if (shouldRefreshToken) { + try { + return createClientTokenWithExpiry(5, TimeUnit.SECONDS); + } catch (Exception e) { + return null; + } + } else { + return initialToken; + } + })) + .build(); + + @Cleanup + Producer producer = client.newProducer(Schema.STRING) + .topic(topic) + .sendTimeout(1, TimeUnit.SECONDS) + .create(); + + // Initially the token is valid and producer will be able to publish + producer.send("hello-1"); + + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + + if (shouldRefreshToken) { + // The token will have been refreshed, so the app won't see any error + producer.send("hello-2"); + } else { + // The token has expired, so this next message will be rejected + try { + producer.send("hello-2"); + fail("Publish should have failed"); + } catch (PulsarClientException e) { + // Expected + } + } + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java index 7f1a03aa9cd8b..f252e29f140d5 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithPublicPrivateKeys.java @@ -21,6 +21,7 @@ import com.google.common.io.Files; import java.io.File; +import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; @@ -87,4 +88,14 @@ protected void configureProxy(ProxyContainer proxyContainer) throws Exception { proxyContainer.withFileSystemBind(publicKeyFile.toString(), PUBLIC_KEY_PATH_INSIDE_CONTAINER); proxyContainer.withEnv("tokenPublicKey", "file://" + PUBLIC_KEY_PATH_INSIDE_CONTAINER); } + + @Override + protected String createClientTokenWithExpiry(long expiryTime, TimeUnit unit) throws Exception { + return cmdContainer + .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create", + "--private-key", "file://" + PRIVATE_KEY_PATH_INSIDE_CONTAINER, + "--subject", REGULAR_USER_ROLE, + "--expiry-time", unit.toSeconds(expiryTime) + "s") + .getStdout().trim(); + } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java index 96efa6c2ec13f..8a4a19da12fd6 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/TokenAuthWithSymmetricKeys.java @@ -18,13 +18,15 @@ */ package org.apache.pulsar.tests.integration.auth.token; -import lombok.extern.slf4j.Slf4j; +import java.util.concurrent.TimeUnit; import org.apache.pulsar.tests.integration.containers.BrokerContainer; import org.apache.pulsar.tests.integration.containers.ProxyContainer; import org.apache.pulsar.tests.integration.containers.PulsarContainer; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import lombok.extern.slf4j.Slf4j; + @Slf4j public class TokenAuthWithSymmetricKeys extends PulsarTokenAuthenticationBaseSuite { @@ -70,4 +72,13 @@ protected void configureProxy(ProxyContainer proxyContainer) throws Exception { proxyContainer.withEnv("tokenSecretKey", "data:base64," + secretKey); } + @Override + protected String createClientTokenWithExpiry(long expiryTime, TimeUnit unit) throws Exception { + return cmdContainer + .execCmd(PulsarCluster.PULSAR_COMMAND_SCRIPT, "tokens", "create", + "--secret-key", "data:base64," + secretKey, + "--subject", REGULAR_USER_ROLE, + "--expiry-time", unit.toSeconds(expiryTime) + "s") + .getStdout().trim(); + } }