diff --git a/bin/pulsar-perf b/bin/pulsar-perf index 5794da2b35013..a520d96493862 100755 --- a/bin/pulsar-perf +++ b/bin/pulsar-perf @@ -73,12 +73,11 @@ pulsar_help() { cat < where command is one of: - produce Run a producer - consume Run a consumer - monitor-simple-load-manager Continuously receive broker data when using SimpleLoadManagerImpl - monitor-modular-load-manager Continuously receive broker data when using ModularLoadManagerImpl - simulation-client Run a simulation server acting as a Pulsar client - simulation-controller Run a simulation controller to give commands to servers + produce Run a producer + consume Run a consumer + monitor-brokers Continuously receive broker data and/or load reports + simulation-client Run a simulation server acting as a Pulsar client + simulation-controller Run a simulation controller to give commands to servers help This help message @@ -142,10 +141,8 @@ if [ "$COMMAND" == "produce" ]; then exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@" elif [ "$COMMAND" == "consume" ]; then exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@" -elif [ "$COMMAND" == "monitor-simple-load-manager" ]; then - exec $JAVA $OPTS com.yahoo.pulsar.testclient.SimpleLoadManagerBrokerMonitor "$@" -elif [ "$COMMAND" == "monitor-modular-load-manager" ]; then - exec $JAVA $OPTS com.yahoo.pulsar.testclient.ModularLoadManagerBrokerMonitor "$@" +elif [ "$COMMAND" == "monitor-brokers" ]; then + exec $JAVA $OPTS com.yahoo.pulsar.testclient.BrokerMonitor "$@" elif [ "$COMMAND" == "simulation-client" ]; then exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationClient "$@" elif [ "$COMMAND" == "simulation-controller" ]; then diff --git a/docs/Documentation.md b/docs/Documentation.md index b2b42b8cc7747..88178d309bad4 100644 --- a/docs/Documentation.md +++ b/docs/Documentation.md @@ -14,6 +14,7 @@ - [Apache Storm adaptor](PulsarStorm.md) - [Spark Streaming Pulsar Receiver](PulsarSpark.md) - [Modular Load Manager](ModularLoadManager.md) + - [Simulation Tools](Simulation.md) * Internal Docs - [Binary protocol specification](BinaryProtocol.md) * Other Languages diff --git a/docs/SimulationTools.md b/docs/SimulationTools.md new file mode 100644 index 0000000000000..c9468c8a3ce8e --- /dev/null +++ b/docs/SimulationTools.md @@ -0,0 +1,101 @@ +# Simulation Tools + +It is sometimes necessary create an test environment and incur artificial load to observe how well load managers +handle the load. The load simulation controller, the load simulation client, and the broker monitor were created as an +effort to make create this load and observe the effects on the managers more easily. + +## Simulation Client +The simulation client is a machine which will create and subscribe to topics with configurable message rates and sizes. +Because it is sometimes necessary in simulating large load to use multiple client machines, the user does not interact +with the simulation client directly, but instead delegates their requests to the simulation controller, which will then +send signals to clients to start incurring load. The client implementation is in the class +`com.yahoo.pulsar.testclient.LoadSimulationClient`. + +### Usage +To Start a simulation client, use the `pulsar-perf` script with the command `simulation-client` as follows: + +``` +pulsar-perf simulation-client --port --service-url +``` + +The client will then be ready to receive controller commands. +## Simulation Controller +The simulation controller send signals to the simulation clients, requesting them to create new topics, stop old +topics, change the load incurred by topics, as well as several other tasks. It is implemented in the class +`com.yahoo.pulsar.testclient.LoadSimulationController` and presents a shell to the user as an interface to send +command with. + +### Usage +To start a simulation controller, use the `pulsar-perf` script with the command `simulation-controller` as follows: + +``` +pulsar-perf simulation-controller --cluster --client-port +--clients +``` + +The clients should already be started before the controller is started. You will then be presented with a simple prompt, +where you can issue commands to simulation clients. Arguments often refer to tenant names, namespace names, and topic +names. In all cases, the BASE name of the tenants, namespaces, and topics are used. For example, for the topic +`persistent://my_cluster/my_tenant/my_namespace/my_topic`, the tenant name is `my_tenant`, the namespace name is +`my_namespace`, and the topic name is `my_topic`. The controller can perform the following actions: + +* Create a topic with a producer and a consumer + * `trade [--rate ] + [--rand-rate ,] + [--size ]` +* Create a group of topics with a producer and a consumer + * `trade_group [--rate ] + [--rand-rate ,] + [--separation ] [--size ] + [--topics-per-namespace ]` +* Change the configuration of an existing topic + * `change [--rate ] + [--rand-rate ,] + [--size ]` +* Change the configuration of a group of topics + * `change_group [--rate ] [--rand-rate ,] + [--size ] [--topics-per-namespace ]` +* Shutdown a previously created topic + * `stop ` +* Shutdown a previously created group of topics + * `stop_group ` +* Copy the historical data from one ZooKeeper to another and simulate based on the message rates and sizes in that +history + * `copy [--rate-multiplier value]` +* Simulate the load of the historical data on the current ZooKeeper (should be same ZooKeeper being simulated on) + * `simulate [--rate-multiplier value]` +* Stream the latest data from the given active ZooKeeper to simulate the real-time load of that ZooKeeper. + * `stream [--rate-multiplier value]` + +The "group" arguments in these commands allow the user to create or affect multiple topics at once. Groups are created +when calling the `trade_group` command, and all topics from these groups may be subsequently modified or stopped +with the `change_group` and `stop_group` commands respectively. All ZooKeeper arguments are of the form +`zookeeper_host:port`. + +#### Difference Between Copy, Simulate, and Stream +The commands `copy`, `simulate`, and `stream` are very similar but have significant differences. `copy` is used when +you want to simulate the load of a static, external ZooKeeper on the ZooKeeper you are simulating on. Thus, +`source zookeeper` should be the ZooKeeper you want to copy and `target zookeeper` should be the ZooKeeper you are +simulating on, and then it will get the full benefit of the historical data of the source in both load manager +implementations. `simulate` on the other hand takes in only one ZooKeeper, the one you are simulating on. It assumes +that you are simulating on a ZooKeeper that has historical data for `SimpleLoadManagerImpl` and creates equivalent +historical data for `ModularLoadManagerImpl`. Then, the load according to the historical data is simulated by the +clients. Finally, `stream` takes in an active ZooKeeper different than the ZooKeeper being simulated on and streams +load data from it and simulates the real-time load. In all cases, the optional `rate-multiplier` argument allows the +user to simulate some proportion of the load. For instance, using `--rate-multiplier 0.05` will cause messages to +be sent at only `5%` of the rate of the load that is being simulated. + +## Broker Monitor +To observe the behavior of the load manager in these simulations, one may utilize the broker monitor, which is +implemented in `com.yahoo.pulsar.testclient.BrokerMonitor`. The broker monitor will print tabular load data to the +console as it is updated using watchers. + +### Usage +To start a broker monitor, use the `monitor-brokers` command in the `pulsar-perf` script: + +``` +pulsar-perf monitor-brokers --connect-string +``` + +The console will then continuously print load data until it is interrupted. + diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java index 2cadbef2b6835..f7c0ef5aad338 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/LoadManager.java @@ -17,6 +17,9 @@ import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.yahoo.pulsar.broker.PulsarServerException; import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.ServiceConfiguration; @@ -25,8 +28,6 @@ import com.yahoo.pulsar.broker.stats.Metrics; import com.yahoo.pulsar.common.naming.ServiceUnitId; import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * LoadManager runs though set of load reports collected from different brokers and generates a recommendation of diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/LoadManagerShared.java index ee99cba545eef..4b6ea2a28ff03 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/LoadManagerShared.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/LoadManagerShared.java @@ -15,12 +15,7 @@ */ package com.yahoo.pulsar.broker.loadbalance.impl; -import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage; -import com.yahoo.pulsar.common.naming.NamespaceName; -import com.yahoo.pulsar.common.naming.ServiceUnitId; -import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import static com.google.common.base.Preconditions.checkArgument; import java.io.IOException; import java.net.MalformedURLException; @@ -28,7 +23,13 @@ import java.util.HashSet; import java.util.Set; -import static com.google.common.base.Preconditions.checkArgument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage; +import com.yahoo.pulsar.common.naming.NamespaceName; +import com.yahoo.pulsar.common.naming.ServiceUnitId; +import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; /** * This class contains code which in shared between the two load manager implementations. diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index e0a73c2528724..9213bcf62f0ce 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -15,10 +15,7 @@ */ package com.yahoo.pulsar.broker.loadbalance.impl; -import static com.google.common.base.Preconditions.checkArgument; - import java.io.IOException; -import java.net.MalformedURLException; import java.net.URL; import java.util.ArrayList; import java.util.HashSet; @@ -31,9 +28,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import com.yahoo.pulsar.broker.TimeAverageMessageData; -import com.yahoo.pulsar.broker.loadbalance.LoadManager; -import com.yahoo.pulsar.common.policies.data.ResourceQuota; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.SystemUtils; @@ -57,14 +51,17 @@ import com.yahoo.pulsar.broker.PulsarService; import com.yahoo.pulsar.broker.ServiceConfiguration; import com.yahoo.pulsar.broker.TimeAverageBrokerData; +import com.yahoo.pulsar.broker.TimeAverageMessageData; import com.yahoo.pulsar.broker.loadbalance.BrokerFilter; import com.yahoo.pulsar.broker.loadbalance.BrokerHostUsage; import com.yahoo.pulsar.broker.loadbalance.LoadData; +import com.yahoo.pulsar.broker.loadbalance.LoadManager; import com.yahoo.pulsar.broker.loadbalance.LoadSheddingStrategy; import com.yahoo.pulsar.broker.loadbalance.ModularLoadManager; import com.yahoo.pulsar.broker.loadbalance.ModularLoadManagerStrategy; import com.yahoo.pulsar.client.admin.PulsarAdmin; import com.yahoo.pulsar.common.naming.ServiceUnitId; +import com.yahoo.pulsar.common.policies.data.ResourceQuota; import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats; import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; import com.yahoo.pulsar.common.util.ObjectMapperFactory; diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java index 79c6bdfbc9d8c..cf64a8f06bb6a 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java @@ -15,11 +15,9 @@ */ package com.yahoo.pulsar.broker.loadbalance.impl; -import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.isNotEmpty; import java.io.IOException; -import java.net.MalformedURLException; import java.net.URL; import java.time.LocalDateTime; import java.util.ArrayList; diff --git a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/BrokerMonitor.java b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/BrokerMonitor.java new file mode 100644 index 0000000000000..2bf8af543a8c7 --- /dev/null +++ b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/BrokerMonitor.java @@ -0,0 +1,459 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.testclient; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.google.gson.Gson; +import com.yahoo.pulsar.broker.LocalBrokerData; +import com.yahoo.pulsar.broker.TimeAverageBrokerData; +import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; +import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport; +import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage; +import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; +import com.yahoo.pulsar.testclient.utils.FixedColumnLengthTableMaker; + +/** + * Monitors brokers and prints to the console information about their system resource usages, their topic and bundle + * counts, their message rates, and other metrics. + */ +public class BrokerMonitor { + private static final Logger log = LoggerFactory.getLogger(BrokerMonitor.class); + + private static final String BROKER_ROOT = "/loadbalance/brokers"; + private static final int ZOOKEEPER_TIMEOUT_MILLIS = 5000; + private static final int GLOBAL_STATS_PRINT_PERIOD_MILLIS = 60000; + private final ZooKeeper zkClient; + private static final Gson gson = new Gson(); + + // Fields common for message rows. + private static final List MESSAGE_FIELDS = Arrays.asList("MSG/S IN", "MSG/S OUT", "TOTAL", "KB/S IN", + "KB/S OUT", "TOTAL"); + + // Fields common for system rows. + private static final List SYSTEM_FIELDS = Arrays.asList("CPU %", "MEMORY %", "DIRECT %", "BW IN %", + "BW OUT %", "MAX %"); + + private static final Object[] SYSTEM_ROW = makeSystemRow("SYSTEM"); + private static final Object[] COUNT_ROW = { "COUNT", "TOPIC", "BUNDLE", "PRODUCER", "CONSUMER", "BUNDLE +", + "BUNDLE -" }; + private static final Object[] LATEST_ROW = makeMessageRow("LATEST"); + private static final Object[] SHORT_ROW = makeMessageRow("SHORT"); + private static final Object[] LONG_ROW = makeMessageRow("LONG"); + private static final Object[] RAW_SYSTEM_ROW = makeSystemRow("RAW SYSTEM"); + private static final Object[] ALLOC_SYSTEM_ROW = makeSystemRow("ALLOC SYSTEM"); + private static final Object[] RAW_MESSAGE_ROW = makeMessageRow("RAW MSG"); + private static final Object[] ALLOC_MESSAGE_ROW = makeMessageRow("ALLOC MSG"); + private static final Object[] GLOBAL_HEADER = { "BROKER", "BUNDLE", "MSG/S", "KB/S", "MAX %" }; + + private final Map loadData; + + private static final FixedColumnLengthTableMaker localTableMaker = new FixedColumnLengthTableMaker(); + static { + // Makes the table length about 120. + localTableMaker.elementLength = 14; + localTableMaker.decimalFormatter = "%.2f"; + } + + private static final FixedColumnLengthTableMaker globalTableMaker = new FixedColumnLengthTableMaker(); + static { + globalTableMaker.decimalFormatter = "%.2f"; + globalTableMaker.topBorder = '*'; + globalTableMaker.bottomBorder = '*'; + // Make broker column substantially longer than other columns. + globalTableMaker.lengthFunction = column -> column == 0 ? 60 : 12; + } + + // Take advantage of repeated labels in message rows. + private static Object[] makeMessageRow(final String firstElement) { + final List result = new ArrayList<>(); + result.add(firstElement); + result.addAll(MESSAGE_FIELDS); + return result.toArray(); + } + + // Take advantage of repeated labels in system rows. + private static Object[] makeSystemRow(final String firstElement) { + final List result = new ArrayList<>(); + result.add(firstElement); + result.addAll(SYSTEM_FIELDS); + return result.toArray(); + } + + // Helper method to initialize rows. + private static void initRow(final Object[] row, final Object... elements) { + System.arraycopy(elements, 0, row, 1, elements.length); + } + + // Helper method to initialize rows which hold message data. + private static void initMessageRow(final Object[] row, final double messageRateIn, final double messageRateOut, + final double messageThroughputIn, final double messageThroughputOut) { + initRow(row, messageRateIn, messageRateOut, messageRateIn + messageRateOut, messageThroughputIn / 1024, + messageThroughputOut / 1024, (messageThroughputIn + messageThroughputOut) / 1024); + } + + // Prints out the global load data. + private void printGlobalData() { + synchronized (loadData) { + // 1 header row, 1 total row, and loadData.size() rows for brokers. + Object[][] rows = new Object[loadData.size() + 2][]; + rows[0] = GLOBAL_HEADER; + int totalBundles = 0; + double totalThroughput = 0; + double totalMessageRate = 0; + double maxMaxUsage = 0; + int i = 1; + for (final Map.Entry entry : loadData.entrySet()) { + final String broker = entry.getKey(); + final Object data = entry.getValue(); + rows[i] = new Object[GLOBAL_HEADER.length]; + rows[i][0] = broker; + int numBundles; + double messageRate; + double messageThroughput; + double maxUsage; + if (data instanceof LoadReport) { + final LoadReport loadReport = (LoadReport) data; + numBundles = (int) loadReport.getNumBundles(); + messageRate = loadReport.getMsgRateIn() + loadReport.getMsgRateOut(); + messageThroughput = (loadReport.getAllocatedBandwidthIn() + loadReport.getAllocatedBandwidthOut()) + / 1024; + final SystemResourceUsage systemResourceUsage = loadReport.getSystemResourceUsage(); + maxUsage = Math.max( + Math.max( + Math.max(systemResourceUsage.getCpu().percentUsage(), + systemResourceUsage.getMemory().percentUsage()), + Math.max(systemResourceUsage.getDirectMemory().percentUsage(), + systemResourceUsage.getBandwidthIn().percentUsage())), + systemResourceUsage.getBandwidthOut().percentUsage()); + } else if (data instanceof LocalBrokerData) { + final LocalBrokerData localData = (LocalBrokerData) data; + numBundles = localData.getNumBundles(); + messageRate = localData.getMsgRateIn() + localData.getMsgRateOut(); + messageThroughput = (localData.getMsgThroughputIn() + localData.getMsgThroughputOut()) / 1024; + maxUsage = localData.getMaxResourceUsage(); + } else { + throw new AssertionError("Unreachable code"); + } + + rows[i][1] = numBundles; + rows[i][2] = messageRate; + rows[i][3] = messageThroughput; + rows[i][4] = maxUsage; + + totalBundles += numBundles; + totalMessageRate += messageRate; + totalThroughput += messageThroughput; + maxMaxUsage = Math.max(maxUsage, maxMaxUsage); + ++i; + } + final int finalRow = loadData.size() + 1; + rows[finalRow] = new Object[GLOBAL_HEADER.length]; + rows[finalRow][0] = "TOTAL"; + rows[finalRow][1] = totalBundles; + rows[finalRow][2] = totalMessageRate; + rows[finalRow][3] = totalThroughput; + rows[finalRow][4] = maxMaxUsage; + final String table = globalTableMaker.make(rows); + log.info("Overall Broker Data:\n{}", table); + } + } + + // This watcher initializes data watchers whenever a new broker is found. + private class BrokerWatcher implements Watcher { + private final ZooKeeper zkClient; + private Set brokers; + + private BrokerWatcher(final ZooKeeper zkClient) { + this.zkClient = zkClient; + this.brokers = Collections.emptySet(); + } + + /** + * Creates a watch for a newly acquired broker so that its data is printed whenever it is updated. + * + * @param event + * The watched event. + */ + public synchronized void process(final WatchedEvent event) { + try { + if (event.getType() == Event.EventType.NodeChildrenChanged) { + updateBrokers(event.getPath()); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + // Inform the user of any broker gains and losses and put watches on newly acquired brokers. + private synchronized void updateBrokers(final String path) { + final Set newBrokers = new HashSet<>(); + try { + newBrokers.addAll(zkClient.getChildren(path, this)); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + for (String oldBroker : brokers) { + if (!newBrokers.contains(oldBroker)) { + log.info("Lost broker: " + oldBroker); + } + } + for (String newBroker : newBrokers) { + if (!brokers.contains(newBroker)) { + log.info("Gained broker: " + newBroker); + final BrokerDataWatcher brokerDataWatcher = new BrokerDataWatcher(zkClient); + brokerDataWatcher.printData(path + "/" + newBroker); + } + } + this.brokers = newBrokers; + } + } + + // This watcher prints tabular data for a broker after its ZNode is updated. + private class BrokerDataWatcher implements Watcher { + private final ZooKeeper zkClient; + + private BrokerDataWatcher(final ZooKeeper zkClient) { + this.zkClient = zkClient; + } + + // Given the path to a broker ZNode, return the broker host name. + private String brokerNameFromPath(final String path) { + return path.substring(path.lastIndexOf('/') + 1); + } + + /** + * Print the local and historical broker data in a tabular format, and put this back as a watcher. + * + * @param event + * The watched event. + */ + public synchronized void process(final WatchedEvent event) { + try { + if (event.getType() == Event.EventType.NodeDataChanged) { + printData(event.getPath()); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private double percentUsage(final double usage, final double limit) { + return limit > 0 && usage >= 0 ? 100 * Math.min(1, usage / limit) : 0; + } + + // Decide whether this broker is running SimpleLoadManagerImpl or ModularLoadManagerImpl and then print the data + // accordingly. + private synchronized void printData(final String path) { + final String broker = brokerNameFromPath(path); + String jsonString; + try { + jsonString = new String(zkClient.getData(path, this, null)); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + // Use presence of the String "allocated" to determine if this is using SimpleLoadManagerImpl. + if (jsonString.contains("allocated")) { + printLoadReport(broker, gson.fromJson(jsonString, LoadReport.class)); + } else { + final LocalBrokerData localBrokerData = gson.fromJson(jsonString, LocalBrokerData.class); + final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker; + try { + final TimeAverageBrokerData timeAverageData = gson.fromJson( + new String(zkClient.getData(timeAveragePath, false, null)), TimeAverageBrokerData.class); + printBrokerData(broker, localBrokerData, timeAverageData); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + // Print the load report in a tabular form for a broker running SimpleLoadManagerImpl. + private synchronized void printLoadReport(final String broker, final LoadReport loadReport) { + loadData.put(broker, loadReport); + + // Initialize the constant rows. + final Object[][] rows = new Object[10][]; + + rows[0] = COUNT_ROW; + rows[2] = RAW_SYSTEM_ROW; + rows[4] = ALLOC_SYSTEM_ROW; + rows[6] = RAW_MESSAGE_ROW; + rows[8] = ALLOC_MESSAGE_ROW; + + // First column is a label, so start at the second column at index 1. + // Client count row. + rows[1] = new Object[COUNT_ROW.length]; + initRow(rows[1], loadReport.getNumTopics(), loadReport.getNumBundles(), loadReport.getNumProducers(), + loadReport.getNumConsumers(), loadReport.getBundleGains().size(), + loadReport.getBundleLosses().size()); + + // Raw system row. + final SystemResourceUsage systemResourceUsage = loadReport.getSystemResourceUsage(); + final ResourceUsage cpu = systemResourceUsage.getCpu(); + final ResourceUsage memory = systemResourceUsage.getMemory(); + final ResourceUsage directMemory = systemResourceUsage.getDirectMemory(); + final ResourceUsage bandwidthIn = systemResourceUsage.getBandwidthIn(); + final ResourceUsage bandwidthOut = systemResourceUsage.getBandwidthOut(); + final double maxUsage = Math.max( + Math.max(Math.max(cpu.percentUsage(), memory.percentUsage()), + Math.max(directMemory.percentUsage(), bandwidthIn.percentUsage())), + bandwidthOut.percentUsage()); + rows[3] = new Object[RAW_SYSTEM_ROW.length]; + initRow(rows[3], cpu.percentUsage(), memory.percentUsage(), directMemory.percentUsage(), + bandwidthIn.percentUsage(), bandwidthOut.percentUsage(), maxUsage); + + // Allocated system row. + rows[5] = new Object[ALLOC_SYSTEM_ROW.length]; + final double allocatedCpuUsage = percentUsage(loadReport.getAllocatedCPU(), cpu.limit); + final double allocatedMemoryUsage = percentUsage(loadReport.getAllocatedMemory(), memory.limit); + final double allocatedBandwidthInUsage = percentUsage(loadReport.getAllocatedBandwidthIn(), + bandwidthIn.limit); + final double allocatedBandwidthOutUsage = percentUsage(loadReport.getAllocatedBandwidthOut(), + bandwidthOut.limit); + final double maxAllocatedUsage = Math.max( + Math.max(Math.max(allocatedCpuUsage, allocatedMemoryUsage), allocatedBandwidthInUsage), + allocatedBandwidthOutUsage); + initRow(rows[5], allocatedCpuUsage, allocatedMemoryUsage, null, allocatedBandwidthInUsage, + allocatedBandwidthOutUsage, maxAllocatedUsage); + + // Raw message row. + rows[7] = new Object[RAW_MESSAGE_ROW.length]; + initMessageRow(rows[7], loadReport.getMsgRateIn(), loadReport.getMsgRateOut(), bandwidthIn.usage, + bandwidthOut.usage); + + // Allocated message row. + rows[9] = new Object[ALLOC_MESSAGE_ROW.length]; + initMessageRow(rows[9], loadReport.getAllocatedMsgRateIn(), loadReport.getAllocatedMsgRateOut(), + loadReport.getAllocatedBandwidthIn(), loadReport.getAllocatedBandwidthOut()); + + final String table = localTableMaker.make(rows); + log.info("\nLoad Report for {}:\n{}\n", broker, table); + } + + // Print the broker data in a tabular form for a broker using ModularLoadManagerImpl. + private synchronized void printBrokerData(final String broker, final LocalBrokerData localBrokerData, + final TimeAverageBrokerData timeAverageData) { + loadData.put(broker, localBrokerData); + + // Initialize the constant rows. + final Object[][] rows = new Object[10][]; + rows[0] = SYSTEM_ROW; + rows[2] = COUNT_ROW; + rows[4] = LATEST_ROW; + rows[6] = SHORT_ROW; + rows[8] = LONG_ROW; + + // First column is a label, so start at the second column at index 1. + // System row. + rows[1] = new Object[SYSTEM_ROW.length]; + initRow(rows[1], localBrokerData.getCpu().percentUsage(), localBrokerData.getMemory().percentUsage(), + localBrokerData.getDirectMemory().percentUsage(), localBrokerData.getBandwidthIn().percentUsage(), + localBrokerData.getBandwidthOut().percentUsage(), localBrokerData.getMaxResourceUsage() * 100); + + // Count row. + rows[3] = new Object[COUNT_ROW.length]; + initRow(rows[3], localBrokerData.getNumTopics(), localBrokerData.getNumBundles(), + localBrokerData.getNumProducers(), localBrokerData.getNumConsumers(), + localBrokerData.getLastBundleGains().size(), localBrokerData.getLastBundleLosses().size()); + + // Latest message data row. + rows[5] = new Object[LATEST_ROW.length]; + initMessageRow(rows[5], localBrokerData.getMsgRateIn(), localBrokerData.getMsgRateOut(), + localBrokerData.getMsgThroughputIn(), localBrokerData.getMsgThroughputOut()); + + // Short-term message data row. + rows[7] = new Object[SHORT_ROW.length]; + initMessageRow(rows[7], timeAverageData.getShortTermMsgRateIn(), timeAverageData.getShortTermMsgRateOut(), + timeAverageData.getShortTermMsgThroughputIn(), timeAverageData.getShortTermMsgThroughputOut()); + + // Long-term message data row. + rows[9] = new Object[LONG_ROW.length]; + initMessageRow(rows[9], timeAverageData.getLongTermMsgRateIn(), timeAverageData.getLongTermMsgRateOut(), + timeAverageData.getLongTermMsgThroughputIn(), timeAverageData.getLongTermMsgThroughputOut()); + + final String table = localTableMaker.make(rows); + log.info("\nBroker Data for {}:\n{}\n", broker, table); + } + } + + // JCommander arguments class. + private static class Arguments { + @Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true) + public String connectString = null; + } + + /** + * Create a broker monitor from the given ZooKeeper client. + * + * @param zkClient + * Client to create this from. + */ + public BrokerMonitor(final ZooKeeper zkClient) { + loadData = new ConcurrentHashMap<>(); + this.zkClient = zkClient; + } + + /** + * Start the broker monitoring procedure. + */ + public void start() { + try { + final BrokerWatcher brokerWatcher = new BrokerWatcher(zkClient); + brokerWatcher.updateBrokers(BROKER_ROOT); + while (true) { + Thread.sleep(GLOBAL_STATS_PRINT_PERIOD_MILLIS); + printGlobalData(); + } + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * Run a monitor from command line arguments. + * + * @param args + * Arguments for the monitor. + */ + public static void main(String[] args) { + try { + final Arguments arguments = new Arguments(); + final JCommander jc = new JCommander(arguments); + jc.parse(args); + final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null); + final BrokerMonitor monitor = new BrokerMonitor(zkClient); + monitor.start(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/LoadSimulationClient.java b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/LoadSimulationClient.java index dae3492be97a5..2d15b32f8db9b 100644 --- a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/LoadSimulationClient.java +++ b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/LoadSimulationClient.java @@ -30,6 +30,8 @@ import java.util.function.Function; import org.apache.commons.lang.SystemUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; @@ -49,8 +51,6 @@ import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.concurrent.DefaultThreadFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * LoadSimulationClient is used to simulate client load by maintaining producers and consumers for topics. Instances of @@ -59,22 +59,24 @@ public class LoadSimulationClient { private final static Logger log = LoggerFactory.getLogger(LoadSimulationClient.class); - // Values for command responses. - public static final byte FOUND_TOPIC = 0; - public static final byte NO_SUCH_TOPIC = 1; - public static final byte REDUNDANT_COMMAND = 2; - // Values for command encodings. public static final byte CHANGE_COMMAND = 0; public static final byte STOP_COMMAND = 1; public static final byte TRADE_COMMAND = 2; public static final byte CHANGE_GROUP_COMMAND = 3; public static final byte STOP_GROUP_COMMAND = 4; + public static final byte FIND_COMMAND = 5; private final ExecutorService executor; + // Map from a message size to a cached byte[] of that size. private final Map payloadCache; + + // Map from a full topic name to the TradeUnit created for that topic. private final Map topicsToTradeUnits; + + // Pulsar client to create producers and consumers with. private final PulsarClient client; + private final ProducerConfiguration producerConf; private final ConsumerConfiguration consumerConf; private final ClientConfiguration clientConf; @@ -100,8 +102,8 @@ private static class TradeUnit { final Map payloadCache; public TradeUnit(final TradeConfiguration tradeConf, final PulsarClient client, - final ProducerConfiguration producerConf, final ConsumerConfiguration consumerConf, - final Map payloadCache) throws Exception { + final ProducerConfiguration producerConf, final ConsumerConfiguration consumerConf, + final Map payloadCache) throws Exception { consumerFuture = client.subscribeAsync(tradeConf.topic, "Subscriber-" + tradeConf.topic, consumerConf); producerFuture = client.createProducerAsync(tradeConf.topic, producerConf); this.payload = new AtomicReference<>(); @@ -227,95 +229,86 @@ private void handle(final byte command, final DataInputStream inputStream, final final TradeConfiguration tradeConf = new TradeConfiguration(); tradeConf.command = command; switch (command) { - case CHANGE_COMMAND: - // Change the topic's settings if it exists. Report whether the - // topic was found on this server. - decodeProducerOptions(tradeConf, inputStream); - if (topicsToTradeUnits.containsKey(tradeConf.topic)) { - topicsToTradeUnits.get(tradeConf.topic).change(tradeConf); - outputStream.write(FOUND_TOPIC); - } else { - outputStream.write(NO_SUCH_TOPIC); - } - break; - case STOP_COMMAND: - // Stop the topic if it exists. Report whether the topic was found, - // and whether it was already stopped. - tradeConf.topic = inputStream.readUTF(); - if (topicsToTradeUnits.containsKey(tradeConf.topic)) { - final boolean wasStopped = topicsToTradeUnits.get(tradeConf.topic).stop.getAndSet(true); - outputStream.write(wasStopped ? REDUNDANT_COMMAND : FOUND_TOPIC); - } else { - outputStream.write(NO_SUCH_TOPIC); - } - break; - case TRADE_COMMAND: - // Create the topic. It is assumed that the topic does not already - // exist. - decodeProducerOptions(tradeConf, inputStream); - final TradeUnit tradeUnit = new TradeUnit(tradeConf, client, producerConf, consumerConf, payloadCache); - topicsToTradeUnits.put(tradeConf.topic, tradeUnit); - executor.submit(() -> { - try { - tradeUnit.start(); - } catch (Exception ex) { - throw new RuntimeException(ex); + case CHANGE_COMMAND: + // Change the topic's settings if it exists. + decodeProducerOptions(tradeConf, inputStream); + if (topicsToTradeUnits.containsKey(tradeConf.topic)) { + topicsToTradeUnits.get(tradeConf.topic).change(tradeConf); } - }); - // Tell controller topic creation is finished. - outputStream.write(NO_SUCH_TOPIC); - break; - case CHANGE_GROUP_COMMAND: - // Change the settings of all topics belonging to a group. Report - // the number of topics changed. - decodeGroupOptions(tradeConf, inputStream); - tradeConf.size = inputStream.readInt(); - tradeConf.rate = inputStream.readDouble(); - // See if a topic belongs to this tenant and group using this regex. - final String groupRegex = ".*://.*/" + tradeConf.tenant + "/" + tradeConf.group + "-.*/.*"; - int numFound = 0; - for (Map.Entry entry : topicsToTradeUnits.entrySet()) { - final String destination = entry.getKey(); - final TradeUnit unit = entry.getValue(); - if (destination.matches(groupRegex)) { - ++numFound; - unit.change(tradeConf); + break; + case STOP_COMMAND: + // Stop the topic if it exists. + tradeConf.topic = inputStream.readUTF(); + if (topicsToTradeUnits.containsKey(tradeConf.topic)) { + topicsToTradeUnits.get(tradeConf.topic).stop.set(true); } - } - outputStream.writeInt(numFound); - break; - case STOP_GROUP_COMMAND: - // Stop all topics belonging to a group. Report the number of topics - // stopped. - decodeGroupOptions(tradeConf, inputStream); - // See if a topic belongs to this tenant and group using this regex. - final String regex = ".*://.*/" + tradeConf.tenant + "/" + tradeConf.group + "-.*/.*"; - int numStopped = 0; - for (Map.Entry entry : topicsToTradeUnits.entrySet()) { - final String destination = entry.getKey(); - final TradeUnit unit = entry.getValue(); - if (destination.matches(regex) && !unit.stop.getAndSet(true)) { - ++numStopped; + break; + case TRADE_COMMAND: + // Create the topic. It is assumed that the topic does not already exist. + decodeProducerOptions(tradeConf, inputStream); + final TradeUnit tradeUnit = new TradeUnit(tradeConf, client, producerConf, consumerConf, payloadCache); + topicsToTradeUnits.put(tradeConf.topic, tradeUnit); + executor.submit(() -> { + try { + tradeUnit.start(); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + }); + break; + case CHANGE_GROUP_COMMAND: + // Change the settings of all topics belonging to a group. + decodeGroupOptions(tradeConf, inputStream); + tradeConf.size = inputStream.readInt(); + tradeConf.rate = inputStream.readDouble(); + // See if a topic belongs to this tenant and group using this regex. + final String groupRegex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*"; + for (Map.Entry entry : topicsToTradeUnits.entrySet()) { + final String destination = entry.getKey(); + final TradeUnit unit = entry.getValue(); + if (destination.matches(groupRegex)) { + unit.change(tradeConf); + } } - } - outputStream.writeInt(numStopped); - break; - default: - throw new IllegalArgumentException("Unrecognized command code received: " + command); + break; + case STOP_GROUP_COMMAND: + // Stop all topics belonging to a group. + decodeGroupOptions(tradeConf, inputStream); + // See if a topic belongs to this tenant and group using this regex. + final String regex = ".*://" + tradeConf.tenant + "/.*/" + tradeConf.group + "-.*/.*"; + for (Map.Entry entry : topicsToTradeUnits.entrySet()) { + final String destination = entry.getKey(); + final TradeUnit unit = entry.getValue(); + if (destination.matches(regex)) { + unit.stop.set(true); + } + } + break; + case FIND_COMMAND: + // Write a single boolean indicating if the topic was found. + outputStream.writeBoolean(topicsToTradeUnits.containsKey(inputStream.readUTF())); + outputStream.flush(); + break; + default: + throw new IllegalArgumentException("Unrecognized command code received: " + command); } - outputStream.flush(); } + // Make listener as lightweight as possible. private static final MessageListener ackListener = Consumer::acknowledgeAsync; + /** + * Create a LoadSimulationClient with the given JCommander arguments. + * @param arguments Arguments to configure this from. + */ public LoadSimulationClient(final MainArguments arguments) throws Exception { payloadCache = new ConcurrentHashMap<>(); topicsToTradeUnits = new ConcurrentHashMap<>(); final EventLoopGroup eventLoopGroup = SystemUtils.IS_OS_LINUX ? new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors(), - new DefaultThreadFactory("pulsar-test-client")) + new DefaultThreadFactory("pulsar-test-client")) : new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), - new DefaultThreadFactory("pulsar-test-client")); + new DefaultThreadFactory("pulsar-test-client")); clientConf = new ClientConfiguration(); clientConf.setConnectionsPerBroker(4); @@ -340,6 +333,10 @@ public LoadSimulationClient(final MainArguments arguments) throws Exception { executor = Executors.newCachedThreadPool(new DefaultThreadFactory("test-client")); } + /** + * Start a client with command line arguments. + * @param args Command line arguments to pass in. + */ public static void main(String[] args) throws Exception { final MainArguments mainArguments = new MainArguments(); final JCommander jc = new JCommander(mainArguments); @@ -352,6 +349,9 @@ public static void main(String[] args) throws Exception { (new LoadSimulationClient(mainArguments)).run(); } + /** + * Start listening for controller commands to create producers and consumers. + */ public void run() throws Exception { final ServerSocket serverSocket = new ServerSocket(port); @@ -361,7 +361,7 @@ public void run() throws Exception { // has not been tested or considered and is not recommended. log.info("Listening for controller command..."); final Socket socket = serverSocket.accept(); - log.info("Connected to {}\n", socket.getInetAddress().getHostName()); + log.info("Connected to {}", socket.getInetAddress().getHostName()); executor.submit(() -> { try { handle(socket); diff --git a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/LoadSimulationController.java b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/LoadSimulationController.java index 4786009431518..a0097acc81690 100644 --- a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/LoadSimulationController.java +++ b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/LoadSimulationController.java @@ -22,76 +22,74 @@ import java.io.InputStreamReader; import java.net.Socket; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.bookkeeper.util.ZkUtils; import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.beust.jcommander.ParameterException; import com.yahoo.pulsar.broker.BundleData; +import com.yahoo.pulsar.broker.loadbalance.LoadManager; import com.yahoo.pulsar.common.policies.data.ResourceQuota; import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport; import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats; import com.yahoo.pulsar.common.util.ObjectMapperFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * To use: 1. Delegate a list of server machines which act as zookeeper clients. 2. Choose a port for those machines. 3. - * On each of these machines, get them to listen via pulsar-perf simulation-server --port --service-url - * 4. Start the controller with pulsar-perf simulation-controller --cluster - * --servers : --server-port 5. You will get a shell on the - * controller, where you can use the commands trade, change, stop, trade_group, change_group, stop_group. You can enter - * "help" to see the syntax for the commands. Note that tenant, namespace, and topic refer to - * persistent://cluster/tenant/namespace/topic/bundle. For instance, to start trading for topic with destination - * persistent://mycluster/mytenant/mynamespace/mytopic/bundle at rate 200 msgs/s, you would type "trade mytenant - * mynamespace mytopic --rate 200". The group commands also refer to a "group_name" parameter. This is a string that is - * prefixed to the namespaces when trade_group is invoked so they may be identified by other group commands. At the - * moment, groups may not be modified after they have been created via trade_group. - * + * This class provides a shell for the user to dictate how simulation clients should incur load. */ public class LoadSimulationController { private static final Logger log = LoggerFactory.getLogger(LoadSimulationController.class); private final static String QUOTA_ROOT = "/loadbalance/resource-quota/namespace"; + private final static String BUNDLE_DATA_ROOT = "/loadbalance/bundle-data"; - // Input streams for each server to send commands through. + // Input streams for each client to send commands through. private final DataInputStream[] inputStreams; - // Output streams for each server to receive information from. + // Output streams for each client to receive information from. private final DataOutputStream[] outputStreams; - // Server host names. - private final String[] servers; + // client host names. + private final String[] clients; - // Port servers are listening on. - private final int serverPort; + // Port clients are listening on. + private final int clientPort; // The ZooKeeper cluster to run on. private final String cluster; private final Random random; + private static final ExecutorService threadPool = Executors.newCachedThreadPool(); + // JCommander arguments for starting a controller via main. private static class MainArguments { @Parameter(names = { "--cluster" }, description = "Cluster to test on", required = true) String cluster; - @Parameter(names = { "--servers" }, description = "Comma separated list of server hostnames", required = true) - String serverHostNames; + @Parameter(names = { "--clients" }, description = "Comma separated list of client hostnames", required = true) + String clientHostNames; - @Parameter(names = { "--server-port" }, description = "Port that the servers are listening on", required = true) - int serverPort; + @Parameter(names = { "--client-port" }, description = "Port that the clients are listening on", required = true) + int clientPort; } // JCommander arguments for accepting user input. @@ -100,7 +98,7 @@ private static class ShellArguments { + "change tenant namespace topic\n" + "stop tenant namespace topic\n" + "trade_group tenant group_name num_namespaces\n" + "change_group tenant group_name\n" + "stop_group tenant group_name\n" + "script script_name\n" + "copy tenant_name source_zk target_zk\n" - + "stream source_zk\n", required = true) + + "stream source_zk\n" + "simulate zk\n", required = true) List commandArguments; @Parameter(names = { "--rand-rate" }, description = "Choose message rate uniformly randomly from the next two " @@ -113,41 +111,45 @@ private static class ShellArguments { @Parameter(names = { "--rate-multiplier" }, description = "Multiplier to use for copying or streaming rates") double rateMultiplier = 1; - @Parameter(names = { "--size" }, description = "Message size in bytes") - int size = 1024; - @Parameter(names = { "--separation" }, description = "Separation time in ms for trade_group actions " + "(0 for no separation)") int separation = 0; + @Parameter(names = { "--size" }, description = "Message size in bytes") + int size = 1024; + @Parameter(names = { "--topics-per-namespace" }, description = "Number of topics to create per namespace in " + "trade_group (total number of topics is num_namespaces X num_topics)") int topicsPerNamespace = 1; } - // In stream mode, the BrokerWatcher watches the /loadbalance/broker zpath - // and adds LoadReportWatchers - // accordingly when new brokers come up. + // In stream mode, the BrokerWatcher watches the /loadbalance/broker zpath and adds LoadReportWatchers accordingly + // when new brokers come up. private class BrokerWatcher implements Watcher { private final ZooKeeper zkClient; + + // Currently observed brokers. private final Set brokers; - private final String path; + + // Shell arguments to configure streaming with. private final ShellArguments arguments; - public BrokerWatcher(final String path, final ZooKeeper zkClient, final ShellArguments arguments) { - this.path = path; + private BrokerWatcher(final ZooKeeper zkClient, final ShellArguments arguments) { this.zkClient = zkClient; this.arguments = arguments; brokers = new HashSet<>(); + // Observe the currently active brokers and put a watch on the broker root. process(null); } + // Add load report watchers for newly observed brokers. public synchronized void process(final WatchedEvent event) { try { - final List currentBrokers = zkClient.getChildren(path, this); + final List currentBrokers = zkClient.getChildren(LoadManager.LOADBALANCE_BROKERS_ROOT, this); for (final String broker : currentBrokers) { if (!brokers.contains(broker)) { - new LoadReportWatcher(String.format("%s/%s", path, broker), zkClient, arguments); + new LoadReportWatcher(String.format("%s/%s", LoadManager.LOADBALANCE_BROKERS_ROOT, broker), + zkClient, arguments); brokers.add(broker); } } @@ -157,10 +159,8 @@ public synchronized void process(final WatchedEvent event) { } } - // In stream mode, the LoadReportWatcher watches the /loadbalance/broker - // children and adds or modifies topics - // with suitable rates based on the most recent message rate and throughput - // information. + // In stream mode, the LoadReportWatcher watches the /loadbalance/broker children and adds or modifies topics with + // suitable rates based on the most recent message rate and throughput information. private class LoadReportWatcher implements Watcher { private final ZooKeeper zkClient; private final String path; @@ -174,6 +174,7 @@ public LoadReportWatcher(final String path, final ZooKeeper zkClient, final Shel process(null); } + // Update the message rate information for the bundles in a recently changed load report. public synchronized void process(final WatchedEvent event) { try { // Get the load report and put this back as a watch. @@ -185,22 +186,17 @@ public synchronized void process(final WatchedEvent event) { final String destination = String.format("%s/%s", namespace, "t"); final NamespaceBundleStats stats = entry.getValue(); - // Approximate total message rate via average between - // in/out. + // Approximate total message rate via average between in/out. final double messageRate = arguments.rateMultiplier * (stats.msgRateIn + stats.msgRateOut) / 2; // size = throughput / rate. final int messageSize = (int) Math.ceil(arguments.rateMultiplier * (stats.msgThroughputIn + stats.msgThroughputOut) / (2 * messageRate)); - final ShellArguments tradeArguments = new ShellArguments(); arguments.rate = messageRate; arguments.size = messageSize; - // Try to modify the topic if it already exists. Otherwise, - // create it. - if (!change(tradeArguments, destination)) { - trade(tradeArguments, destination); - } + // Try to modify the topic if it already exists. Otherwise, create it. + changeOrCreate(arguments, destination); } } catch (Exception ex) { throw new RuntimeException(ex); @@ -216,18 +212,18 @@ public synchronized void process(final WatchedEvent event) { */ public LoadSimulationController(final MainArguments arguments) throws Exception { random = new Random(); - serverPort = arguments.serverPort; + clientPort = arguments.clientPort; cluster = arguments.cluster; - servers = arguments.serverHostNames.split(","); - final Socket[] sockets = new Socket[servers.length]; - inputStreams = new DataInputStream[servers.length]; - outputStreams = new DataOutputStream[servers.length]; - log.info("Found {} servers:", servers.length); - for (int i = 0; i < servers.length; ++i) { - sockets[i] = new Socket(servers[i], serverPort); + clients = arguments.clientHostNames.split(","); + final Socket[] sockets = new Socket[clients.length]; + inputStreams = new DataInputStream[clients.length]; + outputStreams = new DataOutputStream[clients.length]; + log.info("Found {} clients:", clients.length); + for (int i = 0; i < clients.length; ++i) { + sockets[i] = new Socket(clients[i], clientPort); inputStreams[i] = new DataInputStream(sockets[i].getInputStream()); outputStreams[i] = new DataOutputStream(sockets[i].getOutputStream()); - log.info("Connected to {}", servers[i]); + log.info("Connected to {}", clients[i]); } } @@ -235,17 +231,61 @@ public LoadSimulationController(final MainArguments arguments) throws Exception // actual number of application arguments. private boolean checkAppArgs(final int numAppArgs, final int numRequired) { if (numAppArgs != numRequired) { - log.info("ERROR: Wrong number of application arguments (found {}, required {})", numAppArgs, - numRequired); + log.info("ERROR: Wrong number of application arguments (found {}, required {})", numAppArgs, numRequired); return false; } return true; } + // Recursively acquire all resource quotas by getting the ZK children of the given path and calling this function + // on the children if there are any, or getting the data from this ZNode otherwise. + private void getResourceQuotas(final String path, final ZooKeeper zkClient, + final Map[] threadLocalMaps) throws Exception { + final List children = zkClient.getChildren(path, false); + if (children.isEmpty()) { + threadLocalMaps[random.nextInt(clients.length)].put(path, ObjectMapperFactory.getThreadLocal() + .readValue(zkClient.getData(path, false, null), ResourceQuota.class)); + } else { + for (final String child : children) { + getResourceQuotas(String.format("%s/%s", path, child), zkClient, threadLocalMaps); + } + } + } + + // Initialize a BundleData from a resource quota and configurations and modify the quota accordingly. + private BundleData initializeBundleData(final ResourceQuota quota, final ShellArguments arguments) { + final double messageRate = (quota.getMsgRateIn() + quota.getMsgRateOut()) / 2; + final int messageSize = (int) Math.ceil((quota.getBandwidthIn() + quota.getBandwidthOut()) / (2 * messageRate)); + arguments.rate = messageRate * arguments.rateMultiplier; + arguments.size = messageSize; + final NamespaceBundleStats startingStats = new NamespaceBundleStats(); + + // Modify the original quota so that new rates are set. + final double modifiedRate = messageRate * arguments.rateMultiplier; + final double modifiedBandwidth = (quota.getBandwidthIn() + quota.getBandwidthOut()) * arguments.rateMultiplier + / 2; + quota.setMsgRateIn(modifiedRate); + quota.setMsgRateOut(modifiedRate); + quota.setBandwidthIn(modifiedBandwidth); + quota.setBandwidthOut(modifiedBandwidth); + + // Assume modified memory usage is comparable to the rate multiplier times the original usage. + quota.setMemory(quota.getMemory() * arguments.rateMultiplier); + startingStats.msgRateIn = quota.getMsgRateIn(); + startingStats.msgRateOut = quota.getMsgRateOut(); + startingStats.msgThroughputIn = quota.getBandwidthIn(); + startingStats.msgThroughputOut = quota.getBandwidthOut(); + final BundleData bundleData = new BundleData(10, 1000, startingStats); + // Assume there is ample history for the bundle. + bundleData.getLongTermData().setNumSamples(1000); + bundleData.getShortTermData().setNumSamples(10); + return bundleData; + } + // Makes a destination string from a tenant name, namespace name, and topic // name. private String makeDestination(final String tenant, final String namespace, final String topic) { - return String.format("persistent://%s/%s/%s/%s", cluster, tenant, namespace, topic); + return String.format("persistent://%s/%s/%s/%s", tenant, cluster, namespace, topic); } // Write options that are common to modifying and creating topics. @@ -257,7 +297,7 @@ private void writeProducerOptions(final DataOutputStream outputStream, final She // doubles. final String[] splits = arguments.rangeString.split(","); if (splits.length != 2) { - log.info("ERROR: Argument to --rand-rate should be a two comma-separated values"); + log.error("Argument to --rand-rate should be two comma-separated values"); return; } final double first = Double.parseDouble(splits[0]); @@ -271,76 +311,205 @@ private void writeProducerOptions(final DataOutputStream outputStream, final She outputStream.writeDouble(arguments.rate); } - // Trade using the arguments parsed via JCommander and the destination name. - private synchronized void trade(final ShellArguments arguments, final String destination) throws Exception { - // Decide which server to send to randomly to preserve statelessness of - // the controller. - final int i = random.nextInt(servers.length); - log.info("Sending trade request to " + servers[i]); - outputStreams[i].write(LoadSimulationClient.TRADE_COMMAND); - writeProducerOptions(outputStreams[i], arguments, destination); - outputStreams[i].flush(); - if (inputStreams[i].read() != -1) { - log.info("Created producer and consumer for " + destination); + // Change producer settings for a given destination and JCommander arguments. + private void change(final ShellArguments arguments, final String destination, final int client) throws Exception { + outputStreams[client].write(LoadSimulationClient.CHANGE_COMMAND); + writeProducerOptions(outputStreams[client], arguments, destination); + outputStreams[client].flush(); + } + + // Change an existing topic, or create it if it does not exist. + private int changeOrCreate(final ShellArguments arguments, final String destination) throws Exception { + final int client = find(destination); + if (client == -1) { + trade(arguments, destination, random.nextInt(clients.length)); } else { - log.info("ERROR: Socket to {} closed", servers[i]); + change(arguments, destination, client); } + return client; } - private void handleTrade(final ShellArguments arguments) throws Exception { - final List commandArguments = arguments.commandArguments; - // Trade expects three application arguments: tenant, namespace, and - // topic. - if (checkAppArgs(commandArguments.size() - 1, 3)) { - final String destination = makeDestination(commandArguments.get(1), commandArguments.get(2), - commandArguments.get(3)); - trade(arguments, destination); + // Find a topic and change it if it exists. + private int changeIfExists(final ShellArguments arguments, final String destination) throws Exception { + final int client = find(destination); + if (client != -1) { + change(arguments, destination, client); } + return client; } - // Change producer settings for a given destination and JCommander - // arguments. - // Returns true if the topic was found and false otherwise. - private synchronized boolean change(final ShellArguments arguments, final String destination) throws Exception { - log.info("Searching for server with topic " + destination); - for (DataOutputStream outputStream : outputStreams) { - outputStream.write(LoadSimulationClient.CHANGE_COMMAND); - writeProducerOptions(outputStream, arguments, destination); - outputStream.flush(); + // Attempt to find a topic on the clients. + private int find(final String destination) throws Exception { + int clientWithTopic = -1; + for (int i = 0; i < clients.length; ++i) { + outputStreams[i].write(LoadSimulationClient.FIND_COMMAND); + outputStreams[i].writeUTF(destination); } - boolean foundTopic = false; - for (int i = 0; i < servers.length; ++i) { - int readValue; - switch (readValue = inputStreams[i].read()) { - case LoadSimulationClient.FOUND_TOPIC: - log.info("Found topic {} on server {}", destination, servers[i]); - foundTopic = true; - break; - case LoadSimulationClient.NO_SUCH_TOPIC: - break; - case -1: - log.info("ERROR: Socket to {} closed", servers[i]); - break; - default: - log.info("ERROR: Unknown response signal received: " + readValue); + for (int i = 0; i < clients.length; ++i) { + if (inputStreams[i].readBoolean()) { + clientWithTopic = i; } } - return foundTopic; + return clientWithTopic; } + // Trade using the arguments parsed via JCommander and the destination name. + private synchronized void trade(final ShellArguments arguments, final String destination, final int client) + throws Exception { + // Decide which client to send to randomly to preserve statelessness of + // the controller. + outputStreams[client].write(LoadSimulationClient.TRADE_COMMAND); + writeProducerOptions(outputStreams[client], arguments, destination); + outputStreams[client].flush(); + } + + // Handle the command line arguments associated with the change command. private void handleChange(final ShellArguments arguments) throws Exception { final List commandArguments = arguments.commandArguments; - // Change expects three application arguments: tenant name, namespace - // name, and topic name. + // Change expects three application arguments: tenant name, namespace name, and topic name. if (checkAppArgs(commandArguments.size() - 1, 3)) { final String destination = makeDestination(commandArguments.get(1), commandArguments.get(2), commandArguments.get(3)); - if (!change(arguments, destination)) { - log.info("ERROR: Topic {} not found", destination); + if (changeIfExists(arguments, destination) == -1) { + log.info("Topic {} not found", destination); } } } + // Handle the command line arguments associated with the copy command. + private void handleCopy(final ShellArguments arguments) throws Exception { + final List commandArguments = arguments.commandArguments; + // Copy accepts 3 application arguments: Tenant name, source ZooKeeper and target ZooKeeper connect strings. + if (checkAppArgs(commandArguments.size() - 1, 3)) { + final String tenantName = commandArguments.get(1); + final String sourceZKConnectString = commandArguments.get(2); + final String targetZKConnectString = commandArguments.get(3); + final ZooKeeper sourceZKClient = new ZooKeeper(sourceZKConnectString, 5000, null); + final ZooKeeper targetZKClient = new ZooKeeper(targetZKConnectString, 5000, null); + // Make a map for each thread to speed up the ZooKeeper writing process. + final Map[] threadLocalMaps = new Map[clients.length]; + for (int i = 0; i < clients.length; ++i) { + threadLocalMaps[i] = new HashMap<>(); + } + getResourceQuotas(QUOTA_ROOT, sourceZKClient, threadLocalMaps); + final List futures = new ArrayList<>(clients.length); + int i = 0; + log.info("Copying..."); + for (final Map bundleToQuota : threadLocalMaps) { + final int j = i; + futures.add(threadPool.submit(() -> { + for (final Map.Entry entry : bundleToQuota.entrySet()) { + final String bundle = entry.getKey(); + final ResourceQuota quota = entry.getValue(); + // Simulation will send messages in and out at about the same rate, so just make the rate the + // average of in and out. + + final int tenantStart = QUOTA_ROOT.length() + 1; + final int clusterStart = bundle.indexOf('/', tenantStart) + 1; + final String sourceTenant = bundle.substring(tenantStart, clusterStart - 1); + final int namespaceStart = bundle.indexOf('/', clusterStart) + 1; + final String sourceCluster = bundle.substring(clusterStart, namespaceStart - 1); + final String namespace = bundle.substring(namespaceStart, bundle.lastIndexOf('/')); + final String keyRangeString = bundle.substring(bundle.lastIndexOf('/') + 1); + // To prevent duplicate node issues for same namespace names in different clusters/tenants. + final String manglePrefix = String.format("%s-%s-%s", sourceCluster, sourceTenant, + keyRangeString); + final String mangledNamespace = String.format("%s-%s", manglePrefix, namespace); + final BundleData bundleData = initializeBundleData(quota, arguments); + final String oldAPITargetPath = String.format( + "/loadbalance/resource-quota/namespace/%s/%s/%s/0x00000000_0xffffffff", tenantName, + cluster, mangledNamespace); + final String newAPITargetPath = String.format( + "/loadbalance/bundle-data/%s/%s/%s/0x00000000_0xffffffff", tenantName, cluster, + mangledNamespace); + try { + ZkUtils.createFullPathOptimistic(targetZKClient, oldAPITargetPath, + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(quota), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + // Ignore already created nodes. + } catch (Exception e) { + throw new RuntimeException(e); + } + // Put the bundle data in the new ZooKeeper. + try { + ZkUtils.createFullPathOptimistic(targetZKClient, newAPITargetPath, + bundleData.getJsonBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + // Ignore already created nodes. + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + trade(arguments, makeDestination(tenantName, mangledNamespace, "t"), j); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + })); + ++i; + } + for (final Future future : futures) { + future.get(); + } + sourceZKClient.close(); + targetZKClient.close(); + } + } + + // Handle the command line arguments associated with the simulate command. + private void handleSimulate(final ShellArguments arguments) throws Exception { + final List commandArguments = arguments.commandArguments; + checkAppArgs(commandArguments.size() - 1, 1); + final ZooKeeper zkClient = new ZooKeeper(commandArguments.get(1), 5000, null); + // Make a map for each thread to speed up the ZooKeeper writing process. + final Map[] threadLocalMaps = new Map[clients.length]; + for (int i = 0; i < clients.length; ++i) { + threadLocalMaps[i] = new HashMap<>(); + } + getResourceQuotas(QUOTA_ROOT, zkClient, threadLocalMaps); + final List futures = new ArrayList<>(clients.length); + int i = 0; + log.info("Simulating..."); + for (final Map bundleToQuota : threadLocalMaps) { + final int j = i; + futures.add(threadPool.submit(() -> { + for (final Map.Entry entry : bundleToQuota.entrySet()) { + final String bundle = entry.getKey(); + final String newAPIPath = bundle.replace(QUOTA_ROOT, BUNDLE_DATA_ROOT); + final ResourceQuota quota = entry.getValue(); + final int tenantStart = QUOTA_ROOT.length() + 1; + final String destination = String.format("persistent://%s/t", bundle.substring(tenantStart)); + final BundleData bundleData = initializeBundleData(quota, arguments); + // Put the bundle data in the new ZooKeeper. + try { + ZkUtils.createFullPathOptimistic(zkClient, newAPIPath, bundleData.getJsonBytes(), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException e) { + try { + zkClient.setData(newAPIPath, bundleData.getJsonBytes(), -1); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + trade(arguments, destination, j); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + })); + ++i; + } + for (final Future future : futures) { + future.get(); + } + zkClient.close(); + } + + // Handle the command line arguments associated with the stop command. private void handleStop(final ShellArguments arguments) throws Exception { final List commandArguments = arguments.commandArguments; // Stop expects three application arguments: tenant name, namespace @@ -348,62 +517,43 @@ private void handleStop(final ShellArguments arguments) throws Exception { if (checkAppArgs(commandArguments.size() - 1, 3)) { final String destination = makeDestination(commandArguments.get(1), commandArguments.get(2), commandArguments.get(3)); - log.info("Searching for server with topic " + destination); for (DataOutputStream outputStream : outputStreams) { outputStream.write(LoadSimulationClient.STOP_COMMAND); outputStream.writeUTF(destination); outputStream.flush(); } - boolean foundTopic = false; - for (int i = 0; i < servers.length; ++i) { - int readValue; - switch (readValue = inputStreams[i].read()) { - case LoadSimulationClient.FOUND_TOPIC: - log.info("Found topic {} on server {}", destination, servers[i]); - foundTopic = true; - break; - case LoadSimulationClient.NO_SUCH_TOPIC: - break; - case LoadSimulationClient.REDUNDANT_COMMAND: - log.info("ERROR: Topic {} already stopped on {}", destination, servers[i]); - foundTopic = true; - break; - case -1: - log.info("ERROR: Socket to {} closed", servers[i]); - break; - default: - log.info("ERROR: Unknown response signal received: " + readValue); - } - } - if (!foundTopic) { - log.info("ERROR: Topic {} not found", destination); - } } } - private void handleGroupTrade(final ShellArguments arguments) throws Exception { + // Handle the command line arguments associated with the stream command. + private void handleStream(final ShellArguments arguments) throws Exception { final List commandArguments = arguments.commandArguments; - // Group trade expects 3 application arguments: tenant name, group name, - // and number of namespaces. + // Stream accepts 1 application argument: ZooKeeper connect string. + if (checkAppArgs(commandArguments.size() - 1, 1)) { + final String zkConnectString = commandArguments.get(1); + final ZooKeeper zkClient = new ZooKeeper(zkConnectString, 5000, null); + new BrokerWatcher(zkClient, arguments); + // This controller will now stream rate changes from the given ZK. + // Users wishing to stop this should Ctrl + C and use another + // Controller to send new commands. + while (true) + ; + } + } + + // Handle the command line arguments associated with the trade command. + private void handleTrade(final ShellArguments arguments) throws Exception { + final List commandArguments = arguments.commandArguments; + // Trade expects three application arguments: tenant, namespace, and + // topic. if (checkAppArgs(commandArguments.size() - 1, 3)) { - final String tenant = commandArguments.get(1); - final String group = commandArguments.get(2); - final int numNamespaces = Integer.parseInt(commandArguments.get(3)); - for (int i = 0; i < numNamespaces; ++i) { - for (int j = 0; j < arguments.topicsPerNamespace; ++j) { - // For each namespace and topic pair, create the namespace - // by using the group name and the - // namespace index, and then create the topic by using the - // topic index. Then just call trade. - final String destination = makeDestination(tenant, String.format("%s-%d", group, i), - Integer.toString(j)); - trade(arguments, destination); - Thread.sleep(arguments.separation); - } - } + final String destination = makeDestination(commandArguments.get(1), commandArguments.get(2), + commandArguments.get(3)); + trade(arguments, destination, random.nextInt(clients.length)); } } + // Handle the command line arguments associated with the group change command. private void handleGroupChange(final ShellArguments arguments) throws Exception { final List commandArguments = arguments.commandArguments; // Group change expects two application arguments: tenant name and group @@ -419,37 +569,10 @@ private void handleGroupChange(final ShellArguments arguments) throws Exception outputStream.writeDouble(arguments.rate); outputStream.flush(); } - accumulateAndReport(tenant, group); - } - } - - // Report the number of topics found belonging to the given tenant and - // group. - private void accumulateAndReport(final String tenant, final String group) throws Exception { - int numFound = 0; - for (int i = 0; i < servers.length; ++i) { - final int foundOnServer = inputStreams[i].readInt(); - if (foundOnServer == -1) { - log.info("ERROR: Socket to {} closed", servers[i]); - } else if (foundOnServer == 0) { - log.info("Found no topics belonging to tenant {} and group {} on {}", tenant, group, - servers[i]); - } else if (foundOnServer > 0) { - log.info("Found {} topics belonging to tenant {} and group {} on {}", foundOnServer, tenant, - group, servers[i]); - numFound += foundOnServer; - } else { - log.info("ERROR: Negative value {} received for topic count on {}", foundOnServer, - servers[i]); - } - } - if (numFound == 0) { - log.info("ERROR: Found no topics belonging to tenant {} and group {}", tenant, group); - } else { - log.info("Found {} topics belonging to tenant {} and group {}", numFound, tenant, group); } } + // Handle the command line arguments associated with the group stop command. private void handleGroupStop(final ShellArguments arguments) throws Exception { final List commandArguments = arguments.commandArguments; // Group stop requires two application arguments: tenant name and group @@ -463,116 +586,40 @@ private void handleGroupStop(final ShellArguments arguments) throws Exception { outputStream.writeUTF(group); outputStream.flush(); } - accumulateAndReport(tenant, group); } } - // Recursively acquire all resource quotas by getting the ZK children of the - // given path and calling this function - // on the children if there are any, or getting the data from this ZNode - // otherwise. - private void getResourceQuotas(final String path, final ZooKeeper zkClient, - final Map bundleToQuota) throws Exception { - final List children = zkClient.getChildren(path, false); - if (children.isEmpty()) { - bundleToQuota.put(path, ObjectMapperFactory.getThreadLocal().readValue(zkClient.getData(path, false, null), - ResourceQuota.class)); - } else { - for (final String child : children) { - getResourceQuotas(String.format("%s/%s", path, child), zkClient, bundleToQuota); - } - } - } - - private void handleStream(final ShellArguments arguments) throws Exception { - final List commandArguments = arguments.commandArguments; - // Stream accepts 1 application argument: ZooKeeper connect string. - if (checkAppArgs(commandArguments.size() - 1, 1)) { - final String zkConnectString = commandArguments.get(1); - final ZooKeeper zkClient = new ZooKeeper(zkConnectString, 5000, null); - new BrokerWatcher("/loadbalance/brokers", zkClient, arguments); - // This controller will now stream rate changes from the given ZK. - // Users wishing to stop this should Ctrl + C and use another - // Controller to send new commands. - while (true) - ; - } - } - - private void handleCopy(final ShellArguments arguments) throws Exception { + // Handle the command line arguments associated with the group trade command. + private void handleGroupTrade(final ShellArguments arguments) throws Exception { final List commandArguments = arguments.commandArguments; - // Copy accepts 3 application arguments: Tenant name, source ZooKeeper - // and target ZooKeeper connect strings. + // Group trade expects 3 application arguments: tenant name, group name, + // and number of namespaces. if (checkAppArgs(commandArguments.size() - 1, 3)) { - final String tenantName = commandArguments.get(1); - final String sourceZKConnectString = commandArguments.get(2); - final String targetZKConnectString = commandArguments.get(3); - final ZooKeeper sourceZKClient = new ZooKeeper(sourceZKConnectString, 5000, null); - final ZooKeeper targetZKClient = new ZooKeeper(targetZKConnectString, 5000, null); - final Map bundleToQuota = new HashMap<>(); - getResourceQuotas(QUOTA_ROOT, sourceZKClient, bundleToQuota); - for (final Map.Entry entry : bundleToQuota.entrySet()) { - final String bundle = entry.getKey(); - final ResourceQuota quota = entry.getValue(); - // Simulation will send messages in and out at about the same - // rate, so just make the rate the average - // of in and out. - final double messageRate = (quota.getMsgRateIn() + quota.getMsgRateOut()) / 2; - final int messageSize = (int) Math - .ceil((quota.getBandwidthIn() + quota.getBandwidthOut()) / messageRate); - final int clusterStart = QUOTA_ROOT.length() + 1; - final int tenantStart = bundle.indexOf('/', clusterStart) + 1; - final String sourceCluster = bundle.substring(clusterStart, tenantStart - 1); - final int namespaceStart = bundle.indexOf('/', tenantStart) + 1; - final String sourceTenant = bundle.substring(tenantStart, namespaceStart - 1); - final String namespace = bundle.substring(namespaceStart, bundle.lastIndexOf('/')); - final String keyRangeString = bundle.substring(bundle.lastIndexOf('/') + 1); - // To prevent duplicate node issues for same namespace names in - // different clusters/tenants. - final String manglePrefix = String.format("%s-%s-%s", sourceCluster, sourceTenant, keyRangeString); - final String mangledNamespace = String.format("%s-%s", manglePrefix, namespace); - arguments.rate = messageRate * arguments.rateMultiplier; - arguments.size = messageSize; - final NamespaceBundleStats startingStats = new NamespaceBundleStats(); - - // Modify the original quota so that new rates are set. - quota.setMsgRateIn(quota.getMsgRateIn() * arguments.rateMultiplier); - quota.setMsgRateOut(quota.getMsgRateOut() * arguments.rateMultiplier); - quota.setBandwidthIn(quota.getBandwidthIn() * arguments.rateMultiplier); - quota.setBandwidthOut(quota.getBandwidthOut() * arguments.rateMultiplier); - - // Assume modified memory usage is comparable to the rate - // multiplier times the original usage. - quota.setMemory(quota.getMemory() * arguments.rateMultiplier); - startingStats.msgRateIn = quota.getMsgRateIn(); - startingStats.msgRateOut = quota.getMsgRateOut(); - startingStats.msgThroughputIn = quota.getBandwidthIn(); - startingStats.msgThroughputOut = quota.getBandwidthOut(); - final BundleData bundleData = new BundleData(10, 1000, startingStats); - // Assume there is ample history for topic. - bundleData.getLongTermData().setNumSamples(1000); - bundleData.getShortTermData().setNumSamples(1000); - final String oldAPITargetPath = String.format( - "/loadbalance/resource-quota/namespace/%s/%s/%s/0x00000000_0xffffffff", cluster, tenantName, - mangledNamespace); - final String newAPITargetPath = String.format("/loadbalance/bundle-data/%s/%s/%s/0x00000000_0xffffffff", - cluster, tenantName, mangledNamespace); - log.info("Copying {} to {}", bundle, oldAPITargetPath); - ZkUtils.createFullPathOptimistic(targetZKClient, oldAPITargetPath, - ObjectMapperFactory.getThreadLocal().writeValueAsBytes(quota), ZooDefs.Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); - log.info("Creating new API data at {}", newAPITargetPath); - // Put the quota in the new ZooKeeper. - ZkUtils.createFullPathOptimistic(targetZKClient, newAPITargetPath, bundleData.getJsonBytes(), - ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - trade(arguments, makeDestination(tenantName, mangledNamespace, "t")); + final String tenant = commandArguments.get(1); + final String group = commandArguments.get(2); + final int numNamespaces = Integer.parseInt(commandArguments.get(3)); + for (int i = 0; i < numNamespaces; ++i) { + for (int j = 0; j < arguments.topicsPerNamespace; ++j) { + // For each namespace and topic pair, create the namespace + // by using the group name and the + // namespace index, and then create the topic by using the + // topic index. Then just call trade. + final String destination = makeDestination(tenant, String.format("%s-%d", group, i), + Integer.toString(j)); + trade(arguments, destination, random.nextInt(clients.length)); + Thread.sleep(arguments.separation); + } } - sourceZKClient.close(); - targetZKClient.close(); } } - public void read(final String[] args) { + /** + * Read the user-submitted arguments as commands to send to clients. + * + * @param args + * Arguments split on whitespace from user input. + */ + private void read(final String[] args) { // Don't attempt to process blank input. if (args.length > 0 && !(args.length == 1 && args[0].isEmpty())) { final ShellArguments arguments = new ShellArguments(); @@ -620,6 +667,9 @@ public void read(final String[] args) { case "stream": handleStream(arguments); break; + case "simulate": + handleSimulate(arguments); + break; case "quit": case "exit": System.exit(0); @@ -636,6 +686,9 @@ public void read(final String[] args) { } } + /** + * Create a shell for the user to send commands to clients. + */ public void run() throws Exception { BufferedReader inReader = new BufferedReader(new InputStreamReader(System.in)); while (true) { @@ -646,6 +699,12 @@ public void run() throws Exception { } } + /** + * Start a controller with command line arguments. + * + * @param args + * Arguments to pass in. + */ public static void main(String[] args) throws Exception { final MainArguments arguments = new MainArguments(); final JCommander jc = new JCommander(arguments); diff --git a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/ModularLoadManagerBrokerMonitor.java b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/ModularLoadManagerBrokerMonitor.java deleted file mode 100644 index afdb32d3d1712..0000000000000 --- a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/ModularLoadManagerBrokerMonitor.java +++ /dev/null @@ -1,205 +0,0 @@ -/** - * Copyright 2016 Yahoo Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yahoo.pulsar.testclient; - -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; - -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.google.gson.Gson; -import com.yahoo.pulsar.broker.LocalBrokerData; -import com.yahoo.pulsar.broker.TimeAverageBrokerData; -import com.yahoo.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class ModularLoadManagerBrokerMonitor { - private static final Logger log = LoggerFactory.getLogger(ModularLoadManagerImpl.class); - private static final String BROKER_ROOT = "/loadbalance/brokers"; - private static final int ZOOKEEPER_TIMEOUT_MILLIS = 5000; - private final ZooKeeper zkClient; - private static final Gson gson = new Gson(); - - private static class BrokerWatcher implements Watcher { - public final ZooKeeper zkClient; - public Set brokers; - - public BrokerWatcher(final ZooKeeper zkClient) { - this.zkClient = zkClient; - this.brokers = Collections.emptySet(); - } - - public synchronized void process(final WatchedEvent event) { - try { - if (event.getType() == Event.EventType.NodeChildrenChanged) { - updateBrokers(event.getPath()); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - public synchronized void updateBrokers(final String path) { - final Set newBrokers = new HashSet<>(); - try { - newBrokers.addAll(zkClient.getChildren(path, this)); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - for (String oldBroker : brokers) { - if (!newBrokers.contains(oldBroker)) { - log.info("Lost broker: " + oldBroker); - } - } - for (String newBroker : newBrokers) { - if (!brokers.contains(newBroker)) { - log.info("Gained broker: " + newBroker); - final BrokerDataWatcher brokerDataWatcher = new BrokerDataWatcher(zkClient); - brokerDataWatcher.printBrokerData(path + "/" + newBroker); - } - } - this.brokers = newBrokers; - } - } - - private static class BrokerDataWatcher implements Watcher { - private final ZooKeeper zkClient; - - public BrokerDataWatcher(final ZooKeeper zkClient) { - this.zkClient = zkClient; - } - - public static String brokerNameFromPath(final String path) { - return path.substring(path.lastIndexOf('/') + 1); - } - - public synchronized void process(final WatchedEvent event) { - try { - if (event.getType() == Event.EventType.NodeDataChanged) { - final String broker = brokerNameFromPath(event.getPath()); - printBrokerData(event.getPath()); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - private static void printMessageData(final double msgThroughputIn, final double msgThroughputOut, - final double msgRateIn, final double msgRateOut) { - log.info(String.format("Message Throughput In: %.2f KB/s", msgThroughputIn / 1024)); - log.info(String.format("Message Throughput Out: %.2f KB/s", msgThroughputOut / 1024)); - log.info(String.format("Message Rate In: %.2f msgs/s", msgRateIn)); - log.info(String.format("Message Rate Out: %.2f msgs/s", msgRateOut)); - } - - public synchronized void printBrokerData(final String brokerPath) { - final String broker = brokerNameFromPath(brokerPath); - final String timeAveragePath = ModularLoadManagerImpl.TIME_AVERAGE_BROKER_ZPATH + "/" + broker; - LocalBrokerData localBrokerData; - try { - localBrokerData = gson.fromJson(new String(zkClient.getData(brokerPath, this, null)), - LocalBrokerData.class); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - - log.info("Broker Data for " + broker + ":"); - log.info("---------------"); - - log.info("Num Topics: " + localBrokerData.getNumTopics()); - log.info("Num Bundles: " + localBrokerData.getNumBundles()); - log.info("Num Consumers: " + localBrokerData.getNumConsumers()); - log.info("Num Producers: " + localBrokerData.getNumProducers()); - - log.info(String.format("CPU: %.2f%%", localBrokerData.getCpu().percentUsage())); - - log.info(String.format("Memory: %.2f%%", localBrokerData.getMemory().percentUsage())); - - log.info(String.format("Direct Memory: %.2f%%", localBrokerData.getDirectMemory().percentUsage())); - - log.info("Latest Data:"); - printMessageData(localBrokerData.getMsgThroughputIn(), localBrokerData.getMsgThroughputOut(), - localBrokerData.getMsgRateIn(), localBrokerData.getMsgRateOut()); - - TimeAverageBrokerData timeAverageData; - try { - timeAverageData = gson.fromJson(new String(zkClient.getData(timeAveragePath, null, null)), - TimeAverageBrokerData.class); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - log.info("Short Term Data:"); - printMessageData(timeAverageData.getShortTermMsgThroughputIn(), - timeAverageData.getShortTermMsgThroughputOut(), timeAverageData.getShortTermMsgRateIn(), - timeAverageData.getShortTermMsgRateOut()); - - log.info("Long Term Data:"); - printMessageData(timeAverageData.getLongTermMsgThroughputIn(), - timeAverageData.getLongTermMsgThroughputOut(), timeAverageData.getLongTermMsgRateIn(), - timeAverageData.getLongTermMsgRateOut()); - - if (!localBrokerData.getLastBundleGains().isEmpty()) { - for (String bundle : localBrokerData.getLastBundleGains()) { - log.info("Gained Bundle: " + bundle); - } - } - if (!localBrokerData.getLastBundleLosses().isEmpty()) { - for (String bundle : localBrokerData.getLastBundleLosses()) { - log.info("Lost Bundle: " + bundle); - } - } - } - } - - static class Arguments { - @Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true) - public String connectString = null; - } - - public ModularLoadManagerBrokerMonitor(final ZooKeeper zkClient) { - this.zkClient = zkClient; - } - - private void start() { - try { - final BrokerWatcher brokerWatcher = new BrokerWatcher(zkClient); - brokerWatcher.updateBrokers(BROKER_ROOT); - while (true) { - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - public static void main(String[] args) { - try { - final Arguments arguments = new Arguments(); - final JCommander jc = new JCommander(arguments); - jc.parse(args); - final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null); - final ModularLoadManagerBrokerMonitor monitor = new ModularLoadManagerBrokerMonitor(zkClient); - monitor.start(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } -} diff --git a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/SimpleLoadManagerBrokerMonitor.java b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/SimpleLoadManagerBrokerMonitor.java deleted file mode 100644 index 603d8b82d3009..0000000000000 --- a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/SimpleLoadManagerBrokerMonitor.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Copyright 2016 Yahoo Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.yahoo.pulsar.testclient; - -import com.beust.jcommander.JCommander; -import com.beust.jcommander.Parameter; -import com.google.gson.Gson; -import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport; -import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; -import org.apache.zookeeper.ZooKeeper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * To use the monitor, simply start one via pulsar-perf monitor --connect-string : You will then - * receive updates in LoadReports as they occur. - */ -public class SimpleLoadManagerBrokerMonitor { - private static final Logger log = LoggerFactory.getLogger(SimpleLoadManagerBrokerMonitor.class); - private static final String BROKER_ROOT = "/loadbalance/brokers"; - private static final int ZOOKEEPER_TIMEOUT_MILLIS = 5000; - private final ZooKeeper zkClient; - private static final Gson gson = new Gson(); - - private static class BrokerWatcher implements Watcher { - public final ZooKeeper zkClient; - public Set brokers; - - public BrokerWatcher(final ZooKeeper zkClient) { - this.zkClient = zkClient; - this.brokers = Collections.emptySet(); - } - - public synchronized void process(final WatchedEvent event) { - try { - if (event.getType() == Event.EventType.NodeChildrenChanged) { - updateBrokers(event.getPath()); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - public synchronized void updateBrokers(final String path) { - final Set newBrokers = new HashSet<>(); - try { - newBrokers.addAll(zkClient.getChildren(path, this)); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - for (String oldBroker : brokers) { - if (!newBrokers.contains(oldBroker)) { - log.info("Lost broker: " + oldBroker); - } - } - for (String newBroker : newBrokers) { - if (!brokers.contains(newBroker)) { - log.info("Gained broker: " + newBroker); - final LoadReportWatcher loadReportWatcher = new LoadReportWatcher(zkClient); - loadReportWatcher.printLoadReport(path + "/" + newBroker); - } - } - this.brokers = newBrokers; - } - } - - private static class LoadReportWatcher implements Watcher { - private final ZooKeeper zkClient; - - public LoadReportWatcher(final ZooKeeper zkClient) { - this.zkClient = zkClient; - } - - public synchronized void process(final WatchedEvent event) { - try { - if (event.getType() == Event.EventType.NodeDataChanged) { - printLoadReport(event.getPath()); - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - public synchronized void printLoadReport(final String path) { - final String brokerName = path.substring(path.lastIndexOf('/') + 1); - LoadReport loadReport; - try { - loadReport = gson.fromJson(new String(zkClient.getData(path, this, null)), LoadReport.class); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - final SystemResourceUsage resourceUsage = loadReport.getSystemResourceUsage(); - - log.info("Load Report for " + brokerName + ":"); - log.info("---------------"); - - log.info("Num Topics: " + loadReport.getNumTopics()); - log.info("Num Bundles: " + loadReport.getNumBundles()); - - log.info(String.format("Raw CPU: %.2f%%", resourceUsage.getCpu().percentUsage())); - log.info(String.format("Allocated CPU: %.2f%%", - percentUsage(loadReport.getAllocatedCPU(), resourceUsage.getCpu().limit))); - log.info(String.format("Preallocated CPU: %.2f%%", - percentUsage(loadReport.getPreAllocatedCPU(), resourceUsage.getCpu().limit))); - - log.info(String.format("Raw Memory: %.2f%%", resourceUsage.getMemory().percentUsage())); - log.info(String.format("Allocated Memory: %.2f%%", - percentUsage(loadReport.getAllocatedMemory(), resourceUsage.getMemory().limit))); - log.info(String.format("Preallocated Memory: %.2f%%", - percentUsage(loadReport.getPreAllocatedMemory(), resourceUsage.getMemory().limit))); - - log.info(String.format("Raw Bandwidth In: %.2f%%", resourceUsage.getBandwidthIn().percentUsage())); - log.info(String.format("Allocated Bandwidth In: %.2f%%", - percentUsage(loadReport.getAllocatedBandwidthIn(), resourceUsage.getBandwidthIn().limit))); - log.info(String.format("Preallocated Bandwidth In: %.2f%%", - percentUsage(loadReport.getPreAllocatedBandwidthIn(), resourceUsage.getBandwidthIn().limit))); - - log.info(String.format("Raw Bandwidth Out: %.2f%%", resourceUsage.getBandwidthOut().percentUsage())); - log.info(String.format("Allocated Bandwidth Out: %.2f%%", - percentUsage(loadReport.getAllocatedBandwidthOut(), resourceUsage.getBandwidthOut().limit))); - log.info(String.format("Preallocated Bandwidth Out: %.2f%%", - percentUsage(loadReport.getPreAllocatedBandwidthOut(), resourceUsage.getBandwidthOut().limit))); - - log.info(String.format("Direct Memory: %.2f%%", resourceUsage.getDirectMemory().percentUsage())); - - log.info(String.format("Messages In Per Second: %.2f", loadReport.getMsgRateIn())); - log.info(String.format("Messages Out Per Second: %.2f", loadReport.getMsgRateOut())); - log.info(String.format("Preallocated Messages In Per Second: %.2f", loadReport.getPreAllocatedMsgRateIn())); - log.info(String.format("Preallocated Out Per Second: %.2f", loadReport.getPreAllocatedMsgRateOut())); - - if (!loadReport.getBundleGains().isEmpty()) { - for (String bundle : loadReport.getBundleGains()) { - log.info("Gained Bundle: " + bundle); - } - } - if (!loadReport.getBundleLosses().isEmpty()) { - for (String bundle : loadReport.getBundleLosses()) { - log.info("Lost Bundle: " + bundle); - } - } - } - } - - static class Arguments { - @Parameter(names = { "--connect-string" }, description = "Zookeeper connect string", required = true) - public String connectString = null; - } - - public SimpleLoadManagerBrokerMonitor(final ZooKeeper zkClient) { - this.zkClient = zkClient; - } - - private static double percentUsage(final double usage, final double limit) { - return limit > 0 && usage >= 0 ? 100 * Math.min(1, usage / limit) : 0; - } - - private void start() { - try { - final BrokerWatcher brokerWatcher = new BrokerWatcher(zkClient); - brokerWatcher.updateBrokers(BROKER_ROOT); - while (true) { - } - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - - public static void main(String[] args) { - try { - final Arguments arguments = new Arguments(); - final JCommander jc = new JCommander(arguments); - jc.parse(args); - final ZooKeeper zkClient = new ZooKeeper(arguments.connectString, ZOOKEEPER_TIMEOUT_MILLIS, null); - final SimpleLoadManagerBrokerMonitor monitor = new SimpleLoadManagerBrokerMonitor(zkClient); - monitor.start(); - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } -} diff --git a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/utils/FixedColumnLengthTableMaker.java b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/utils/FixedColumnLengthTableMaker.java new file mode 100644 index 0000000000000..fa7016eac0c95 --- /dev/null +++ b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/utils/FixedColumnLengthTableMaker.java @@ -0,0 +1,141 @@ +package com.yahoo.pulsar.testclient.utils; + +import java.util.Objects; +import java.util.function.Function; + +/** + * Light-weight utility for creating rows where each column has a fixed length in a command-line setting. + */ +public class FixedColumnLengthTableMaker { + /** + * Character to duplicate to make the bottom border. + */ + public char bottomBorder = '='; + + /** + * Format String to apply to decimal entries. If set to null, no special formatting is applied. + */ + public String decimalFormatter = null; + + /** + * Length of table elements. Elements whose String representations exceed this length are trimmed down to this + * length. + */ + public int elementLength = 10; + + /** + * The border to use to make the left side of the table. + */ + public String leftBorder = "||"; + + /** + * The amount of spacing to pad left of an element with. + */ + public int leftPadding = 0; + + /** + * The border to use to make the right side of the table. + */ + public String rightBorder = "||"; + + /** + * The amount of spacing to pad right of an element with. + */ + public int rightPadding = 1; + + /** + * The String to separate elements with. + */ + public String separator = "|"; + + /** + * Character to duplicate to make the top border. + */ + public char topBorder = '='; + + /** + * If not null, lengthFunction should give the length for the given column index. + */ + public Function lengthFunction = null; + + // Helper function to add top and bottom borders. + private void addHorizontalBorder(final int length, final StringBuilder builder, final char borderChar) { + for (int i = 0; i < length; ++i) { + builder.append(borderChar); + } + } + + // Helper function to pad with white space. + private void addSpace(final int amount, final StringBuilder builder) { + for (int i = 0; i < amount; ++i) { + builder.append(' '); + } + } + + private int lengthFor(final int column) { + return lengthFunction == null ? elementLength : lengthFunction.apply(column); + } + + /** + * Make a table using the specified settings. + * + * @param rows + * Rows to construct the table from. + * @return A String version of the table. + */ + public String make(final Object[][] rows) { + final StringBuilder builder = new StringBuilder(); + int numColumns = 0; + for (final Object[] row : rows) { + // Take the largest number of columns out of any row to be the total. + numColumns = Math.max(numColumns, row.length); + } + // Total length of the table in characters. + int totalLength = numColumns * (leftPadding + rightPadding + separator.length()) - separator.length() + + leftBorder.length() + rightBorder.length(); + for (int i = 0; i < numColumns; ++i) { + totalLength += lengthFor(i); + } + addHorizontalBorder(totalLength, builder, topBorder); + builder.append('\n'); + int i; + for (final Object[] row : rows) { + i = 0; + builder.append(leftBorder); + for (final Object element : row) { + addSpace(leftPadding, builder); + String elementString; + if ((element instanceof Float || element instanceof Double) && decimalFormatter != null) { + elementString = String.format(decimalFormatter, element); + } else { + // Avoid throwing NPE + elementString = Objects.toString(element, ""); + } + if (elementString.length() > lengthFor(i)) { + // Trim down to the maximum number of characters. + elementString = elementString.substring(0, lengthFor(i)); + } + builder.append(elementString); + // Add the space due to remaining characters and the right padding. + addSpace(lengthFor(i) - elementString.length() + rightPadding, builder); + if (i != numColumns - 1) { + // Don't add separator for the last column. + builder.append(separator); + } + i += 1; + } + // Put empty elements for remaining columns. + for (; i < numColumns; ++i) { + addSpace(leftPadding + rightPadding + lengthFor(i), builder); + if (i != numColumns - 1) { + builder.append(separator); + } + } + builder.append(rightBorder); + builder.append('\n'); + } + addHorizontalBorder(totalLength, builder, bottomBorder); + return builder.toString(); + } + +}