diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java index a74730d23e102..faee5799289d0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.intercept; +import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.Map; @@ -208,4 +209,9 @@ public void close() { log.warn("Failed to close the broker interceptor class loader", e); } } + + @VisibleForTesting + public BrokerInterceptor getInterceptor() { + return interceptor; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java index e7f82742a97cc..cef3f0eb609a1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.intercept; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import io.netty.buffer.ByteBuf; import java.io.IOException; @@ -277,4 +278,9 @@ public void close() { private boolean interceptorsEnabled() { return interceptors != null && !interceptors.isEmpty(); } + + @VisibleForTesting + public Map getInterceptors() { + return interceptors; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 1351c6fe715a9..4c81b46601ea7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1185,7 +1185,11 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { remoteAddress, topicName, subscriptionName); commandSender.sendSuccessResponse(requestId); if (brokerInterceptor != null) { - brokerInterceptor.consumerCreated(this, consumer, metadata); + try { + brokerInterceptor.consumerCreated(this, consumer, metadata); + } catch (Throwable t) { + log.error("Exception occur when intercept consumer created.", t); + } } } else { // The consumer future was completed before by a close command @@ -1223,8 +1227,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } // If client timed out, the future would have been completed by subsequent close. - // Send error - // back to client, only if not completed already. + // Send error back to client, only if not completed already. if (consumerFuture.completeExceptionally(exception)) { commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), @@ -1521,8 +1524,11 @@ private void buildProducerAndAddTopic(Topic topic, long producerId, String produ producer.getLastSequenceId(), producer.getSchemaVersion(), newTopicEpoch, true /* producer is ready now */); if (brokerInterceptor != null) { - brokerInterceptor. - producerCreated(this, producer, metadata); + try { + brokerInterceptor.producerCreated(this, producer, metadata); + } catch (Throwable t) { + log.error("Exception occur when intercept producer created.", t); + } } return; } else { @@ -1689,7 +1695,11 @@ protected void handleAck(CommandAck ack) { requestId, null, null, consumerId)); } if (brokerInterceptor != null) { - brokerInterceptor.messageAcked(this, consumer, copyOfAckForInterceptor); + try { + brokerInterceptor.messageAcked(this, consumer, copyOfAckForInterceptor); + } catch (Throwable t) { + log.error("Exception occur when intercept message acked.", t); + } } }).exceptionally(e -> { if (hasRequestId) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java new file mode 100644 index 0000000000000..f58d56c05a951 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.Consumer; +import org.apache.pulsar.broker.service.Producer; +import org.apache.pulsar.broker.service.ServerCnx; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.intercept.InterceptException; + +public class ExceptionsBrokerInterceptor implements BrokerInterceptor { + + + private AtomicInteger producerCount = new AtomicInteger(); + private AtomicInteger consumerCount = new AtomicInteger(); + private AtomicInteger messageAckCount = new AtomicInteger(); + + public AtomicInteger getProducerCount() { + return producerCount; + } + + public AtomicInteger getConsumerCount() { + return consumerCount; + } + + public AtomicInteger getMessageAckCount() { + return messageAckCount; + } + + @Override + public void producerCreated(ServerCnx cnx, Producer producer, Map metadata) { + producerCount.incrementAndGet(); + throw new RuntimeException("exception when intercept producer created"); + } + + @Override + public void consumerCreated(ServerCnx cnx, Consumer consumer, Map metadata) { + consumerCount.incrementAndGet(); + throw new RuntimeException("exception when intercept consumer created"); + } + + @Override + public void messageAcked(ServerCnx cnx, Consumer consumer, CommandAck ackCmd) { + messageAckCount.incrementAndGet(); + throw new RuntimeException("exception when intercept consumer ack message"); + } + + @Override + public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException { + + } + + @Override + public void onConnectionClosed(ServerCnx cnx) { + + } + + @Override + public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException { + + } + + @Override + public void onWebserviceResponse(ServletRequest request, ServletResponse response) + throws IOException, ServletException { + + } + + @Override + public void initialize(PulsarService pulsarService) throws Exception { + + } + + @Override + public void close() { + + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java new file mode 100644 index 0000000000000..aa254a8ac168a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ExceptionsBrokerInterceptorTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.testcontext.PulsarTestContext; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.impl.ClientCnx; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.common.nar.NarClassLoader; +import org.awaitility.Awaitility; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class ExceptionsBrokerInterceptorTest extends ProducerConsumerBase { + + private String interceptorName = "exception_interceptor"; + + @BeforeMethod + public void setup() throws Exception { + conf.setSystemTopicEnabled(false); + conf.setTopicLevelPoliciesEnabled(false); + this.conf.setDisableBrokerInterceptors(false); + + + this.enableBrokerInterceptor = true; + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) { + Map listenerMap = new HashMap<>(); + BrokerInterceptor interceptor = new ExceptionsBrokerInterceptor(); + NarClassLoader narClassLoader = mock(NarClassLoader.class); + listenerMap.put(interceptorName, new BrokerInterceptorWithClassLoader(interceptor, narClassLoader)); + pulsarTestContextBuilder.brokerInterceptor(new BrokerInterceptors(listenerMap)); + } + + @Test + public void testMessageAckedExceptions() throws Exception { + String topic = "persistent://public/default/test"; + String subName = "test-sub"; + int messageNumber = 10; + admin.topics().createNonPartitionedTopic(topic); + + BrokerInterceptors listener = (BrokerInterceptors) pulsar.getBrokerInterceptor(); + assertNotNull(listener); + BrokerInterceptorWithClassLoader brokerInterceptor = listener.getInterceptors().get(interceptorName); + assertNotNull(brokerInterceptor); + BrokerInterceptor interceptor = brokerInterceptor.getInterceptor(); + assertTrue(interceptor instanceof ExceptionsBrokerInterceptor); + + Producer producer = pulsarClient.newProducer().topic(topic).create(); + + ConsumerImpl consumer = (ConsumerImpl) pulsarClient + .newConsumer() + .topic(topic) + .subscriptionName(subName) + .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS) + .isAckReceiptEnabled(true) + .subscribe(); + + Awaitility.await().until(() -> ((ExceptionsBrokerInterceptor) interceptor).getProducerCount().get() == 1); + Awaitility.await().until(() -> ((ExceptionsBrokerInterceptor) interceptor).getConsumerCount().get() == 1); + + for (int i = 0; i < messageNumber; i ++) { + producer.send("test".getBytes(StandardCharsets.UTF_8)); + } + + int receiveCounter = 0; + Message message; + while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) { + receiveCounter ++; + consumer.acknowledge(message); + } + assertEquals(receiveCounter, 10); + Awaitility.await().until(() + -> ((ExceptionsBrokerInterceptor) interceptor).getMessageAckCount().get() == messageNumber); + + ClientCnx clientCnx = consumer.getClientCnx(); + // no duplicated responses received from broker + assertEquals(clientCnx.getDuplicatedResponseCount(), 0); + } + +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index f2ebb12f957ec..5074d0f55ef62 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -45,6 +45,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import lombok.AccessLevel; import lombok.Getter; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -115,6 +116,8 @@ public class ClientCnx extends PulsarHandler { protected final Authentication authentication; protected State state; + private AtomicLong duplicatedResponseCounter = new AtomicLong(0); + @Getter private final ConcurrentLongHashMap> pendingRequests = ConcurrentLongHashMap.>newBuilder() @@ -352,6 +355,11 @@ public static boolean isKnownException(Throwable t) { return t instanceof NativeIoException || t instanceof ClosedChannelException; } + @VisibleForTesting + public long getDuplicatedResponseCount() { + return duplicatedResponseCounter.get(); + } + @Override protected void handleConnected(CommandConnected connected) { checkArgument(state == State.SentConnectFrame || state == State.Connecting); @@ -475,6 +483,7 @@ protected void handleAckResponse(CommandAckResponse ackResponse) { buildError(ackResponse.getRequestId(), ackResponse.getMessage()))); } } else { + duplicatedResponseCounter.incrementAndGet(); log.warn("AckResponse has complete when receive response! requestId : {}, consumerId : {}", ackResponse.getRequestId(), ackResponse.hasConsumerId()); } @@ -519,6 +528,7 @@ protected void handleSuccess(CommandSuccess success) { if (requestFuture != null) { requestFuture.complete(null); } else { + duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } } @@ -537,6 +547,7 @@ protected void handleGetLastMessageIdSuccess(CommandGetLastMessageIdResponse suc if (requestFuture != null) { requestFuture.complete(new CommandGetLastMessageIdResponse().copyFrom(success)); } else { + duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } } @@ -572,6 +583,7 @@ protected void handleProducerSuccess(CommandProducerSuccess success) { success.hasTopicEpoch() ? Optional.of(success.getTopicEpoch()) : Optional.empty()); requestFuture.complete(pr); } else { + duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } } @@ -719,6 +731,8 @@ private CompletableFuture getAndRemovePendingLookupRequest(lon } else { pendingLookupRequestSemaphore.release(); } + } else { + duplicatedResponseCounter.incrementAndGet(); } return result; } @@ -775,6 +789,7 @@ protected void handleError(CommandError error) { getPulsarClientException(error.getError(), buildError(error.getRequestId(), error.getMessage()))); } else { + duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), error.getRequestId()); } } @@ -882,6 +897,7 @@ protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResp success.isFiltered(), success.isChanged())); } else { + duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } } @@ -895,6 +911,7 @@ protected void handleGetSchemaResponse(CommandGetSchemaResponse commandGetSchema CompletableFuture future = (CompletableFuture) pendingRequests.remove(requestId); if (future == null) { + duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId); return; } @@ -908,6 +925,7 @@ protected void handleGetOrCreateSchemaResponse(CommandGetOrCreateSchemaResponse CompletableFuture future = (CompletableFuture) pendingRequests.remove(requestId); if (future == null) { + duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), requestId); return; } @@ -1080,6 +1098,7 @@ protected void handleTcClientConnectResponse(CommandTcClientConnectResponse resp requestFuture.completeExceptionally(getExceptionByServerError(error, response.getMessage())); } } else { + duplicatedResponseCounter.incrementAndGet(); log.warn("Tc client connect command has been completed and get response for request: {}", response.getRequestId()); } @@ -1133,6 +1152,7 @@ protected void handleCommandWatchTopicListSuccess(CommandWatchTopicListSuccess c if (requestFuture != null) { requestFuture.complete(commandWatchTopicListSuccess); } else { + duplicatedResponseCounter.incrementAndGet(); log.warn("{} Received unknown request id from server: {}", ctx.channel(), commandWatchTopicListSuccess.getRequestId()); }