Skip to content

Commit

Permalink
Return correct authz and auth errors from proxy to client (apache#9055)
Browse files Browse the repository at this point in the history
Co-authored-by: Jerry Peng <[email protected]>
  • Loading branch information
jerrypeng and Jerry Peng authored Dec 26, 2020
1 parent 2c3e33b commit 8574e58
Showing 1 changed file with 15 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Optional;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetSchema;
Expand Down Expand Up @@ -231,7 +232,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
log.warn("[{}] Failed to get partitioned metadata for topic {} {}", clientAddress, topicName,
ex.getMessage(), ex);
proxyConnection.ctx().writeAndFlush(Commands.newPartitionMetadataResponse(
ServerError.ServiceNotReady, ex.getMessage(), clientRequestId));
getServerError(ex), ex.getMessage(), clientRequestId));
return null;
});
} else {
Expand Down Expand Up @@ -259,7 +260,7 @@ private void handlePartitionMetadataResponse(CommandPartitionedTopicMetadata par
if (t != null) {
log.warn("[{}] failed to get Partitioned metadata : {}", topicName.toString(),
t.getMessage(), t);
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(getServerError(t),
t.getMessage(), clientRequestId));
} else {
proxyConnection.ctx().writeAndFlush(
Expand Down Expand Up @@ -443,5 +444,17 @@ private InetSocketAddress getAddr(String brokerServiceUrl, long clientRequestId)
return InetSocketAddress.createUnresolved(brokerURI.getHost(), brokerURI.getPort());
}

private ServerError getServerError(Throwable error) {
ServerError responseError;
if (error instanceof PulsarClientException.AuthorizationException) {
responseError = ServerError.AuthorizationError;
} else if (error instanceof PulsarClientException.AuthenticationException) {
responseError = ServerError.AuthenticationError;
} else {
responseError = ServerError.ServiceNotReady;
}
return responseError;
}

private static final Logger log = LoggerFactory.getLogger(LookupProxyHandler.class);
}

0 comments on commit 8574e58

Please sign in to comment.