Skip to content

Commit

Permalink
Fix consumer stuck issue due to reuse entry wrapper. (apache#10824)
Browse files Browse the repository at this point in the history
Fixes apache#10813
The issue is introduced by apache#7266, it only affects the master branch.

### Motivation

1. Add wrapperOffset to make sure get the correct batch size from the metadata
2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer
   it should be (messages / avgBatchSizePerMsg)

### Verifying this change

     * The test case is to simulate dispatch batches with different batch size to the consumer.
     * 1. The consumer has 1000 available permits
     * 2. The producer send batches with different batch size
     *
     * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
  • Loading branch information
codelipenghui authored Jun 4, 2021
1 parent d8567a8 commit 4f23767
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ protected int updateEntryWrapperWithMetadata(EntryWrapper[] entryWrappers, List<
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead) {
filterEntriesForConsumer(Optional.empty(), entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
filterEntriesForConsumer(Optional.empty(), 0, entries, batchSizes, sendMessageInfo, indexesAcks, cursor,
isReplayRead);
}

public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, List<Entry> entries,
EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, boolean isReplayRead) {
public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int entryWrapperOffset,
List<Entry> entries, EntryBatchSizes batchSizes, SendMessageInfo sendMessageInfo,
EntryBatchIndexesAcks indexesAcks, ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;
Expand All @@ -114,8 +114,9 @@ public void filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, List
continue;
}
ByteBuf metadataAndPayload = entry.getDataBuffer();
MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[i] != null
? entryWrapper.get()[i].getMetadata()
int entryWrapperIndex = i + entryWrapperOffset;
MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
? entryWrapper.get()[entryWrapperIndex].getMetadata()
: null;
msgMetadata = msgMetadata == null
? Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,11 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
return;
}
EntryWrapper[] entryWrappers = new EntryWrapper[entries.size()];
int totalMessages = updateEntryWrapperWithMetadata(entryWrappers, entries);
int remainingMessages = updateEntryWrapperWithMetadata(entryWrappers, entries);
int start = 0;
long totalMessagesSent = 0;
long totalBytesSent = 0;
int avgBatchSizePerMsg = totalMessages > 0 ? Math.max(totalMessages / entries.size(), 1) : 1;
int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;

int firstAvailableConsumerPermits, currentTotalAvailablePermits;
boolean dispatchMessage;
Expand Down Expand Up @@ -514,7 +514,7 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
c, c.getAvailablePermits());
}

int messagesForC = Math.min(Math.min(entriesToDispatch, availablePermits),
int messagesForC = Math.min(Math.min(remainingMessages, availablePermits),
serviceConfig.getDispatcherMaxRoundRobinBatchSize());
messagesForC = Math.max(messagesForC / avgBatchSizePerMsg, 1);

Expand All @@ -532,13 +532,14 @@ protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {

EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForThisConsumer.size());
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entriesForThisConsumer.size());
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), entriesForThisConsumer, batchSizes,
sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);
filterEntriesForConsumer(Optional.ofNullable(entryWrappers), start, entriesForThisConsumer,
batchSizes, sendMessageInfo, batchIndexesAcks, cursor, readType == ReadType.Replay);

c.sendMessages(entriesForThisConsumer, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), redeliveryTracker);

int msgSent = sendMessageInfo.getTotalMessages();
remainingMessages -= msgSent;
start += messagesForC;
entriesToDispatch -= messagesForC;
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Topics;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import java.util.concurrent.TimeUnit;

@Test(groups = "broker-impl")
public class DispatchAccordingPermitsTests extends ProducerConsumerBase {

@Override
@BeforeMethod
public void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();
}

@Override
@AfterMethod(alwaysRun = true)
public void cleanup() throws Exception {
super.internalCleanup();
}

/**
* The test case is to simulate dispatch batches with different batch size to the consumer.
* 1. The consumer has 1000 available permits
* 2. The producer send batches with different batch size
*
* According the batch average size dispatching, the broker will dispatch all the batches to the consumer
*/
@Test
public void testFlowPermitsWithMultiBatchesDispatch() throws PulsarAdminException, PulsarClientException {
final String topic = "persistent://public/default/testFlowPermitsWithMultiBatchesDispatch";
final String subName = "test";
admin.topics().createSubscription(topic, "test", MessageId.earliest);

Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
.create();

for (int i = 0; i < 100; i++) {
producer.sendAsync("msg - " + i);
}
producer.flush();

for (int i = 0; i < 350; i++) {
producer.sendAsync("msg - " + i);
}
producer.flush();

for (int i = 0; i < 50; i++) {
producer.sendAsync("msg - " + i);
producer.flush();
}

Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

for (int i = 0; i < 500; i++) {
consumer.acknowledge(consumer.receive());
}

ConsumerImpl<String> consumerImpl = (ConsumerImpl<String>) consumer;
Assert.assertEquals(consumerImpl.incomingMessages.size(), 0);

TopicStats stats = admin.topics().getStats(topic);
Assert.assertTrue(stats.subscriptions.get(subName).consumers.get(0).availablePermits > 0);
}
}

0 comments on commit 4f23767

Please sign in to comment.