Skip to content

Commit

Permalink
Intercept messages to consumers and add intercept exception (apache#8129
Browse files Browse the repository at this point in the history
)

* Intercept messages to consumers and add intercept exception
* Capture response for broker interceptor.
  • Loading branch information
zymap authored Sep 29, 2020
1 parent bf6a88e commit 73e5bc4
Show file tree
Hide file tree
Showing 21 changed files with 1,063 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,7 @@ public void start() throws PulsarServerException {
OffloadPolicies.create(this.getConfiguration().getProperties()));
this.brokerInterceptor = BrokerInterceptors.load(config);
brokerService.setInterceptor(getBrokerInterceptor());
this.brokerInterceptor.initialize(config);
this.brokerInterceptor.initialize(this);
brokerService.start();

this.webService = new WebService(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
package org.apache.pulsar.broker.intercept;

import com.google.common.annotations.Beta;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.intercept.InterceptException;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
Expand All @@ -42,19 +42,29 @@ public interface BrokerInterceptor extends AutoCloseable {
/**
* Called by the broker while new command incoming.
*/
void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws Exception;
void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException;

/**
* Called by the broker while connection closed.
*/
void onConnectionClosed(ServerCnx cnx);

/**
* Called by the web service while new request incoming.
*/
void onWebServiceRequest(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException;
void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException;

/**
* Intercept the webservice response before send to client.
*/
void onWebserviceResponse(ServletRequest request, ServletResponse response) throws IOException, ServletException;

/**
* Initialize the broker interceptor.
*
* @throws Exception when fail to initialize the broker interceptor.
*/
void initialize(ServiceConfiguration conf) throws Exception;
void initialize(PulsarService pulsarService) throws Exception;

BrokerInterceptor DISABLED = new BrokerInterceptorDisabled();

Expand All @@ -64,23 +74,33 @@ public interface BrokerInterceptor extends AutoCloseable {
class BrokerInterceptorDisabled implements BrokerInterceptor {

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws Exception {
//No-op
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
// no-op
}

@Override
public void onConnectionClosed(ServerCnx cnx) {
// no-op
}

@Override
public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
// no-op
}

@Override
public void onWebServiceRequest(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
chain.doFilter(request, response);
public void onWebserviceResponse(ServletRequest request, ServletResponse response) throws IOException, ServletException {
// no-op
}

@Override
public void initialize(ServiceConfiguration conf) throws Exception {
//No-op
public void initialize(PulsarService pulsarService) throws Exception {
// no-op
}

@Override
public void close() {
//No-op
// no-op
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.common.nar.NarClassLoader;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
Expand All @@ -44,18 +44,28 @@ public class BrokerInterceptorWithClassLoader implements BrokerInterceptor {
private final NarClassLoader classLoader;

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws Exception {
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
this.interceptor.onPulsarCommand(command, cnx);
}

@Override
public void onWebServiceRequest(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
this.interceptor.onWebServiceRequest(request, response, chain);
public void onConnectionClosed(ServerCnx cnx) {
this.interceptor.onConnectionClosed(cnx);
}

@Override
public void initialize(ServiceConfiguration conf) throws Exception {
this.interceptor.initialize(conf);
public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
this.interceptor.onWebserviceRequest(request);
}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) throws IOException, ServletException {
this.interceptor.onWebserviceResponse(request, response);
}

@Override
public void initialize(PulsarService pulsarService) throws Exception {
this.interceptor.initialize(pulsarService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@

import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.intercept.InterceptException;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
Expand Down Expand Up @@ -85,23 +86,37 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
}

@Override
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws Exception {
public void onPulsarCommand(BaseCommand command, ServerCnx cnx) throws InterceptException {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onPulsarCommand(command, cnx);
}
}

@Override
public void onWebServiceRequest(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
public void onConnectionClosed(ServerCnx cnx) {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onWebServiceRequest(request, response, chain);
value.onConnectionClosed(cnx);
}
}

@Override
public void initialize(ServiceConfiguration conf) throws Exception {
public void onWebserviceRequest(ServletRequest request) throws IOException, ServletException, InterceptException {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onWebserviceRequest(request);
}
}

@Override
public void onWebserviceResponse(ServletRequest request, ServletResponse response) throws IOException, ServletException {
for (BrokerInterceptorWithClassLoader value : interceptors.values()) {
value.onWebserviceResponse(request, response);
}
}

@Override
public void initialize(PulsarService pulsarService) throws Exception {
for (BrokerInterceptorWithClassLoader v : interceptors.values()) {
v.initialize(conf);
v.initialize(pulsarService);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,8 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
if (redeliveryTracker.contains(position)) {
redeliveryCount = redeliveryTracker.incrementAndGetRedeliveryCount(position);
}
ctx.write(Commands.newMessage(consumerId, messageId, redeliveryCount, metadataAndPayload,
batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i)), ctx.voidPromise());
ctx.write(cnx.newMessageAndIntercept(consumerId, messageId, redeliveryCount, metadataAndPayload,
batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i), topicName), ctx.voidPromise());
messageId.recycle();
messageIdBuilder.recycle();
entry.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
Expand Down Expand Up @@ -140,7 +139,8 @@ public boolean equals(Object obj) {
return false;
}

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize,
boolean isChunked) {
beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
}
Expand All @@ -149,8 +149,8 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS
ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
if (lowestSequenceId > highestSequenceId) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, highestSequenceId, ServerError.MetadataError,
"Invalid lowest or highest sequence id"));
cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
"Invalid lowest or highest sequence id");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
Expand All @@ -162,8 +162,8 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS
public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
if (isClosed) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.PersistenceError,
"Producer is closed"));
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError,
"Producer is closed");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});

Expand All @@ -172,8 +172,7 @@ public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPa

if (!verifyChecksum(headersAndPayload)) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(
Commands.newSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"));
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
Expand All @@ -190,8 +189,8 @@ public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPa
if (encryptionKeysCount < 1) {
log.warn("[{}] Messages must be encrypted", getTopic().getName());
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted"));
cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted");
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
Expand Down Expand Up @@ -361,8 +360,8 @@ public void completed(Exception exception, long ledgerId, long entryId) {
// For TopicClosed exception there's no need to send explicit error, since the client was
// already notified
long callBackSequenceId = Math.max(highestSequenceId, sequenceId);
producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, callBackSequenceId,
serverError, exception.getMessage()));
producer.cnx.getCommandSender().sendSendError(producer.producerId, callBackSequenceId,
serverError, exception.getMessage());
}
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
producer.publishOperationCompleted();
Expand Down Expand Up @@ -393,9 +392,8 @@ public void run() {
// stats
rateIn.recordMultipleEvents(batchSize, msgSize);
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
producer.cnx.ctx().writeAndFlush(
Commands.newSendReceipt(producer.producerId, sequenceId, highestSequenceId, ledgerId, entryId),
producer.cnx.ctx().voidPromise());
producer.cnx.getCommandSender().sendSendReceiptResponse(producer.producerId, sequenceId, highestSequenceId,
ledgerId, entryId);
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
if (this.chunked) {
producer.chuckedMessageRate.recordEvent();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;

import java.util.List;

public interface PulsarCommandSender {


void sendPartitionMetadataResponse(PulsarApi.ServerError error, String errorMsg, long requestId);

void sendPartitionMetadataResponse(int partitions, long requestId);

void sendSuccessResponse(long requestId);

void sendErrorResponse(long requestId, PulsarApi.ServerError error, String message);

void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);

void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion);

void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
long entryId);

void sendSendError(long producerId, long sequenceId, PulsarApi.ServerError error, String errorMsg);

void sendGetTopicsOfNamespaceResponse(List<String> topics, long requestId);

void sendGetSchemaResponse(long requestId, SchemaInfo schema, SchemaVersion version);

void sendGetSchemaErrorResponse(long requestId, PulsarApi.ServerError error, String errorMessage);

void sendGetOrCreateSchemaResponse(long requestId, SchemaVersion schemaVersion);

void sendGetOrCreateSchemaErrorResponse(long requestId, PulsarApi.ServerError error, String errorMessage);

void sendConnectedResponse(int clientProtocolVersion, int maxMessageSize);

void sendLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative,
PulsarApi.CommandLookupTopicResponse.LookupType response, long requestId, boolean proxyThroughServiceUrl);

void sendLookupResponse(PulsarApi.ServerError error, String errorMsg, long requestId);
}
Loading

0 comments on commit 73e5bc4

Please sign in to comment.