Skip to content

Commit

Permalink
Added better error reporting in PulsarClientToolTest
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat committed Sep 12, 2016
1 parent bd18ebc commit 39b7407
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,11 @@ public int run() throws PulsarClientException, IOException {

RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
if (limiter != null)
if (limiter != null) {
limiter.acquire();
}

Message msg = consumer.receive(20, TimeUnit.SECONDS);
Message msg = consumer.receive(5, TimeUnit.SECONDS);
if (msg == null) {
LOG.warn("No message to consume after waiting for 20 seconds.");
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,19 @@
*/
package com.yahoo.pulsar.client.cli;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.net.MalformedURLException;
import java.util.Properties;
import java.lang.InterruptedException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.yahoo.pulsar.broker.service.BrokerTestBase;
import com.yahoo.pulsar.client.cli.PulsarClientTool;

@Test
public class PulsarClientToolTest extends BrokerTestBase {
Expand All @@ -56,30 +52,31 @@ public void testInitialzation() throws MalformedURLException, InterruptedExcepti

String topicName = "persistent://property/ns/topic-scale-ns-0/topic";

int numberOfMessages = 100;
int numberOfMessages = 10;

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ExecutorService executor = Executors.newSingleThreadExecutor();

Future<Boolean> future = executor.submit(() -> {
CompletableFuture<Void> future = new CompletableFuture<Void>();
executor.execute(() -> {
PulsarClientTool pulsarClientToolConsumer;
try {
pulsarClientToolConsumer = new PulsarClientTool(properties);
String[] args = { "consume", "-t", "Exclusive", "-s", "sub-name", "-n",
Integer.toString(numberOfMessages), "--hex", "-r", "100", topicName };
Integer.toString(numberOfMessages), "--hex", "-r", "10", topicName };
Assert.assertEquals(pulsarClientToolConsumer.run(args), 0);
} catch (MalformedURLException e) {
Assert.fail("Exception : " + e.getMessage());
return false;
future.complete(null);
} catch (Throwable t) {
future.completeExceptionally(t);
}
return true;
});

PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties);

String[] args = { "produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r",
"200", topicName };
"20", topicName };
Assert.assertEquals(pulsarClientToolProducer.run(args), 0);

Assert.assertTrue(future.get(), "Exception occured while running consume task.");
future.get();
executor.shutdown();
}
}

0 comments on commit 39b7407

Please sign in to comment.