Skip to content

Commit

Permalink
Fixed count of messages per thread in managed ledger writer (apache#1438
Browse files Browse the repository at this point in the history
)
  • Loading branch information
merlimat authored Mar 25, 2018
1 parent 5efafc5 commit 9a62eb8
Showing 1 changed file with 35 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,22 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
Expand Down Expand Up @@ -153,7 +153,7 @@ public static void main(String[] args) throws Exception {
log.info("Starting Pulsar managed-ledger perf writer with config: {}", w.writeValueAsString(arguments));

byte[] payloadData = new byte[arguments.msgSize];
ByteBuf payloadBuffer = Unpooled.directBuffer(arguments.msgSize);
ByteBuf payloadBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(arguments.msgSize);
payloadBuffer.writerIndex(arguments.msgSize);

// Now processing command line arguments
Expand Down Expand Up @@ -216,40 +216,46 @@ public void run() {
Collections.shuffle(managedLedgers);
AtomicBoolean isDone = new AtomicBoolean();

AddEntryCallback addEntryCallback = new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
long sendTime = (Long) (ctx);
messagesSent.increment();
bytesSent.add(payloadData.length);

long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime);
recorder.recordValue(latencyMicros);
cumulativeRecorder.recordValue(latencyMicros);
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Write error on message", exception);
System.exit(-1);
}
};

List<List<ManagedLedger>> managedLedgersPerThread = Lists.partition(managedLedgers,
Math.max(1, managedLedgers.size() / arguments.numThreads));

for (int i = 0; i < arguments.numThreads; i++) {
List<ManagedLedger> managedLedgersForThisThread = managedLedgersPerThread.get(i);
int nunManagedLedgersForThisThread = managedLedgersForThisThread.size();
long numMessagesForThisThread = arguments.numMessages / arguments.numThreads;
int maxOutstandingForThisThread = arguments.maxOutstanding;

executor.submit(() -> {
try {
double msgRate = arguments.msgRate / (double) arguments.numThreads;
RateLimiter rateLimiter = RateLimiter.create(msgRate);
final double msgRate = arguments.msgRate / (double) arguments.numThreads;
final RateLimiter rateLimiter = RateLimiter.create(msgRate);

// Acquire 1 sec worth of messages to have a slower ramp-up
rateLimiter.acquire((int) msgRate);
long startTime = System.currentTimeMillis();
final long startTime = System.currentTimeMillis();

final Semaphore semaphore = new Semaphore(maxOutstandingForThisThread);

final AddEntryCallback addEntryCallback = new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
long sendTime = (Long) (ctx);
messagesSent.increment();
bytesSent.add(payloadData.length);

long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime);
recorder.recordValue(latencyMicros);
cumulativeRecorder.recordValue(latencyMicros);

semaphore.release();
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Write error on message", exception);
System.exit(-1);
}
};

// Send messages on all topics/producers
long totalSent = 0;
Expand All @@ -265,15 +271,17 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
}
}

if (arguments.numMessages > 0) {
if (totalSent++ >= arguments.numMessages) {
if (numMessagesForThisThread > 0) {
if (totalSent++ >= numMessagesForThisThread) {
log.info("------------------- DONE -----------------------");
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
System.exit(0);
}
}

semaphore.acquire();
rateLimiter.acquire();

final long sendTime = System.nanoTime();
Expand Down

0 comments on commit 9a62eb8

Please sign in to comment.