Skip to content

Commit

Permalink
Fix topic ownership is not checked when get topic policy (apache#9767)
Browse files Browse the repository at this point in the history
### Motivation
1. Currently, the API of topic policies does not check the ownership of topic. In the case of multiple brokers, a `cache not init error` will appear.
  • Loading branch information
315157973 authored Mar 3, 2021
1 parent e88da2a commit 38a1f1d
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2505,15 +2505,9 @@ protected PersistentOfflineTopicStats internalGetBacklog(boolean authoritative)

protected void internalSetBacklogQuota(AsyncResponse asyncResponse,
BacklogQuota.BacklogQuotaType backlogQuotaType, BacklogQuota backlogQuota) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
if (backlogQuotaType == null) {
backlogQuotaType = BacklogQuota.BacklogQuotaType.destination_storage;
}
checkTopicLevelPolicyEnable();
TopicPolicies topicPolicies;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
Expand Down Expand Up @@ -2594,7 +2588,6 @@ protected void internalSetMessageTTL(AsyncResponse asyncResponse, Integer ttlInS
if (ttlInSecond != null && ttlInSecond < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for message TTL");
}
preValidation();
TopicPolicies topicPolicies;
//Update existing topic policy or create a new one if not exist.
try {
Expand Down Expand Up @@ -2645,7 +2638,6 @@ protected void internalRemoveBacklogQuota(AsyncResponse asyncResponse,
}

protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied){
preValidation();
RetentionPolicies retentionPolicies = getTopicPolicies(topicName)
.map(TopicPolicies::getRetentionPolicies).orElseGet(() -> {
if (applied) {
Expand All @@ -2660,7 +2652,6 @@ protected void internalGetRetention(AsyncResponse asyncResponse, boolean applied
}

protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retention) {
preValidation();
if (retention == null) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -2685,7 +2676,6 @@ protected CompletableFuture<Void> internalSetRetention(RetentionPolicies retenti
}

protected CompletableFuture<Void> internalRemoveRetention() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -2695,12 +2685,10 @@ protected CompletableFuture<Void> internalRemoveRetention() {
}

protected Optional<PersistencePolicies> internalGetPersistence(){
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getPersistence);
}

protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) {
preValidation();
validatePersistencePolicies(persistencePolicies);

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
Expand All @@ -2709,7 +2697,6 @@ protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies per
}

protected CompletableFuture<Void> internalRemovePersistence() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -2725,20 +2712,16 @@ protected CompletableFuture<Void> internalSetMaxMessageSize(Integer maxMessageSi
+ "and must be smaller than that in the broker-level");
}

preValidation();

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxMessageSize(maxMessageSize);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected Optional<Integer> internalGetMaxMessageSize() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getMaxMessageSize);
}

protected CompletableFuture<Integer> internalGetMaxProducers(boolean applied) {
preValidation();
Integer maxNum = getTopicPolicies(topicName)
.map(TopicPolicies::getMaxProducerPerTopic)
.orElseGet(() -> {
Expand All @@ -2756,16 +2739,12 @@ protected CompletableFuture<Void> internalSetMaxProducers(Integer maxProducers)
throw new RestException(Status.PRECONDITION_FAILED,
"maxProducers must be 0 or more");
}

preValidation();

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxProducerPerTopic(maxProducers);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected Optional<Integer> internalGetMaxSubscriptionsPerTopic() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getMaxSubscriptionsPerTopic);
}

Expand All @@ -2774,36 +2753,31 @@ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer ma
throw new RestException(Status.PRECONDITION_FAILED,
"maxSubscriptionsPerTopic must be 0 or more");
}
preValidation();

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected Optional<DispatchRate> internalGetReplicatorDispatchRate() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getReplicatorDispatchRate);
}

protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRate dispatchRate) {
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setReplicatorDispatchRate(dispatchRate);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

private void preValidation() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
protected void preValidation() {
checkTopicLevelPolicyEnable();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
checkTopicLevelPolicyEnable();
validateTopicOwnership(topicName, false);
}

protected CompletableFuture<Void> internalRemoveMaxProducers() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -2813,7 +2787,6 @@ protected CompletableFuture<Void> internalRemoveMaxProducers() {
}

protected CompletableFuture<Integer> internalGetMaxConsumers(boolean applied) {
preValidation();
Integer maxNum = getTopicPolicies(topicName)
.map(TopicPolicies::getMaxConsumerPerTopic)
.orElseGet(() -> {
Expand All @@ -2831,15 +2804,13 @@ protected CompletableFuture<Void> internalSetMaxConsumers(Integer maxConsumers)
throw new RestException(Status.PRECONDITION_FAILED,
"maxConsumers must be 0 or more");
}
preValidation();

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setMaxConsumerPerTopic(maxConsumers);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalRemoveMaxConsumers() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -3661,12 +3632,10 @@ protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean aut
}

protected Optional<DispatchRate> internalGetDispatchRate() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getDispatchRate);
}

protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate dispatchRate) {
preValidation();
if (dispatchRate == null) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -3677,7 +3646,6 @@ protected CompletableFuture<Void> internalSetDispatchRate(DispatchRate dispatchR
}

protected CompletableFuture<Void> internalRemoveDispatchRate() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -3688,12 +3656,10 @@ protected CompletableFuture<Void> internalRemoveDispatchRate() {
}

protected Optional<DispatchRate> internalGetSubscriptionDispatchRate() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionDispatchRate);
}

protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRate dispatchRate) {
preValidation();
if (dispatchRate == null) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -3704,7 +3670,6 @@ protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRa
}

protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -3715,15 +3680,13 @@ protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {


protected Optional<Integer> internalGetMaxConsumersPerSubscription() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getMaxConsumersPerSubscription);
}

protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
}
preValidation();

TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
Expand All @@ -3732,7 +3695,6 @@ protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer
}

protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -3742,15 +3704,13 @@ protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
}

protected Optional<Long> internalGetCompactionThreshold() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getCompactionThreshold);
}

protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
if (compactionThreshold != null && compactionThreshold < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
}
preValidation();

TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
Expand All @@ -3759,7 +3719,6 @@ protected CompletableFuture<Void> internalSetCompactionThreshold(Long compaction
}

protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -3769,13 +3728,11 @@ protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
}

protected Optional<PublishRate> internalGetPublishRate() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getPublishRate);

}

protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate) {
preValidation();
if (publishRate == null) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -3786,24 +3743,20 @@ protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate
}

protected Optional<List<SubType>> internalGetSubscriptionTypesEnabled() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getSubscriptionTypesEnabled);

}

protected CompletableFuture<Void> internalSetSubscriptionTypesEnabled(
Set<SubscriptionType> subscriptionTypesEnabled) {
List<SubType> subTypes = Lists.newArrayList();
subscriptionTypesEnabled.forEach(subscriptionType -> subTypes.add(SubType.valueOf(subscriptionType.name())));
preValidation();
TopicPolicies topicPolicies = getTopicPolicies(topicName)
.orElseGet(TopicPolicies::new);
topicPolicies.setSubscriptionTypesEnabled(subTypes);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalRemovePublishRate() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand All @@ -3813,12 +3766,10 @@ protected CompletableFuture<Void> internalRemovePublishRate() {
}

protected Optional<SubscribeRate> internalGetSubscribeRate() {
preValidation();
return getTopicPolicies(topicName).map(TopicPolicies::getSubscribeRate);
}

protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) {
preValidation();
if (subscribeRate == null) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -3829,7 +3780,6 @@ protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscri
}

protected CompletableFuture<Void> internalRemoveSubscribeRate() {
preValidation();
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
Expand Down
Loading

0 comments on commit 38a1f1d

Please sign in to comment.