Skip to content

Commit

Permalink
Added option for no key partition routing (openmessaging#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored and sijie committed Apr 17, 2018
1 parent d6d0a62 commit 8740b95
Show file tree
Hide file tree
Showing 15 changed files with 94 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,7 @@

import io.openmessaging.benchmark.utils.distributor.KeyDistributorType;

import static io.openmessaging.benchmark.utils.distributor.KeyDistributorType.ROUND_ROBIN;

public class Workload {

public Workload() {
keyDistributor = ROUND_ROBIN;
}

public String name;

/** Number of topics to create in the test */
Expand All @@ -36,7 +29,7 @@ public Workload() {
/** Number of partitions each topic will contain */
public int partitionsPerTopic;

public KeyDistributorType keyDistributor;
public KeyDistributorType keyDistributor = KeyDistributorType.NO_KEY;

public int messageSize;

Expand All @@ -53,7 +46,7 @@ public Workload() {
/**
* If the consumer backlog is > 0, the generator will accumulate messages until the requested amount of storage is
* retained and then it will start the consumers to drain it.
*
*
* The testDurationMinutes will be overruled to allow the test to complete when the consumer has drained all the
* backlog and it's on par with the producer
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ protected int getLength() {
public static KeyDistributor build(KeyDistributorType keyType) {
KeyDistributor keyDistributor = null;
switch (keyType) {
case ROUND_ROBIN:
keyDistributor = new RoundRobin();
case NO_KEY:
keyDistributor = new KeyRoundRobin();
break;
case KEY_ROUND_ROBIN:
keyDistributor = new KeyRoundRobin();
break;
case RANDOM_NANO:
keyDistributor = new RandomNano();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@

public enum KeyDistributorType {
@JsonEnumDefaultValue
ROUND_ROBIN,
/**
* Key distributor that returns null keys to have default publish semantics
*/
NO_KEY,

/**
* Genarate a finite number of "keys" and cycle through them in round-robin fashion
*/
KEY_ROUND_ROBIN,

/**
* Random distribution based on System.nanoTime()
*/
RANDOM_NANO,
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
public class RoundRobin extends KeyDistributor {
public class KeyRoundRobin extends KeyDistributor {

private int currentIndex = 0;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.openmessaging.benchmark.utils.distributor;

public class NoKeyDistributor extends KeyDistributor {

@Override
public String next() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -203,7 +204,7 @@ public void startLoad(ProducerWorkAssignment producerWorkAssignment) {
@Override
public void probeProducers() throws IOException {
producers.forEach(
producer -> producer.sendAsync("key", new byte[10]).thenRun(() -> totalMessagesSent.increment()));
producer -> producer.sendAsync(Optional.of("key"), new byte[10]).thenRun(() -> totalMessagesSent.increment()));
}

private void submitProducersToExecutor(Map<BenchmarkProducer, KeyDistributor> producersWithKeyDistributor,
Expand All @@ -214,7 +215,8 @@ private void submitProducersToExecutor(Map<BenchmarkProducer, KeyDistributor> pr
producersWithKeyDistributor.forEach((producer, producersKeyDistributor) -> {
rateLimiter.acquire();
final long sendTime = System.nanoTime();
producer.sendAsync(producersKeyDistributor.next(), payloadData).thenRun(() -> {
producer.sendAsync(Optional.ofNullable(producersKeyDistributor.next()), payloadData)
.thenRun(() -> {
messagesSent.increment();
totalMessagesSent.increment();
messagesSentCounter.inc();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@
*/
package io.openmessaging.benchmark.utils.distributor;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import static io.openmessaging.benchmark.utils.distributor.KeyDistributorType.KEY_ROUND_ROBIN;
import static io.openmessaging.benchmark.utils.distributor.KeyDistributorType.RANDOM_NANO;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import static org.junit.Assert.assertEquals;

import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

import static io.openmessaging.benchmark.utils.distributor.KeyDistributorType.RANDOM_NANO;
import static io.openmessaging.benchmark.utils.distributor.KeyDistributorType.ROUND_ROBIN;
import static java.util.stream.Collectors.counting;
import static java.util.stream.Collectors.groupingBy;
import static org.junit.Assert.assertEquals;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class KeyDistributorTest {

Expand All @@ -39,7 +39,7 @@ public class KeyDistributorTest {

@Before
public void init() {
roundRobin = KeyDistributor.build(ROUND_ROBIN);
roundRobin = KeyDistributor.build(KEY_ROUND_ROBIN);
randomNano = KeyDistributor.build(RANDOM_NANO);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@
*/
package io.openmessaging.benchmark.driver;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public interface BenchmarkProducer extends AutoCloseable {

/**
* Publish a message and return a callback to track the completion of the operation.
*
*
* @param key
* the key associated with this message
* @param payload
* the message payload
* @return a future that will be triggered when the message is successfully published
*/
CompletableFuture<Void> sendAsync(String key, byte[] payload);
CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package io.openmessaging.benchmark.driver.artemis;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand Down Expand Up @@ -46,7 +47,7 @@ public void close() throws Exception {
}

@Override
public CompletableFuture<Void> sendAsync(String key, byte[] payload) {
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
ClientMessage msg = session.createMessage(true /* durable */ );
msg.setTimestamp(System.currentTimeMillis());
msg.getBodyBuffer().writeBytes(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@
*/
package io.openmessaging.benchmark.driver.bookkeeper;

import io.openmessaging.benchmark.driver.BenchmarkProducer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.util.TimeSequencer;

import io.openmessaging.benchmark.driver.BenchmarkProducer;

public class DlogBenchmarkProducer implements BenchmarkProducer {

private final AsyncLogWriter writer;
Expand All @@ -40,7 +43,7 @@ public void close() throws Exception {
}

@Override
public CompletableFuture<Void> sendAsync(String key, byte[] payload) {
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
LogRecord record = new LogRecord(
sequencer.nextId(), payload);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package io.openmessaging.benchmark.driver.kafka;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -36,8 +37,8 @@ public KafkaBenchmarkProducer(KafkaProducer<String, byte[]> producer, String top
}

@Override
public CompletableFuture<Void> sendAsync(String key, byte[] payload) {
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key, payload);
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, key.orElse(null), payload);

CompletableFuture<Void> future = new CompletableFuture<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
*/
package io.openmessaging.benchmark.driver.pulsar;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;

Expand All @@ -40,9 +40,13 @@ public void close() throws Exception {
}

@Override
public CompletableFuture<Void> sendAsync(String key, byte[] payload) {
Message msg = MessageBuilder.create().setKey(key).setContent(payload).build();
return producer.sendAsync(msg).thenApply(msgId -> null);
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
MessageBuilder msgBuilder = MessageBuilder.create().setContent(payload);
if (key.isPresent()) {
msgBuilder.setKey(key.get());
}

return producer.sendAsync(msgBuilder.build()).thenApply(msgId -> null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import com.rabbitmq.client.AMQP.BasicProperties;
Expand All @@ -45,11 +46,11 @@ public void close() throws Exception {
private static final BasicProperties defaultProperties = new BasicProperties();

@Override
public CompletableFuture<Void> sendAsync(String key, byte[] payload) {
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
BasicProperties props = defaultProperties.builder().timestamp(new Date()).build();
CompletableFuture<Void> future = new CompletableFuture<>();
try {
channel.basicPublish(exchange, key, props, payload);
channel.basicPublish(exchange, key.orElse(null), props, payload);
channel.waitForConfirms();
future.complete(null);
} catch (IOException | InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package io.openmessaging.benchmark.driver.rocketmq;

import io.openmessaging.benchmark.driver.BenchmarkProducer;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import io.openmessaging.benchmark.driver.BenchmarkProducer;

public class RocketMQBenchmarkProducer implements BenchmarkProducer {
private final DefaultMQProducer rmqProducer;
private final String rmqTopic;
Expand All @@ -35,9 +38,11 @@ public RocketMQBenchmarkProducer(final DefaultMQProducer rmqProducer, final Stri
}

@Override
public CompletableFuture<Void> sendAsync(final String key, final byte[] payload) {
public CompletableFuture<Void> sendAsync(final Optional<String> key, final byte[] payload) {
Message message = new Message(this.rmqTopic, payload);
message.setKeys(key);
if (key.isPresent()) {
message.setKeys(key.get());
}

CompletableFuture<Void> future = new CompletableFuture<>();
try {
Expand Down
2 changes: 1 addition & 1 deletion workloads/1-topic-1-partition-1kb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ name: 1 topic / 1 partition / 1Kb

topics: 1
partitionsPerTopic: 1
keyDistributor: "ROUND_ROBIN"
keyDistributor: "NO_KEY"
messageSize: 1024
payloadFile: "payload/payload-1Kb.data"
subscriptionsPerTopic: 1
Expand Down

0 comments on commit 8740b95

Please sign in to comment.