Skip to content

Commit

Permalink
[BUG] Catch topic policy not hit exception in handleSubscribe (apache…
Browse files Browse the repository at this point in the history
…#10341)

### Motivation
When running KOP with topic policy, it get the following exception.
```
17:09:46.777 [mock-pulsar-bk:org.apache.pulsar.broker.service.ServerCnx@1010] WARN  org.apache.pulsar.broker.service.ServerCnx - [/127.0.0.1:55980][persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to create consumer: consumerId=0, Topic policies cache have not init.
java.util.concurrent.CompletionException: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
        at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_172]
        at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_172]
        at org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage.lambda$addEntry$32(BookkeeperSchemaStorage.java:566) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
        at org.apache.bookkeeper.client.PulsarMockLedgerHandle.lambda$asyncAddEntry$5(PulsarMockLedgerHandle.java:194) ~[testmocks-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) [?:1.8.0_172]
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) [?:1.8.0_172]
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442) [?:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_172]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_172]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_172]
Caused by: org.apache.pulsar.broker.service.BrokerServiceException$TopicPoliciesCacheNotInitException: Topic policies cache have not init.
        at org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService.getTopicPolicies(SystemTopicBasedTopicPoliciesService.java:148) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.checkSubscriptionTypesEnable(PersistentTopic.java:2872) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
        at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:648) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
        at org.apache.pulsar.broker.service.ServerCnx.lambda$null$12(ServerCnx.java:966) ~[pulsar-broker-2.8.0-SNAPSHOT.jar:2.8.0-SNAPSHOT]
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952) ~[?:1.8.0_172]
        ... 15 more
17:09:46.801 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ClientCnx@682] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xb9d000f5, L:/127.0.0.1:55980 - R:localhost/127.0.0.1:15002] Received error from server: Topic policies cache have not init.
17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConsumerImpl@770] WARN  org.apache.pulsar.client.impl.ConsumerImpl - [persistent://pulsar/test/localhost:15000/__change_events-partition-0][multiTopicsReader-c3d8054591] Failed to subscribe to topic on localhost/127.0.0.1:15002
17:09:46.804 [pulsar-client-io-34-1:org.apache.pulsar.client.impl.ConnectionHandler@102] WARN  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/test/localhost:15000/__change_events-partition-0] [multiTopicsReader-c3d8054591] Could not get connection to broker: Topic policies cache have not init. -- Will try again in 0.1 s
```
The reason is that it doesn't catch exception in getTopicPolicies, which will lead to subscribe failed.
```Java
public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
        TopicName topicName = TopicName.get(topic);
        if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
            TopicPolicies topicPolicies =
                    brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
            if (topicPolicies == null) {
                return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
            } else {
                if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
                    return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
                }
                return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
            }
        } else {
            return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
        }
    }
```

### Changes
1. catch the exception in `checkSubscriptionTypesEnable`
2. expose exception stack in `servercnx#handleSubscribe`
  • Loading branch information
hangc0276 authored May 27, 2021
1 parent e678024 commit 608e36e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3024,15 +3024,19 @@ private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
public boolean checkSubscriptionTypesEnable(SubType subType) throws Exception {
TopicName topicName = TopicName.get(topic);
if (brokerService.pulsar().getConfiguration().isTopicLevelPoliciesEnabled()) {
TopicPolicies topicPolicies =
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
if (topicPolicies == null) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
} else {
if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
try {
TopicPolicies topicPolicies =
brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(TopicName.get(topic));
if (topicPolicies == null) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
} else {
if (topicPolicies.getSubscriptionTypesEnabled().isEmpty()) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
}
return topicPolicies.getSubscriptionTypesEnabled().contains(subType);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
}
} else {
return checkNsAndBrokerSubscriptionTypesEnable(topicName, subType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2159,4 +2159,29 @@ public void testGetCompactionThresholdApplied() throws Exception {
assertEquals(admin.topics().getCompactionThreshold(topic, true).longValue(), brokerPolicy);
}

@Test(timeOut = 30000)
public void testProduceConsumeOnTopicPolicy() {
final String msg = "send message ";
int numMsg = 10;
try {
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(persistenceTopic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscriptionName("test").subscribe();

Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(persistenceTopic).create();

for (int i = 0; i < numMsg; ++i) {
producer.newMessage().value(msg + i).send();
}

for (int i = 0; i < numMsg; ++i) {
Message<String> message = consumer.receive(100, TimeUnit.MILLISECONDS);
Assert.assertEquals(message.getValue(), msg + i);
}
} catch (PulsarClientException e) {
log.error("Failed to send/produce message, ", e);
Assert.fail();
}
}

}

0 comments on commit 608e36e

Please sign in to comment.