Skip to content

Commit

Permalink
[Broker] Adjust the validation for policy schemaCompatibilityStrategy (
Browse files Browse the repository at this point in the history
  • Loading branch information
yuruguo authored Feb 9, 2022
1 parent 6e859dd commit 4886528
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
Expand Down Expand Up @@ -772,7 +774,9 @@ protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncRespon
}

protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return validateTopicOperationAsync(topicName, TopicOperation.GET_SCHEMA_COMPATIBILITY_STRATEGY)
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose((__) -> {
CompletableFuture<SchemaCompatibilityStrategy> future;
if (config().isTopicLevelPoliciesEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4856,7 +4856,9 @@ protected CompletableFuture<SchemaCompatibilityStrategy> internalGetSchemaCompat
if (applied) {
return getSchemaCompatibilityStrategyAsync();
}
return validateTopicOperationAsync(topicName, TopicOperation.GET_SCHEMA_COMPATIBILITY_STRATEGY)
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ)
.thenCompose(n -> getTopicPoliciesAsyncWithRetry(topicName).thenApply(op -> {
if (!op.isPresent()) {
return null;
Expand All @@ -4867,7 +4869,9 @@ protected CompletableFuture<SchemaCompatibilityStrategy> internalGetSchemaCompat
}

protected CompletableFuture<Void> internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrategy strategy) {
return validateTopicOperationAsync(topicName, TopicOperation.SET_SCHEMA_COMPATIBILITY_STRATEGY)
return validateTopicPolicyOperationAsync(topicName,
PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.WRITE)
.thenCompose((__) -> getTopicPoliciesAsyncWithRetry(topicName)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
Expand Down Expand Up @@ -454,6 +456,93 @@ public void testClearBacklogPermission() throws Exception {
log.info("-- Exiting {} test --", methodName);
}

@Test
public void testSchemaCompatibilityStrategyPermission() throws Exception {
log.info("-- Starting {} test --", methodName);

conf.setSystemTopicEnabled(true);
conf.setTopicLevelPoliciesEnabled(true);
conf.setAnonymousUserRole("superUser");
conf.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
setup();

final String tenantRole = "tenant-role";
final String generalRole = "general-role";
final String namespace = "my-property/my-ns-sub-auth";
final String topicName = "persistent://" + namespace + "/my-topic";

Authentication adminAuthentication = new ClientAuthentication("superUser");
@Cleanup
PulsarAdmin superAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(adminAuthentication).build());

Authentication tenantAdminAuthentication = new ClientAuthentication(tenantRole);
@Cleanup
PulsarAdmin tenantAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(tenantAdminAuthentication).build());

Authentication generalAdminAuthentication = new ClientAuthentication(generalRole);
@Cleanup
PulsarAdmin generalAdmin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
.authentication(generalAdminAuthentication).build());

superAdmin.clusters().createCluster("test",
ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
superAdmin.tenants().createTenant("my-property",
new TenantInfoImpl(Sets.newHashSet(tenantRole), Sets.newHashSet("test")));
superAdmin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
superAdmin.topics().createPartitionedTopic(topicName, 1);

// grant topic produce authorization to the generalRole
superAdmin.topics().grantPermission(topicName, generalRole,
Collections.singleton(AuthAction.produce));
replacePulsarClient(PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.authentication(generalAdminAuthentication));

@Cleanup
Producer<byte[]> batchProducer = pulsarClient.newProducer().topic(topicName).create();
batchProducer.close();

// generalRole doesn't have permission to access topic policy, so it will fail to write/read topic policy
try {
generalAdmin.topicPolicies().setSchemaCompatibilityStrategy(topicName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("Unauthorized to validateTopicPolicyOperation " +
"for operation [WRITE] on topic [" + topicName + "] on policy [SCHEMA_COMPATIBILITY_STRATEGY]"));
}
try {
generalAdmin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("Unauthorized to validateTopicPolicyOperation " +
"for operation [READ] on topic [" + topicName + "] on policy [SCHEMA_COMPATIBILITY_STRATEGY]"));
}
try {
generalAdmin.topicPolicies().getSchemaCompatibilityStrategy(topicName, false);
fail("should have failed with authorization exception");
} catch (Exception e) {
assertTrue(e.getMessage().startsWith("Unauthorized to validateTopicPolicyOperation " +
"for operation [READ] on topic [" + topicName + "] on policy [SCHEMA_COMPATIBILITY_STRATEGY]"));
}

// The superUser or tenantAdministrator can access topic policy, so it can successfully write/read topic policy
superAdmin.topicPolicies().setSchemaCompatibilityStrategy(topicName,
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE);
Awaitility.await().untilAsserted(() -> assertEquals(
superAdmin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true),
SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE));
tenantAdmin.topicPolicies().setSchemaCompatibilityStrategy(topicName,
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
Awaitility.await().untilAsserted(() -> assertEquals(
tenantAdmin.topicPolicies().getSchemaCompatibilityStrategy(topicName, true),
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE));

log.info("-- Exiting {} test --", methodName);
}

@Test
public void testSubscriptionPrefixAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,4 @@ public enum TopicOperation {

SET_REPLICATED_SUBSCRIPTION_STATUS,
GET_REPLICATED_SUBSCRIPTION_STATUS,

SET_SCHEMA_COMPATIBILITY_STRATEGY,
GET_SCHEMA_COMPATIBILITY_STRATEGY,
}

0 comments on commit 4886528

Please sign in to comment.