Skip to content

Commit

Permalink
PIP 37: [pulsar-client] support large message size (apache#4400)
Browse files Browse the repository at this point in the history
* PIP 37: [pulsar-client] support large message size

fix producer

fix ref counts

add timeouts

add validation

fix recycling

fix stats

review

fix test

fix test

fix send message and expiry-consumer-config

fix schema test

fix chunk properties

* fix test
  • Loading branch information
rdhabalia authored Jun 2, 2020
1 parent c64b22a commit 870a637
Show file tree
Hide file tree
Showing 42 changed files with 1,450 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1931,6 +1931,10 @@ private Response generateResponseWithEntry(Entry entry) throws IOException {
if (metadata.hasNullValue()) {
responseBuilder.header("X-Pulsar-null-value", metadata.hasNullValue());
}
if (metadata.getNumChunksFromMsg() > 0) {
responseBuilder.header("X-Pulsar-PROPERTY-TOTAL-CHUNKS", Integer.toString(metadata.getNumChunksFromMsg()));
responseBuilder.header("X-Pulsar-PROPERTY-CHUNK-ID", Integer.toString(metadata.getChunkId()));
}

// Decode if needed
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(metadata.getCompression());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchS
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;

for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
Expand Down Expand Up @@ -103,6 +104,7 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
int batchSize = msgMetadata.getNumMessagesInBatch();
totalMessages += batchSize;
totalBytes += metadataAndPayload.readableBytes();
totalChunkedMessages += msgMetadata.hasChunkId() ? 1: 0;
batchSizes.setBatchSize(i, batchSize);
if (indexesAcks != null && cursor != null) {
long[] ackSet = cursor.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
Expand All @@ -119,6 +121,7 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {

sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
}

private void processReplicatedSubscriptionSnapshot(PositionImpl pos, ByteBuf headersAndPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public class Consumer {

private long lastConsumedTimestamp;
private long lastAckedTimestamp;
private Rate chuckedMessageRate;

// Represents how many messages we can safely send to the consumer without
// overflowing its receiving queue. The consumer will use Flow commands to
Expand Down Expand Up @@ -146,6 +147,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.keySharedMeta = keySharedMeta;
this.cnx = cnx;
this.msgOut = new Rate();
this.chuckedMessageRate = new Rate();
this.msgRedeliver = new Rate();
this.bytesOutCounter = new LongAdder();
this.msgOutCounter = new LongAdder();
Expand Down Expand Up @@ -214,7 +216,7 @@ public boolean readCompacted() {
*/

public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
int totalMessages, long totalBytes, RedeliveryTracker redeliveryTracker) {
int totalMessages, long totalBytes, long totalChunkedMessages, RedeliveryTracker redeliveryTracker) {
this.lastConsumedTimestamp = System.currentTimeMillis();
final ChannelHandlerContext ctx = cnx.ctx();
final ChannelPromise writePromise = ctx.newPromise();
Expand Down Expand Up @@ -258,6 +260,7 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
msgOut.recordMultipleEvents(totalMessages, totalBytes);
msgOutCounter.add(totalMessages);
bytesOutCounter.add(totalBytes);
chuckedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);

ctx.channel().eventLoop().execute(() -> {
for (int i = 0; i < entries.size(); i++) {
Expand Down Expand Up @@ -505,10 +508,12 @@ private boolean shouldBlockConsumerOnUnackMsgs() {

/**
 * Recomputes this consumer's rate trackers and publishes the results into {@code stats}.
 *
 * <p>Each tracker is recalculated and then copied into its corresponding stats field(s):
 * outbound message rate and throughput, redelivery rate, and chunked-message rate.
 *
 * <p>NOTE(review): "chucked" looks like a misspelling of "chunked". The stats field of the
 * same name is declared outside this method, so the spelling is kept as-is here; renaming it
 * would need a coordinated change.
 */
public void updateRates() {
    // Outbound messages: rate (msg/s) and throughput come from the same tracker.
    msgOut.calculateRate();
    stats.msgRateOut = msgOut.getRate();
    stats.msgThroughputOut = msgOut.getValueRate();

    // Redelivered messages.
    msgRedeliver.calculateRate();
    stats.msgRateRedeliver = msgRedeliver.getRate();

    // Chunked messages dispatched to this consumer.
    chuckedMessageRate.calculateRate();
    stats.chuckedMessageRate = chuckedMessageRate.getRate();
}

public ConsumerStats getStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class Producer {
private final long producerId;
private final String appId;
private Rate msgIn;
private Rate chuckedMessageRate;
// it records msg-drop rate only for non-persistent topic
private final Rate msgDrop;
private AuthenticationDataSource authenticationData;
Expand Down Expand Up @@ -100,6 +101,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName
this.appId = appId;
this.authenticationData = cnx.authenticationData;
this.msgIn = new Rate();
this.chuckedMessageRate = new Rate();
this.isNonPersistentTopic = topic instanceof NonPersistentTopic;
this.msgDrop = this.isNonPersistentTopic ? new Rate() : null;

Expand Down Expand Up @@ -136,13 +138,13 @@ public boolean equals(Object obj) {
return false;
}

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked);
}

public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
ByteBuf headersAndPayload, long batchSize) {
ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
if (lowestSequenceId > highestSequenceId) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, highestSequenceId, ServerError.MetadataError,
Expand All @@ -152,7 +154,7 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS
return;
}
beforePublish(producerId, highestSequenceId, headersAndPayload, batchSize);
publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize);
publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked);
}

public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
Expand Down Expand Up @@ -197,16 +199,16 @@ public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPa
startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize) {
private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
System.nanoTime()));
isChunked, System.nanoTime()));
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize) {
private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize, boolean isChunked) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
System.nanoTime()));
isChunked, System.nanoTime()));
}

private boolean verifyChecksum(ByteBuf headersAndPayload) {
Expand Down Expand Up @@ -286,6 +288,8 @@ private static final class MessagePublishContext implements PublishContext, Runn
private Rate rateIn;
private int msgSize;
private long batchSize;
private boolean chunked;

private long startTimeNs;

private String originalProducerName;
Expand All @@ -302,6 +306,10 @@ public long getSequenceId() {
return sequenceId;
}

/**
 * Reports the {@code chunked} flag held by this publish context.
 *
 * @return the current value of the {@code chunked} field
 */
public boolean isChunked() {
    return this.chunked;
}

@Override
public long getHighestSequenceId() {
return highestSequenceId;
Expand Down Expand Up @@ -387,26 +395,30 @@ public void run() {
Commands.newSendReceipt(producer.producerId, sequenceId, highestSequenceId, ledgerId, entryId),
producer.cnx.ctx().voidPromise());
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
if (this.chunked) {
producer.chuckedMessageRate.recordEvent();
}
producer.publishOperationCompleted();
recycle();
}

static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize,
long batchSize, long startTimeNs) {
long batchSize, boolean chunked, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = sequenceId;
callback.rateIn = rateIn;
callback.msgSize = msgSize;
callback.batchSize = batchSize;
callback.chunked = chunked;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
return callback;
}

static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
int msgSize, long batchSize, long startTimeNs) {
int msgSize, long batchSize, boolean chunked, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
Expand All @@ -417,6 +429,7 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
callback.chunked = chunked;
return callback;
}

Expand Down Expand Up @@ -444,6 +457,11 @@ public void recycle() {
entryId = -1L;
batchSize = 0L;
startTimeNs = -1L;
ledgerId = -1;
entryId = -1;
batchSize = 0;
chunked = false;
startTimeNs = -1;
recyclerHandle.recycle(this);
}
}
Expand Down Expand Up @@ -525,9 +543,14 @@ public CompletableFuture<Void> disconnect() {

public void updateRates() {
msgIn.calculateRate();
chuckedMessageRate.calculateRate();
stats.msgRateIn = msgIn.getRate();
stats.msgThroughputIn = msgIn.getValueRate();
stats.averageMsgSize = msgIn.getAverageValue();
stats.chunkedMessageRate = chuckedMessageRate.getRate();
if (chuckedMessageRate.getCount() > 0 && this.topic instanceof PersistentTopic) {
((PersistentTopic) this.topic).msgChunkPublished = true;
}
if (this.isNonPersistentTopic) {
msgDrop.calculateRate();
((NonPersistentPublisherStats) stats).msgDropRate = msgDrop.getRate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
public class SendMessageInfo {
private int totalMessages;
private long totalBytes;
private int totalChunkedMessages;

private SendMessageInfo() {
// Private constructor so that all usages are through the thread-local instance
Expand All @@ -35,6 +36,7 @@ public static SendMessageInfo getThreadLocal() {
SendMessageInfo smi = THREAD_LOCAL.get();
smi.totalMessages = 0;
smi.totalBytes = 0;
smi.totalChunkedMessages = 0;
return smi;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,9 +1214,10 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
headersAndPayload, send.getNumMessages());
headersAndPayload, send.getNumMessages(), send.getIsChunk());
} else {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
send.getNumMessages(), send.getIsChunk());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ public void sendMessages(List<Entry> entries) {
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null);
consumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());

TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
} else {
Expand Down Expand Up @@ -234,4 +234,4 @@ public boolean isConsumerAvailable(Consumer consumer) {

private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherMultipleConsumers.class);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void sendMessages(List<Entry> entries) {
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null);
currentConsumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
} else {
entries.forEach(entry -> {
int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
Expand Down Expand Up @@ -154,4 +154,4 @@ protected void readMoreEntries(Consumer consumer) {
protected void cancelPendingRead() {
// No-op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void sendMessages(List<Entry> entries) {
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo, null, null);
consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker());
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
} else {
entries.forEach(entry -> {
Expand All @@ -95,4 +95,4 @@ public void sendMessages(List<Entry> entries) {
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.logging.log4j.util.Strings;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
Expand Down Expand Up @@ -523,7 +524,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
filterEntriesForConsumer(entriesForThisConsumer, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), redeliveryTracker);
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);

int msgSent = sendMessageInfo.getTotalMessages();
start += messagesForC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ public synchronized void internalReadEntriesComplete(final List<Entry> entries,

currentConsumer
.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), redeliveryTracker)
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
redeliveryTracker)
.addListener(future -> {
if (future.isSuccess()) {
// acquire message-dispatch permits for already delivered messages
Expand Down Expand Up @@ -555,4 +556,4 @@ public CompletableFuture<Void> close() {
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
filterEntriesForConsumer(subList, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);

consumer.sendMessages(subList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), getRedeliveryTracker()).addListener(future -> {
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
getRedeliveryTracker()).addListener(future -> {
if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
readMoreEntries();
}
});
});

for (int i = 0; i < messagesForC; i++) {
entriesWithSameKey.getValue().remove(0);
Expand Down Expand Up @@ -175,4 +176,4 @@ protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> pos

private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,7 @@ public SubscriptionStats getStats(Boolean getPreciseBacklog) {
subStats.bytesOutCounter += consumerStats.bytesOutCounter;
subStats.msgOutCounter += consumerStats.msgOutCounter;
subStats.msgRateRedeliver += consumerStats.msgRateRedeliver;
subStats.chuckedMessageRate += consumerStats.chuckedMessageRate;
subStats.unackedMessages += consumerStats.unackedMessages;
subStats.lastConsumedTimestamp = Math.max(subStats.lastConsumedTimestamp, consumerStats.lastConsumedTimestamp);
subStats.lastAckedTimestamp = Math.max(subStats.lastAckedTimestamp, consumerStats.lastAckedTimestamp);
Expand Down
Loading

0 comments on commit 870a637

Please sign in to comment.