Skip to content

Commit

Permalink
[fix][broker] Producer/Consumer should call allowTopicOperationAsync (a…
Browse files Browse the repository at this point in the history
…pache#20142)

Fixes: apache#20066

### Motivation

In apache#20068 we changed the way that the `AuthorizationService` is implemented. I think we should change the `Consumer` and the `Producer` logic to call the correct `AuthorizationService` method.

Given that our goal is to deprecate the `AuthorizationService` methods for `canProduce` and `canConsume`, this change helps us move in the right direction.

### Modifications

* Update `Producer` and `Consumer` in broker to call the `AuthorizationService#allowTopicOperationAsync` method.

### Verifying this change

This change is trivial.

### Documentation

- [x] `doc-not-needed`

### Matching PR in forked repository

PR in forked repository: Skipping PR as I ran tests locally.
  • Loading branch information
michaeljmarshall authored Apr 20, 2023
1 parent d3fa998 commit dc5e497
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -56,6 +57,7 @@
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaType;
Expand Down Expand Up @@ -901,8 +903,10 @@ public String toString() {
public CompletableFuture<Void> checkPermissionsAsync() {
TopicName topicName = TopicName.get(subscription.getTopicName());
if (cnx.getBrokerService().getAuthorizationService() != null) {
return cnx.getBrokerService().getAuthorizationService().canConsumeAsync(topicName, appId,
cnx.getAuthenticationData(), subscription.getName())
AuthenticationDataSubscription authData =
new AuthenticationDataSubscription(cnx.getAuthenticationData(), subscription.getName());
return cnx.getBrokerService().getAuthorizationService()
.allowTopicOperationAsync(topicName, TopicOperation.CONSUME, appId, authData)
.handle((ok, e) -> {
if (e != null) {
log.warn("[{}] Get unexpected error while authorizing [{}] {}", appId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData.ClusterUrl;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
Expand Down Expand Up @@ -781,7 +782,7 @@ public CompletableFuture<Void> checkPermissionsAsync() {
TopicName topicName = TopicName.get(topic.getName());
if (cnx.getBrokerService().getAuthorizationService() != null) {
return cnx.getBrokerService().getAuthorizationService()
.canProduceAsync(topicName, appId, cnx.getAuthenticationData())
.allowTopicOperationAsync(topicName, TopicOperation.PRODUCE, appId, cnx.getAuthenticationData())
.handle((ok, ex) -> {
if (ex != null) {
log.warn("[{}] Get unexpected error while autorizing [{}] {}", appId, topic.getName(),
Expand Down

0 comments on commit dc5e497

Please sign in to comment.