Skip to content

Commit

Permalink
Handle NotAllowed Exception at the client side. (apache#7430)
Browse files Browse the repository at this point in the history
* Handle NotAllowed Exception at the client side.
  • Loading branch information
codelipenghui authored Jul 17, 2020
1 parent 73f3632 commit f8b2a23
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean che
return ServerError.TransactionCoordinatorNotFound;
} else if (t instanceof CoordinatorException.InvalidTxnStatusException) {
return ServerError.InvalidTxnStatus;
} else if (t instanceof NotAllowedException) {
return ServerError.NotAllowedError;
} else {
if (checkCauseIfUnknown) {
return getClientErrorCode(t.getCause(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void testOrderingKeyWithHashRangeExclusiveStickyKeyConsumerSelector(boole
receiveAndCheck(checkList);
}

@Test(expectedExceptions = PulsarClientException.class)
@Test(expectedExceptions = PulsarClientException.NotAllowedException.class)
public void testDisableKeySharedSubscription() throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(false);
String topic = "persistent://public/default/key_shared_disabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,23 @@ public NotSupportedException(String msg) {
}
}

/**
* Not allowed exception thrown by Pulsar client.
*/
public static class NotAllowedException extends PulsarClientException {

/**
* Constructs an {@code NotAllowedException} with the specified detail message.
*
* @param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public NotAllowedException(String msg) {
super(msg);
}
}

/**
* Full producer queue error thrown by Pulsar client.
*/
Expand Down Expand Up @@ -790,6 +807,8 @@ public static Throwable wrap(Throwable t, String msg) {
return new InvalidTopicNameException(msg);
} else if (t instanceof NotSupportedException) {
return new NotSupportedException(msg);
} else if (t instanceof NotAllowedException) {
return new NotAllowedException(msg);
} else if (t instanceof ProducerQueueIsFullError) {
return new ProducerQueueIsFullError(msg);
} else if (t instanceof ProducerBlockedQuotaExceededError) {
Expand Down Expand Up @@ -873,6 +892,8 @@ public static PulsarClientException unwrap(Throwable t) {
return new InvalidTopicNameException(msg);
} else if (cause instanceof NotSupportedException) {
return new NotSupportedException(msg);
} else if (cause instanceof NotAllowedException) {
return new NotAllowedException(msg);
} else if (cause instanceof ProducerQueueIsFullError) {
return new ProducerQueueIsFullError(msg);
} else if (cause instanceof ProducerBlockedQuotaExceededError) {
Expand Down Expand Up @@ -911,6 +932,7 @@ public static boolean isRetriableError(Throwable t) {
|| t instanceof InvalidMessageException
|| t instanceof InvalidTopicNameException
|| t instanceof NotSupportedException
|| t instanceof NotAllowedException
|| t instanceof ChecksumException
|| t instanceof CryptoException
|| t instanceof ConsumerAssignException
Expand Down
1 change: 1 addition & 0 deletions pulsar-client-cpp/include/pulsar/Result.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ enum Result
/// Shared and Key_Shared subscription mode
ResultTransactionCoordinatorNotFoundError, /// Transaction coordinator not found
ResultInvalidTxnStatusError, /// Invalid txn status error
ResultNotAllowedError, /// Not allowed
};

// Return string representation of result code
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ static Result getResult(ServerError serverError) {

case InvalidTxnStatus:
return ResultInvalidTxnStatusError;

case NotAllowedError:
return ResultNotAllowedError;
}
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
Expand Down
3 changes: 3 additions & 0 deletions pulsar-client-cpp/lib/Result.cc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ const char* strResult(Result result) {

case ResultInvalidTxnStatusError:
return "ResultInvalidTxnStatusError";

case ResultNotAllowedError:
return "ResultNotAllowedError";
};
// NOTE : Do not add default case in the switch above. In future if we get new cases for
// ServerError and miss them in the switch above we would like to get notified. Adding
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,8 @@ private PulsarClientException getPulsarClientException(ServerError error, String
return new PulsarClientException.TopicDoesNotExistException(errorMsg);
case ConsumerAssignError:
return new PulsarClientException.ConsumerAssignException(errorMsg);
case NotAllowedError:
return new PulsarClientException.NotAllowedException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pulsar-common/src/main/proto/PulsarApi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ enum ServerError {

TransactionCoordinatorNotFound = 20; // Transaction coordinator not found error
InvalidTxnStatus = 21; // Invalid txn status error
NotAllowedError = 22; // Not allowed error
}

enum AuthMethod {
Expand Down

0 comments on commit f8b2a23

Please sign in to comment.