Skip to content

Commit

Permalink
Intercept beforeSendMessage calls (apache#8932)
Browse files Browse the repository at this point in the history
*Motivation*

Currently the message metadata headers are deserialized for filtering
out the entries to dispatch. Add a `beforeSendMessage` method to
intercept entries before sending them to consumers
  • Loading branch information
sijie authored Dec 15, 2020
1 parent a7f692f commit f76655a
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.intercept.InterceptException;
Expand All @@ -40,6 +43,20 @@
@InterfaceStability.Evolving
public interface BrokerInterceptor extends AutoCloseable {

/**
* Intercept messages before sending them to the consumers.
*
* @param subscription pulsar subscription
* @param entry entry
* @param ackSet entry ack bitset. it is either <tt>null</tt> or an array of long-based bitsets.
* @param msgMetadata message metadata. The message metadata will be recycled after this call.
*/
default void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
}

/**
* Called by the broker while new command incoming.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.nar.NarClassLoader;

Expand All @@ -42,6 +45,15 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
private final BrokerInterceptor interceptor;
private final NarClassLoader classLoader;

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
this.interceptor.beforeSendMessage(
subscription, entry, ackSet, msgMetadata);
}

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
this.interceptor.onPulsarCommand(command, cnx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;

/**
Expand Down Expand Up @@ -85,6 +88,20 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
}
}

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.beforeSendMessage(
subscription,
entry,
ackSet,
msgMetadata);
}
}

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand Down Expand Up @@ -133,15 +134,27 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
totalBytes += metadataAndPayload.readableBytes();
totalChunkedMessages += msgMetadata.hasChunkId() ? 1 : 0;
batchSizes.setBatchSize(i, batchSize);
long[] ackSet = null;
if (indexesAcks != null && cursor != null) {
long[] ackSet = cursor.getDeletedBatchIndexesAsLongArray(
ackSet = cursor.getDeletedBatchIndexesAsLongArray(
PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
if (ackSet != null) {
indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
} else {
indexesAcks.setIndexesAcks(i, null);
}
}

BrokerInterceptor interceptor = subscription.interceptor();
if (null != interceptor) {
interceptor.beforeSendMessage(
subscription,
entry,
ackSet,
msgMetadata
);
}

} finally {
msgMetadata.recycle();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;

public interface Subscription {

BrokerInterceptor interceptor();

Topic getTopic();

String getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
Expand Down Expand Up @@ -76,6 +77,11 @@ public NonPersistentSubscription(NonPersistentTopic topic, String subscriptionNa
this.lastActive = System.currentTimeMillis();
}

@Override
public BrokerInterceptor interceptor() {
return this.topic.getBrokerService().getInterceptor();
}

@Override
public String getName() {
return this.subName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
Expand Down Expand Up @@ -136,6 +137,11 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
IS_FENCED_UPDATER.set(this, FALSE);
}

@Override
public BrokerInterceptor interceptor() {
return topic.getBrokerService().getInterceptor();
}

@Override
public String getName() {
return this.subName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
*/
package org.apache.pulsar.broker.intercept;

import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
Expand All @@ -35,6 +39,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;

public class BrokerInterceptorTest extends ProducerConsumerBase {

Expand Down Expand Up @@ -108,4 +113,28 @@ public void testPulsarCommand() throws PulsarClientException {
// CONNECT and PRODUCER
Assert.assertTrue(((CounterBrokerInterceptor)listener).getCount() >= 2);
}

@Test
public void testBeforeSendMessage() throws PulsarClientException {
BrokerInterceptor listener = pulsar.getBrokerInterceptor();
Assert.assertTrue(listener instanceof CounterBrokerInterceptor);

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic("test-before-send-message")
.create();

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic("test-before-send-message")
.subscriptionName("test")
.subscribe();

producer.send("hello world");

Message<String> msg = consumer.receive();

assertEquals(msg.getValue(), "hello world");

Assert.assertTrue(((CounterBrokerInterceptor) listener).getBeforeSendCount() == 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.pulsar.broker.intercept;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.intercept.InterceptException;

import javax.servlet.ServletException;
Expand All @@ -33,8 +36,19 @@
@Slf4j
public class CounterBrokerInterceptor implements BrokerInterceptor {

int beforeSendCount = 0;
int count = 0;

@Override
public void beforeSendMessage(Subscription subscription,
Entry entry,
long[] ackSet,
MessageMetadata msgMetadata) {
log.info("Send message to topic {}, subscription {}",
subscription.getTopic(), subscription.getName());
beforeSendCount++;
}

@Override
public void onPulsarCommand(PulsarApi.BaseCommand command, ServerCnx cnx) throws InterceptException {
log.info("[{}] On [{}] Pulsar command", count, command.getType().name());
Expand Down Expand Up @@ -71,4 +85,8 @@ public void close() {
public int getCount() {
return count;
}

public int getBeforeSendCount() {
return beforeSendCount;
}
}

0 comments on commit f76655a

Please sign in to comment.