Skip to content

Commit

Permalink
Fixed message rate out with batches to count messages/s (apache#466) (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
jai1 authored Jun 15, 2017
1 parent 8abafee commit 042521a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import com.yahoo.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import com.yahoo.pulsar.common.naming.DestinationName;
import com.yahoo.pulsar.common.policies.data.ConsumerStats;
Expand Down Expand Up @@ -185,9 +186,6 @@ public Pair<ChannelPromise, Integer> sendMessages(final List<Entry> entries) {
readChecksum(metadataAndPayload);
}

// stats
msgOut.recordEvent(metadataAndPayload.readableBytes());

if (log.isDebugEnabled()) {
log.debug("[{}] Sending message to consumerId {}, entry id {}", subscription, consumerId,
pos.getEntryId());
Expand Down Expand Up @@ -238,6 +236,7 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) throws PulsarServerEx
int permitsToReduce = 0;
Iterator<Entry> iter = entries.iterator();
boolean unsupportedVersion = false;
long totalReadableBytes = 0;
boolean clientSupportBatchMessages = cnx.isBatchMessageCompatibleVersion();
while (iter.hasNext()) {
Entry entry = iter.next();
Expand All @@ -258,6 +257,7 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) throws PulsarServerEx
if (batchSize > 1 && !clientSupportBatchMessages) {
unsupportedVersion = true;
}
totalReadableBytes += metadataAndPayload.readableBytes();
permitsToReduce += batchSize;
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
Expand All @@ -271,6 +271,8 @@ int updatePermitsAndPendingAcks(final List<Entry> entries) throws PulsarServerEx
log.debug("[{}] [{}] message permits dropped below 0 - {}", subscription, consumerId, permits);
}
}

msgOut.recordMultipleEvents(permitsToReduce, totalReadableBytes);
return permitsToReduce;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.List;
Expand All @@ -27,6 +28,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -37,6 +39,8 @@

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.RateLimiter;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.impl.ConsumerStats;
import com.yahoo.pulsar.client.impl.ProducerStats;

Expand Down Expand Up @@ -323,6 +327,39 @@ public void testSendTimeout(int batchMessageDelayMs) throws Exception {
assertEquals(cStat.getTotalMsgsReceived(), cStat.getTotalAcksSent());
log.info("-- Exiting {} test --", methodName);
}

public void testBatchMessagesRateOut() throws PulsarClientException, InterruptedException, PulsarAdminException {
log.info("-- Starting {} test --", methodName);
String topicName = "persistent://my-property/cluster/my-ns/testBatchMessagesRateOut";
double produceRate = 17;
int batchSize = 5;
ConsumerConfiguration consumerConf = new ConsumerConfiguration();
consumerConf.setSubscriptionType(SubscriptionType.Exclusive);
Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", consumerConf);
ProducerConfiguration producerConf = new ProducerConfiguration();
producerConf.setBatchingMaxMessages(batchSize);
producerConf.setBatchingEnabled(true);
producerConf.setBatchingMaxPublishDelay(2, TimeUnit.SECONDS);

Producer producer = pulsarClient.createProducer(topicName, producerConf);
AtomicBoolean runTest = new AtomicBoolean(true);
Thread t1 = new Thread(() -> {
RateLimiter r = RateLimiter.create(produceRate);
while (runTest.get()) {
r.acquire();
producer.sendAsync("Hello World".getBytes());
consumer.receiveAsync().thenAccept(message -> consumer.acknowledgeAsync(message));
}
});
t1.start();
Thread.sleep(2000); // Two seconds sleep
runTest.set(false);
pulsar.getBrokerService().updateRates();
double actualRate = admin.persistentTopics().getStats(topicName).msgRateOut;
assertTrue(actualRate > (produceRate / batchSize));
consumer.unsubscribe();
log.info("-- Exiting {} test --", methodName);
}

public void validatingLogInfo(Consumer consumer, Producer producer, boolean verifyAckCount)
throws InterruptedException {
Expand Down

0 comments on commit 042521a

Please sign in to comment.