Skip to content

Commit

Permalink
Fix authorization error if partition number of partitioned topic is u…
Browse files Browse the repository at this point in the history
…pdated. (apache#10300) (apache#10333)

Fixes apache#10300 

### Motivation

Fix the bug that after updating the partition number of  a partitioned topic, which has topic level auth policy, new producer/consumer of this topic will get error.

### Modifications

In [`org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider#checkPermission`](https://github.com/apache/pulsar/blob/889b9b8e5efc62d2d0cbc761205fba5759c97af0/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java#L394), if current `topicName` is a sub partition topic, also check the permissions of its partitioned topic.
  • Loading branch information
dragonls authored May 6, 2021
1 parent def1932 commit c1d5162
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -434,16 +434,33 @@ public CompletableFuture<Boolean> checkPermission(TopicName topicName, String ro
return;
}
}

// If the partition number of the partitioned topic having topic level policy is updated,
// the new sub partitions may not inherit the policy of the partition topic.
// We can also check the permission of partitioned topic.
// For https://github.com/apache/pulsar/issues/10300
if (topicName.isPartitioned()) {
topicRoles = policies.get().auth_policies.destination_auth.get(topicName.getPartitionedTopicName());
if (topicRoles != null) {
// Topic has custom policy
Set<AuthAction> topicActions = topicRoles.get(role);
if (topicActions != null && topicActions.contains(action)) {
// The role has topic level permission
permissionFuture.complete(true);
return;
}
}
}
}
permissionFuture.complete(false);
}).exceptionally(ex -> {
log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
ex.getMessage());
permissionFuture.completeExceptionally(ex);
return null;
});
} catch (Exception e) {
log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
log.warn("Client with Role - {} failed to get permissions for topic - {}. {}", role, topicName,
e.getMessage());
permissionFuture.completeExceptionally(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,97 @@ public void testProxyAuthorization() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

/**
* <pre>
* 1. Create a 2-partition topic and grant produce/consume permission to client role.
* 2. Use producer/consumer with client role to process the topic, which is fine.
* 2. Update the topic partition number to 4.
* 3. Use new producer/consumer with client role to process the topic.
* 4. Broker should authorize producer/consumer normally.
* </pre>
*/
@Test
public void testUpdatePartitionNumAndReconnect() throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();
PulsarClient proxyClient = createPulsarClient(proxyService.getServiceUrl(), PulsarClient.builder());

String clusterName = "proxy-authorization";
String namespaceName = "my-property/my-ns";
String topicName = "persistent://my-property/my-ns/my-topic1";
String subscriptionName = "my-subscriber-name";

admin.clusters().createCluster(clusterName, new ClusterData(brokerUrl.toString()));

admin.tenants().createTenant("my-property",
new TenantInfo(Sets.newHashSet(), Sets.newHashSet(clusterName)));
admin.namespaces().createNamespace(namespaceName);
admin.topics().createPartitionedTopic(topicName, 2);
admin.topics().grantPermission(topicName, CLIENT_ROLE,
Sets.newHashSet(AuthAction.consume, AuthAction.produce));

Consumer<byte[]> consumer = proxyClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName).subscribe();

Producer<byte[]> producer = proxyClient.newProducer(Schema.BYTES)
.topic(topicName).create();
final int MSG_NUM = 10;
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < MSG_NUM; i++) {
String message = "my-message-" + i;
messageSet.add(message);
producer.send(message.getBytes());
}

Message<byte[]> msg;
Set<String> receivedMessageSet = Sets.newHashSet();
for (int i = 0; i < MSG_NUM; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
receivedMessageSet.add(expectedMessage);
consumer.acknowledgeAsync(msg);
}
Assert.assertEquals(messageSet, receivedMessageSet);
consumer.close();
producer.close();

// update partition num
admin.topics().updatePartitionedTopic(topicName, 4);

// produce/consume the topic again
consumer = proxyClient.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName).subscribe();
producer = proxyClient.newProducer(Schema.BYTES)
.topic(topicName).create();

messageSet.clear();
for (int i = 0; i < MSG_NUM; i++) {
String message = "my-message-" + i;
messageSet.add(message);
producer.send(message.getBytes());
}

receivedMessageSet.clear();
for (int i = 0; i < MSG_NUM; i++) {
msg = consumer.receive(5, TimeUnit.SECONDS);
String receivedMessage = new String(msg.getData());
log.debug("Received message: [{}]", receivedMessage);
String expectedMessage = "my-message-" + i;
receivedMessageSet.add(expectedMessage);
consumer.acknowledgeAsync(msg);
}
Assert.assertEquals(messageSet, receivedMessageSet);
consumer.close();
producer.close();
log.info("-- Exiting {} test --", methodName);
}

/**
* <pre>
* It verifies jwt + Authentication + Authorization (client -> proxy -> broker).
Expand Down

0 comments on commit c1d5162

Please sign in to comment.