diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
index 7cd6214d6cbf2..8ac0835f76043 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptor.java
@@ -22,9 +22,12 @@
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.intercept.InterceptException;
@@ -40,6 +43,20 @@
@InterfaceStability.Evolving
public interface BrokerInterceptor extends AutoCloseable {
+ /**
+ * Intercept messages before sending them to the consumers.
+ *
+ * @param subscription pulsar subscription
+ * @param entry entry
+ * @param ackSet entry ack bitset. it is either null or an array of long-based bitsets.
+ * @param msgMetadata message metadata. The message metadata will be recycled after this call.
+ */
+ default void beforeSendMessage(Subscription subscription,
+ Entry entry,
+ long[] ackSet,
+ MessageMetadata msgMetadata) {
+ }
+
/**
* Called by the broker while new command incoming.
*/
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
index 124bca8f40c66..e2c9821e53fac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptorWithClassLoader.java
@@ -25,9 +25,12 @@
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.nar.NarClassLoader;
@@ -42,6 +45,15 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
private final BrokerInterceptor interceptor;
private final NarClassLoader classLoader;
+ @Override
+ public void beforeSendMessage(Subscription subscription,
+ Entry entry,
+ long[] ackSet,
+ MessageMetadata msgMetadata) {
+ this.interceptor.beforeSendMessage(
+ subscription, entry, ackSet, msgMetadata);
+ }
+
@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
this.interceptor.onPulsarCommand(command, cnx);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
index 55e25c9f23407..3ae04b397252a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/BrokerInterceptors.java
@@ -25,10 +25,13 @@
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
/**
@@ -85,6 +88,20 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
}
}
+ @Override
+ public void beforeSendMessage(Subscription subscription,
+ Entry entry,
+ long[] ackSet,
+ MessageMetadata msgMetadata) {
+ for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
+ value.beforeSendMessage(
+ subscription,
+ entry,
+ ackSet,
+ msgMetadata);
+ }
+ }
+
@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 878a026f2840f..10326af16283f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -30,6 +30,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -133,8 +134,9 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
totalBytes += metadataAndPayload.readableBytes();
totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
batchSizes.setBatchSize(i, batchSize);
+ long[] ackSet = null;
if (indexesAcks != null && cursor != null) {
- long[] ackSet = cursor.getDeletedBatchIndexesAsLongArray(
+ ackSet = cursor.getDeletedBatchIndexesAsLongArray(
PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
if (ackSet != null) {
indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
@@ -142,6 +144,17 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
indexesAcks.setIndexesAcks(i, null);
}
}
+
+ BrokerInterceptor interceptor = subscription.interceptor();
+ if (null != interceptor) {
+ interceptor.beforeSendMessage(
+ subscription,
+ entry,
+ ackSet,
+ msgMetadata
+ );
+ }
+
} finally {
msgMetadata.recycle();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
index 7ad8a9086462b..1bf0d1fbe8bc0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java
@@ -24,12 +24,15 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
public interface Subscription {
+ BrokerInterceptor interceptor();
+
Topic getTopic();
String getName();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 90ff723930e30..917ec80968696 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -28,6 +28,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -76,6 +77,11 @@ public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionNa
this.lastActive = System.currentTimeMillis();
}
+ @Override
+ public BrokerInterceptor interceptor() {
+ return this.topic.getBrokerService().getInterceptor();
+ }
+
@Override
public String getName() {
return this.subName;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index f6a753366dcfa..865d4098ed67e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -44,6 +44,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -136,6 +137,11 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
IS_FENCED_UPDATER.set(this, FALSE);
}
+ @Override
+ public BrokerInterceptor interceptor() {
+ return topic.getBrokerService().getInterceptor();
+ }
+
@Override
public String getName() {
return this.subName;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
index 781e85f8c3677..4a1504c3dc743 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/BrokerInterceptorTest.java
@@ -18,7 +18,11 @@
*/
package org.apache.pulsar.broker.intercept;
+import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
@@ -35,6 +39,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
public class BrokerInterceptorTest extends ProducerConsumerBase {
@@ -108,4 +113,28 @@ public void testPulsarCommand() throws PulsarClientException {
// CONNECT and PRODUCER
Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 2);
}
+
+ @Test
+ public void testBeforeSendMessage() throws PulsarClientException {
+ BrokerInterceptor listener = pulsar.getBrokerInterceptor();
+ Assert.assertTrue(listener instanceof CounterBrokerInterceptor);
+
+ @Cleanup
+ Producer producer = pulsarClient.newProducer(Schema.STRING)
+ .topic("test-before-send-message")
+ .create();
+
+ Consumer consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topic("test-before-send-message")
+ .subscriptionName("test")
+ .subscribe();
+
+ producer.send("hello world");
+
+ Message msg = consumer.receive();
+
+ assertEquals(msg.getValue(), "hello world");
+
+ Assert.assertTrue(((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1);
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
index defe3869866fa..8f3fa5171a2a3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CounterBrokerInterceptor.java
@@ -19,9 +19,12 @@
package org.apache.pulsar.broker.intercept;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
+import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import javax.servlet.ServletException;
@@ -33,8 +36,19 @@
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {
+ int beforeSendCount = 0;
int count = 0;
+ @Override
+ public void beforeSendMessage(Subscription subscription,
+ Entry entry,
+ long[] ackSet,
+ MessageMetadata msgMetadata) {
+ log.info("Send message to topic {}, subscription {}",
+ subscription.getTopic(), subscription.getName());
+ beforeSendCount++;
+ }
+
@Override
public void onPulsarCommand(PulsarApi.BaseCommand command, ServerCnx cnx) throws InterceptException {
log.info("[{}] On [{}] Pulsar command", count, command.getType().name());
@@ -71,4 +85,8 @@ public void close() {
public int getCount() {
return count;
}
+
+ public int getBeforeSendCount() {
+ return beforeSendCount;
+ }
}