Skip to content

Commit

Permalink
check null to avoid NPE for internalSetTopicPolicies (apache#8201)
Browse files Browse the repository at this point in the history
### Motivation & Changes
1. add null check for topic policies to avoid NPE
2. add boundary check for `MaxUnackedMessagesOnSubscription` and `SetMaxUnackedMessagesOnConsumer`
  • Loading branch information
hangc0276 authored Oct 5, 2020
1 parent 6711b71 commit 7a1be70
Showing 1 changed file with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -899,6 +899,11 @@ private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies of
}

protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
if (maxUnackedNum != null && maxUnackedNum < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedNum must be 0 or more");
}

TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
Expand All @@ -914,6 +919,11 @@ protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(In
}

protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) {
if (maxUnackedNum != null && maxUnackedNum < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedNum must be 0 or more");
}

TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
Expand Down Expand Up @@ -2308,7 +2318,7 @@ protected CompletableFuture<Void> internalSetDeduplicationEnabled(Boolean enable

protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInSecond) {
//Validate message ttl value.
if (ttlInSecond != null && ttlInSecond.intValue() < 0) {
if (ttlInSecond != null && ttlInSecond < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
}
validateAdminAccessForTenant(namespaceName.getTenant());
Expand Down Expand Up @@ -2477,16 +2487,18 @@ protected Optional<Integer> internalGetMaxProducers() {
}

protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers) {
if (maxProducers != null && maxProducers < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxProducers must be 0 or more");
}

validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
if (maxProducers < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxProducers must be 0 or more");
}

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxProducerPerTopic(maxProducers);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
Expand Down Expand Up @@ -2518,16 +2530,18 @@ protected Optional<Integer> internalGetMaxConsumers() {
}

protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumers) {
if (maxConsumers != null && maxConsumers < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxConsumers must be 0 or more");
}

validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
if (maxConsumers < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxConsumers must be 0 or more");
}

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxConsumerPerTopic(maxConsumers);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
Expand Down

0 comments on commit 7a1be70

Please sign in to comment.