Skip to content

Commit

Permalink
Improved ManagedLedgerWriter to use multiple threads and configurable…
Browse files Browse the repository at this point in the history
… digest type (apache#1396)
  • Loading branch information
merlimat authored Mar 15, 2018
1 parent 6fbd8c3 commit 833539a
Showing 1 changed file with 79 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,24 @@

import static java.util.concurrent.TimeUnit.NANOSECONDS;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.RateLimiter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -52,17 +66,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.concurrent.DefaultThreadFactory;

public class ManagedLedgerWriter {

private static final ExecutorService executor = Executors
Expand Down Expand Up @@ -114,6 +117,9 @@ static class Arguments {
@Parameter(names = { "-a", "--ack-quorum" }, description = "Ledger ack quorum")
public int ackQuorum = 1;

@Parameter(names = { "-dt", "--digest-type" }, description = "BookKeeper digest type")
public DigestType digestType = DigestType.CRC32C;

@Parameter(names = { "-time",
"--test-duration" }, description = "Test duration in secs. If 0, it will keep publishing")
public long testTime = 0;
Expand Down Expand Up @@ -174,7 +180,7 @@ public static void main(String[] args) throws Exception {
mlConf.setMetadataEnsembleSize(arguments.ensembleSize);
mlConf.setMetadataWriteQuorumSize(arguments.writeQuorum);
mlConf.setMetadataAckQuorumSize(arguments.ackQuorum);
mlConf.setDigestType(DigestType.CRC32);
mlConf.setDigestType(arguments.digestType);
mlConf.setMaxSizePerLedgerMb(2048);

List<CompletableFuture<ManagedLedger>> futures = new ArrayList<>();
Expand Down Expand Up @@ -210,63 +216,75 @@ public void run() {
Collections.shuffle(managedLedgers);
AtomicBoolean isDone = new AtomicBoolean();

RateLimiter rateLimiter = RateLimiter.create(arguments.msgRate);
AddEntryCallback addEntryCallback = new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
long sendTime = (Long) (ctx);
messagesSent.increment();
bytesSent.add(payloadData.length);

executor.submit(() -> {
try {

long startTime = System.currentTimeMillis();

// Send messages on all topics/producers
long totalSent = 0;
while (true) {
for (int i = 0; i < arguments.numManagedLedgers; i++) {
if (arguments.testTime > 0) {
if (System.currentTimeMillis() - startTime > arguments.testTime) {
log.info("------------------- DONE -----------------------");
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
System.exit(0);
}
}
long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime);
recorder.recordValue(latencyMicros);
cumulativeRecorder.recordValue(latencyMicros);
}

if (arguments.numMessages > 0) {
if (totalSent++ >= arguments.numMessages) {
log.info("------------------- DONE -----------------------");
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
System.exit(0);
@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Write error on message", exception);
System.exit(-1);
}
};

List<List<ManagedLedger>> managedLedgersPerThread = Lists.partition(managedLedgers,
Math.max(1, managedLedgers.size() / arguments.numThreads));

for (int i = 0; i < arguments.numThreads; i++) {
List<ManagedLedger> managedLedgersForThisThread = managedLedgersPerThread.get(i);
int nunManagedLedgersForThisThread = managedLedgersForThisThread.size();

executor.submit(() -> {
try {
double msgRate = arguments.msgRate / (double) arguments.numThreads;
RateLimiter rateLimiter = RateLimiter.create(msgRate);

// Acquire 1 sec worth of messages to have a slower ramp-up
rateLimiter.acquire((int) msgRate);
long startTime = System.currentTimeMillis();

// Send messages on all topics/producers
long totalSent = 0;
while (true) {
for (int j = 0; j < nunManagedLedgersForThisThread; j++) {
if (arguments.testTime > 0) {
if (System.currentTimeMillis() - startTime > arguments.testTime) {
log.info("------------------- DONE -----------------------");
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
System.exit(0);
}
}
}
rateLimiter.acquire();

final long sendTime = System.nanoTime();

managedLedgers.get(i).asyncAddEntry(payloadBuffer, new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
messagesSent.increment();
bytesSent.add(payloadData.length);

long latencyMicros = NANOSECONDS.toMicros(System.nanoTime() - sendTime);
recorder.recordValue(latencyMicros);
cumulativeRecorder.recordValue(latencyMicros);
if (arguments.numMessages > 0) {
if (totalSent++ >= arguments.numMessages) {
log.info("------------------- DONE -----------------------");
printAggregatedStats();
isDone.set(true);
Thread.sleep(5000);
System.exit(0);
}
}
rateLimiter.acquire();

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.warn("Write error on message", exception);
System.exit(-1);
}
}, null);
final long sendTime = System.nanoTime();
managedLedgersForThisThread.get(j).asyncAddEntry(payloadBuffer, addEntryCallback, sendTime);
}
}
} catch (Throwable t) {
log.error("Got error", t);
}
} catch (Throwable t) {
log.error("Got error", t);
}
});
});
}

// Print report stats
long oldTime = System.nanoTime();
Expand Down

0 comments on commit 833539a

Please sign in to comment.