From c1f37d2b881ddebc5bbf000cff590896228e32b1 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Wed, 22 May 2024 18:06:38 +0800 Subject: [PATCH] improve auth success when lookup tc topic --- .../PulsarAuthorizationProvider.java | 4 ++ ...icatedTransactionProducerConsumerTest.java | 56 +++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java index a39c3d0560760..214fe42cae5a3 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java @@ -33,6 +33,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.broker.resources.PulsarResources; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.AuthPolicies; @@ -155,6 +156,9 @@ public CompletableFuture canConsumeAsync(TopicName topicName, String ro @Override public CompletableFuture canLookupAsync(TopicName topicName, String role, AuthenticationDataSource authenticationData) { + if (SystemTopicNames.isTransactionCoordinatorAssign(topicName)) { + return CompletableFuture.completedFuture(true); + } return canProduceAsync(topicName, role, authenticationData) .thenCompose(canProduce -> { if (canProduce) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java index aa9102189d430..05dd3c3011ab0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/AuthenticatedTransactionProducerConsumerTest.java @@ -317,6 +317,62 @@ public void testNoAuth() throws Exception { } } + @Test + public void testTcLookupAuth() throws Exception { + String role = "test"; + String tcSystemTopic = "persistent://pulsar/system/transaction_coordinator_assign"; + admin.topics().grantPermission(TOPIC, role, Sets.newHashSet(AuthAction.produce, AuthAction.consume)); + + @Cleanup + final PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarServiceList.get(0).getBrokerServiceUrl()) + .authentication(AuthenticationFactory.token(generateToken(kp, "test"))) + .enableTransaction(true) + .build(); + + try { + @Cleanup final Producer producer = pulsarClient + .newProducer(Schema.STRING) + .sendTimeout(60, TimeUnit.SECONDS) + .topic(TOPIC) + .create(); + + Transaction transaction = pulsarClient.newTransaction() + .withTransactionTimeout(60, TimeUnit.SECONDS).build().get(); + producer.newMessage(transaction).value("message").send(); + final Throwable ex = syncGetException(( + (PulsarClientImpl) pulsarClient).getTcClient().commitAsync(transaction.getTxnID()) + ); + Assert.assertNull(ex); + } catch (Exception e) { + Assert.fail(); + } + + try { + @Cleanup final Producer producer = pulsarClient + .newProducer(Schema.STRING) + .sendTimeout(60, TimeUnit.SECONDS) + .topic(tcSystemTopic) + .create(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof PulsarClientException.AuthorizationException); + Assert.assertTrue(e.getMessage().contains("Client is not authorized to Produce")); + } + + try { + @Cleanup final Consumer consumer = pulsarClient + .newConsumer(Schema.STRING) + .subscriptionName("test") + .topic(tcSystemTopic) + .subscribe(); + Assert.fail(); + } catch (Exception e) { + Assert.assertTrue(e instanceof PulsarClientException.AuthorizationException); + Assert.assertTrue(e.getMessage().contains("Client is not authorized to subscribe")); + } + } + private static Throwable syncGetException(CompletableFuture future) { try { future.get();