Skip to content

Commit

Permalink
Fix in ServerCnx to prevent using recycled commands (apache#1264)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jai Asher authored and jai1 committed Feb 21, 2018
1 parent 44fd826 commit 5d14788
Showing 1 changed file with 10 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ private boolean invalidOriginalPrincipal(String originalPrincipal) {
@Override
protected void handleLookup(CommandLookupTopic lookup) {
final long requestId = lookup.getRequestId();

final boolean authoritative = lookup.getAuthoritative();
if (log.isDebugEnabled()) {
log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), remoteAddress, requestId);
}
Expand Down Expand Up @@ -254,9 +254,9 @@ protected void handleLookup(CommandLookupTopic lookup) {
String finalOriginalPrincipal = originalPrincipal;
isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
if (isProxyAuthorized) {
lookupDestinationAsync(getBrokerService().pulsar(), topicName, lookup.getAuthoritative(),
lookupDestinationAsync(getBrokerService().pulsar(), topicName, authoritative,
finalOriginalPrincipal != null ? finalOriginalPrincipal : authRole, authenticationData,
lookup.getRequestId()).handle((lookupResponse, ex) -> {
requestId).handle((lookupResponse, ex) -> {
if (ex == null) {
ctx.writeAndFlush(lookupResponse);
} else {
Expand Down Expand Up @@ -550,7 +550,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(),
subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex())
: null;

final String subscription = subscribe.getSubscription();
final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
final boolean readCompacted = subscribe.getReadCompacted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
Expand All @@ -568,7 +568,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
if (service.isAuthorizationEnabled()) {
authorizationFuture = service.getAuthorizationService().canConsumeAsync(topicName,
originalPrincipal != null ? originalPrincipal : authRole, authenticationData,
subscribe.getSubscription());
subscription);
} else {
authorizationFuture = CompletableFuture.completedFuture(true);
}
Expand Down Expand Up @@ -995,13 +995,13 @@ protected void handleUnsubscribe(CommandUnsubscribe unsubscribe) {
@Override
protected void handleSeek(CommandSeek seek) {
checkArgument(state == State.Connected);

final long requestId = seek.getRequestId();
CompletableFuture<Consumer> consumerFuture = consumers.get(seek.getConsumerId());

// Currently only seeking on a message id is supported
if (!seek.hasMessageId()) {
ctx.writeAndFlush(
Commands.newError(seek.getRequestId(), ServerError.MetadataError, "Message id was not present"));
Commands.newError(requestId, ServerError.MetadataError, "Message id was not present"));
return;
}

Expand All @@ -1011,20 +1011,20 @@ protected void handleSeek(CommandSeek seek) {
MessageIdData msgIdData = seek.getMessageId();

Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());
long requestId = seek.getRequestId();


subscription.resetCursor(position).thenRun(() -> {
log.info("[{}] [{}][{}] Reset subscription to message id {}", remoteAddress,
subscription.getTopic().getName(), subscription.getName(), position);
ctx.writeAndFlush(Commands.newSuccess(requestId));
}).exceptionally(ex -> {
log.warn("[{}][{}] Failed to reset subscription: {}", remoteAddress, subscription, ex.getMessage(), ex);
ctx.writeAndFlush(Commands.newError(seek.getRequestId(), ServerError.UnknownError,
ctx.writeAndFlush(Commands.newError(requestId, ServerError.UnknownError,
"Error when resetting subscription: " + ex.getCause().getMessage()));
return null;
});
} else {
ctx.writeAndFlush(Commands.newError(seek.getRequestId(), ServerError.MetadataError, "Consumer not found"));
ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Consumer not found"));
}
}

Expand Down

0 comments on commit 5d14788

Please sign in to comment.