Skip to content

Commit

Permalink
Added payload and key interfaces (openmessaging#21)
Browse files Browse the repository at this point in the history
* added payload reader
* added keydistributor (round robin & random)
* refactoring
* added payload file for 3 producers
  • Loading branch information
tsypuk authored and merlimat committed Jan 26, 2018
1 parent 0370405 commit 7d9221d
Show file tree
Hide file tree
Showing 32 changed files with 490 additions and 63 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Workload | Topics | Partitions per topic | Message size | Subscriptions per topi
[`backlog-1-topic-16-partitions-1kb.yaml`](workloads/backlog-1-topic-16-partitions-1kb.yaml) | 1 | 16 | 1 kB | 1 | 1 | 100000 | 100 | 5
[`max-rate-1-topic-1-partition-1kb.yaml`](workloads/max-rate-1-topic-1-partition-1kb.yaml) | 1 | 1 | 1 kB | 1 | 1 | 0 | 0 | 5
[`max-rate-1-topic-1-partition-100b.yaml`](workloads/max-rate-1-topic-1-partition-100b.yaml) | 1 | 1 | 100 bytes | 1 | 1 | 0 | 0 | 5
[`1-topic-3-partition-100b-3producers.yaml`](workloads/1-topic-3-partition-100b-3producers.yaml) | 1 | 3 | 100 bytes | 1 | 3 | 0 | 0 | 15
[`max-rate-1-topic-16-partitions-1kb.yaml`](workloads/max-rate-1-topic-16-partitions-1kb.yaml) | 1 | 16 | 1 kB | 1 | 1 | 0 | 0 | 5
[`max-rate-1-topic-16-partitions-100b.yaml`](workloads/max-rate-1-topic-16-partitions-100b.yaml) | 1 | 16 | 100 bytes | 1 | 1 | 0 | 0 | 5
[`max-rate-1-topic-100-partitions-1kb.yaml`](workloads/max-rate-1-topic-100-partitions-1kb.yaml) | 1 | 100 | 1 kB | 1 | 1 | 0 | 0 | 5
Expand Down
7 changes: 7 additions & 0 deletions benchmark-framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@
<artifactId>driver-artemis</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ public static void main(String[] args) throws Exception {
private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

static {
mapper.enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_USING_DEFAULT_VALUE);
}

private static final ObjectWriter writer = new ObjectMapper().writerWithDefaultPrettyPrinter();

private static final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,16 @@
*/
package io.openmessaging.benchmark;

import io.openmessaging.benchmark.utils.distributor.KeyDistributorType;

import static io.openmessaging.benchmark.utils.distributor.KeyDistributorType.ROUND_ROBIN;

public class Workload {

public Workload() {
keyDistributor = ROUND_ROBIN;
}

public String name;

/** Number of topics to create in the test */
Expand All @@ -28,8 +36,12 @@ public class Workload {
/** Number of partitions each topic will contain */
public int partitionsPerTopic;

public KeyDistributorType keyDistributor;

public int messageSize;

public String payloadFile;

public int subscriptionsPerTopic;

public int producersPerTopic;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.HdrHistogram.Histogram;
Expand All @@ -47,6 +49,12 @@
import io.openmessaging.benchmark.driver.ConsumerCallback;
import io.openmessaging.benchmark.utils.PaddingDecimalFormat;
import io.openmessaging.benchmark.utils.Timer;
import io.openmessaging.benchmark.utils.distributor.KeyDistributor;
import io.openmessaging.benchmark.utils.distributor.KeyDistributorType;
import io.openmessaging.benchmark.utils.payload.FilePayloadReader;
import io.openmessaging.benchmark.utils.payload.PayloadReader;

import static java.util.stream.Collectors.toList;

public class WorkloadGenerator implements ConsumerCallback, AutoCloseable {

Expand Down Expand Up @@ -281,34 +289,39 @@ private List<String> createTopics() {
}

private List<BenchmarkConsumer> createConsumers(List<String> topics) {
List<CompletableFuture<BenchmarkConsumer>> futures = new ArrayList<>();
List<CompletableFuture<BenchmarkConsumer>> consumerFutures = new ArrayList<>();
Timer timer = new Timer();

for (int i = 0; i < workload.subscriptionsPerTopic; i++) {
String subscriptionName = String.format("sub-%03d", i);

for (String topic : topics) {
futures.add(benchmarkDriver.createConsumer(topic, subscriptionName, this));
}
topics.stream()
.map(topic -> benchmarkDriver.createConsumer(topic, subscriptionName, this))
.forEach(consumerFutures::add);
}

List<BenchmarkConsumer> consumers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
List<BenchmarkConsumer> consumers = consumerFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());

log.info("Created {} consumers in {} ms", consumers.size(), timer.elapsedMillis());
return consumers;
}

private List<BenchmarkProducer> createProducers(List<String> topics) {
List<CompletableFuture<BenchmarkProducer>> futures = new ArrayList<>();
List<CompletableFuture<BenchmarkProducer>> producerFutures = new ArrayList<>();
Timer timer = new Timer();

for (int i = 0; i < workload.producersPerTopic; i++) {
for (String topic : topics) {
futures.add(benchmarkDriver.createProducer(topic));
}
topics.stream()
.map(topic -> benchmarkDriver.createProducer(topic))
.forEach(producerFutures::add);
}

List<BenchmarkProducer> producers = futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
List<BenchmarkProducer> producers = producerFutures.stream()
.map(CompletableFuture::join)
.collect(toList());

log.info("Created {} producers in {} ms", producers.size(), timer.elapsedMillis());
return producers;
}
Expand Down Expand Up @@ -356,8 +369,7 @@ private void buildAndDrainBacklog(List<String> topics) {
}
}

private TestResult generateLoad(List<BenchmarkProducer> producers, long testDurationsSeconds,
RateLimiter rateLimiter) {
private TestResult generateLoad(List<BenchmarkProducer> producers, long testDurationsSeconds, RateLimiter rateLimiter) {
Recorder publishRecorder = new Recorder(TimeUnit.SECONDS.toMicros(30), 5);
Recorder cumulativeRecorder = new Recorder(TimeUnit.SECONDS.toMicros(30), 5);

Expand All @@ -367,43 +379,20 @@ private TestResult generateLoad(List<BenchmarkProducer> producers, long testDura
int processors = Runtime.getRuntime().availableProcessors();
Collections.shuffle(producers);

final byte[] payloadData = new byte[workload.messageSize];
final PayloadReader payloadReader = new FilePayloadReader(workload.messageSize);
final byte[] payloadData = payloadReader.load(workload.payloadFile);

// Divide the producers across multiple different threads
for (List<BenchmarkProducer> producersPerThread : Lists.partition(producers, processors)) {
executor.submit(() -> {
try {
final KeyDistributorType keyDistributorType = workload.keyDistributor;

// Send messages on all topics/producers assigned to this thread
while (!testCompleted) {
for (int i = 0; i < producersPerThread.size(); i++) {
BenchmarkProducer producer = producersPerThread.get(i);
rateLimiter.acquire();

final long sendTime = System.nanoTime();
String key = randomKeys[Math.abs(((int) sendTime)) % randomKeys.length];

producer.sendAsync(key, payloadData).thenRun(() -> {
messagesSent.increment();
totalMessagesSent.increment();
bytesSent.add(payloadData.length);

long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendTime);
publishRecorder.recordValue(latencyMicros);
cumulativeRecorder.recordValue(latencyMicros);

}).exceptionally(ex -> {
log.warn("Write error on message", ex);
System.exit(-1);
return null;
});
}
}
} catch (Throwable t) {
log.error("Got error", t);
}
});
}
final Function<BenchmarkProducer, KeyDistributor> assignKeyDistributor = (any) ->
KeyDistributor.build(keyDistributorType);

Lists.partition(producers, processors).stream()
.map(producersPerThread -> producersPerThread.stream()
.collect(Collectors.toMap(Function.identity(), assignKeyDistributor)))
.forEach(producersWithKeyDistributor ->
submitProducersToExecutor(producersWithKeyDistributor, rateLimiter, payloadData,
publishRecorder, cumulativeRecorder));

// Print report stats
long oldTime = System.nanoTime();
Expand Down Expand Up @@ -525,6 +514,37 @@ private TestResult generateLoad(List<BenchmarkProducer> producers, long testDura
return result;
}

private void submitProducersToExecutor(Map<BenchmarkProducer, KeyDistributor> producersWithKeyDistributor,
RateLimiter rateLimiter, byte[] payloadData, Recorder publishRecorder,
Recorder cumulativeRecorder) {
executor.submit(() -> {
try {
while (!testCompleted) {
producersWithKeyDistributor.forEach((producer, producersKeyDistributor) -> {
rateLimiter.acquire();
final long sendTime = System.nanoTime();
producer.sendAsync(producersKeyDistributor.next(), payloadData).thenRun(() -> {
messagesSent.increment();
totalMessagesSent.increment();
bytesSent.add(payloadData.length);

long latencyMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sendTime);
publishRecorder.recordValue(latencyMicros);
cumulativeRecorder.recordValue(latencyMicros);

}).exceptionally(ex -> {
log.warn("Write error on message", ex);
System.exit(-1);
return null;
});
});
}
} catch (Throwable t) {
log.error("Got error", t);
}
});
}

@Override
public void messageReceived(byte[] data, long publishTimestamp) {
messagesReceived.increment();
Expand All @@ -549,17 +569,6 @@ private static final String getRandomString() {
return BaseEncoding.base64Url().omitPadding().encode(buffer);
}

private static final String[] randomKeys = new String[10000];

static {
// Generate a number of random keys to be used when publishing
byte[] buffer = new byte[7];
for (int i = 0; i < randomKeys.length; i++) {
random.nextBytes(buffer);
randomKeys[i] = BaseEncoding.base64Url().omitPadding().encode(buffer);
}
}

private static double microsToMillis(double timeInMicros) {
return timeInMicros / 1000.0;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.openmessaging.benchmark.utils.distributor;

import com.google.common.io.BaseEncoding;

import java.util.Random;

public abstract class KeyDistributor {

private static final int UNIQUE_COUNT = 10_000;
private static final int KEY_BYTE_SIZE = 7;

private static final String[] randomKeys = new String[UNIQUE_COUNT];
private static final Random random = new Random();

static {
// Generate a number of random keys to be used when publishing
byte[] buffer = new byte[KEY_BYTE_SIZE];
for (int i = 0; i < randomKeys.length; i++) {
random.nextBytes(buffer);
randomKeys[i] = BaseEncoding.base64Url().omitPadding().encode(buffer);
}
}

protected String get(int index) {
return randomKeys[index];
}

protected int getLength() {
return UNIQUE_COUNT;
}

public abstract String next();

public static KeyDistributor build(KeyDistributorType keyType) {
KeyDistributor keyDistributor = null;
switch (keyType) {
case ROUND_ROBIN:
keyDistributor = new RoundRobin();
break;
case RANDOM_NANO:
keyDistributor = new RandomNano();
break;
}
return keyDistributor;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.openmessaging.benchmark.utils.distributor;

import com.fasterxml.jackson.annotation.JsonEnumDefaultValue;

public enum KeyDistributorType {
@JsonEnumDefaultValue
ROUND_ROBIN,
RANDOM_NANO,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.openmessaging.benchmark.utils.distributor;

import javax.annotation.concurrent.ThreadSafe;

@ThreadSafe
public class RandomNano extends KeyDistributor {

public String next() {
int randomIndex = Math.abs((int) System.nanoTime() % getLength());
return get(randomIndex);
}
}
Loading

0 comments on commit 7d9221d

Please sign in to comment.