FLUME-603: Revisit Performance tests
- Remove dependencies on nonexistent data sets.
- Remove reporting tests (not relevant/supported)
- Add accounting to the avro sink to present proper MB/s statistics on batched datasets
- Update Maven build to create "microbenchmarks" profile
Jonathan Hsieh committed Jun 30, 2011
1 parent f7f2e10 commit bef14eb
Showing 36 changed files with 1,208 additions and 474 deletions.
34 changes: 34 additions & 0 deletions DEVNOTES
@@ -26,6 +26,7 @@ we describe the contents of different directories.
./flume-distribution Flume distribution package module
./flume-docs Flume documentation generation module
./flume-log4j-appender Flume log4j-avro appender module
./flume-microbenchmarks Flume performance microbenchmark test suite
./flume-node-web Flume node status servlet module
./flume-windows-dist Flume node Windows distribution package module
./plugins/ Flume plugin modules (hello world skeleton and hbase)
@@ -177,6 +178,39 @@ This directory is set up exactly as the tarball installation of Flume
would be.


=== Running Performance Microbenchmarks

The suite of source and sink microbenchmark tests (located in
./flume-microbenchmarks/javaperf) can be run by using `mvn test -Pperf`.

As with the normal test cases, you can select a single test class with
`-Dtest=<TestClass>`. For example:

----
mvn test -Pperf -Dtest=PerfThriftSinks
----

The logs should contain lines formatted similarly to the following:

----
[junit] nullsink,ubuntu,begin,10998597,552872,disk_loaded,2895851957,301662152,receiver_started,156786445,305698624,sink_started,105303802,305704456,thrift sink to thrift source done,39520160510,320377056,MB/s,4.579940971898899,23094932,320379168
[junit] [ 0us, 547,544 b mem] Starting (after gc)
[junit] [ 10,998,597ns d 10,998,597ns 552,872 b mem] begin
[junit] [ 2,914,443,637ns d 2,895,851,957ns 301,662,152 b mem] disk_loaded
[junit] [ 3,514,297,391ns d 156,786,445ns 305,698,624 b mem] receiver_started
[junit] [ 4,082,661,503ns d 105,303,802ns 305,704,456 b mem] sink_started
[junit] [ 44,235,264,972ns d 39,520,160,510ns 320,377,056 b mem] thrift sink to thrift source done
[junit] [ 44,878,445,315ns d 23,094,932ns 320,379,168 b mem] MB/s,4.579940971898899
----

The first line is a summary of all the information in CSV format. The
other lines present the same information in a more human-readable
tabular form: the left column is the cumulative time in ns, the middle
column is the delta from the previous mark in ns, and the last column
of numbers is the amount of heap memory in use, followed by a comment
or label.
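
For scripted runs it can be handy to pull the numbers out of the CSV
summary line. The following standalone sketch (not part of Flume)
splits the sample line above into (label, delta ns, heap bytes)
triples; the field layout, including the quirk that the final mark's
label carries the computed rate as an extra comma-separated field, is
inferred from the sample output and should be treated as an assumption.

----
// Hypothetical parser for the benchmark summary line; field layout
// inferred from the sample output above.
public class ParseBenchSummary {
  public static void main(String[] args) {
    String line = "nullsink,ubuntu,begin,10998597,552872,"
        + "disk_loaded,2895851957,301662152,"
        + "receiver_started,156786445,305698624,"
        + "sink_started,105303802,305704456,"
        + "thrift sink to thrift source done,39520160510,320377056,"
        + "MB/s,4.579940971898899,23094932,320379168";
    String[] f = line.split(",");
    System.out.println("benchmark=" + f[0] + " host=" + f[1]);
    int i = 2;
    while (i + 2 < f.length) {
      String label = f[i++];
      // The final mark's label includes the computed rate
      // ("MB/s,<rate>"), so fold non-integer fields into the label.
      while (i < f.length - 2 && !f[i].matches("\\d+")) {
        label += "," + f[i++];
      }
      long deltaNs = Long.parseLong(f[i++]);
      long memBytes = Long.parseLong(f[i++]);
      System.out.printf("%-35s %,15d ns %,12d b%n", label, deltaNs, memBytes);
    }
  }
}
----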


=== Building on Windows platforms

Building Flume in Windows is possible. One can generate packages and
15 changes: 15 additions & 0 deletions flume-core/pom.xml
@@ -153,8 +153,23 @@
</goals>
</execution>
</executions>


</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.2</version>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>

@@ -19,10 +19,11 @@

import java.io.IOException;
import java.net.URL;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.avro.AvroRemoteException;
import org.apache.avro.ipc.AccountingTransceiver;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@
import com.cloudera.flume.reporter.ReportEvent;

/**
*This is a sink that sends events to a remote host/port using Avro.
* This is a sink that sends events to a remote host/port using Avro.
*/
public class AvroEventSink extends EventSink.Base {

@@ -48,15 +49,7 @@ public class AvroEventSink extends EventSink.Base {
protected FlumeEventAvroServer avroClient;
String host;
int port;
HttpTransceiver transport;

// this boolean variable is not used anywhere
boolean nonblocking;
/*
* The following variables keeps track of the number of bytes of the
* Event.body shipped.
*/
AtomicLong sentBytes = new AtomicLong();
AccountingTransceiver transport;

public AvroEventSink(String host, int port) {
this.host = host;
@@ -74,7 +67,6 @@ public void append(Event e) throws IOException, InterruptedException {
this.ensureInitialized();
try {
avroClient.append(afe);
sentBytes.addAndGet(e.getBody().length);
super.append(e);
} catch (AvroRemoteException e1) {
throw new IOException("Append failed " + e1.getMessage(), e1);
@@ -94,7 +86,8 @@ private void ensureInitialized() throws IOException {
public void open() throws IOException {

URL url = new URL("http", host, port, "/");
transport = new HttpTransceiver(url);
Transceiver http = new HttpTransceiver(url);
transport = new AccountingTransceiver(http);
try {
this.avroClient = (FlumeEventAvroServer) SpecificRequestor.getClient(
FlumeEventAvroServer.class, transport);
@@ -119,6 +112,10 @@ public void close() throws IOException {
}
}

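/** Number of bytes this sink's transceiver has written on the wire. */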
public long getSentBytes() {
return transport.getSentBytes();
}

/**
* {@inheritDoc}
*/
@@ -127,7 +124,7 @@ public ReportEvent getMetrics() {
ReportEvent rpt = super.getMetrics();
rpt.setStringMetric(A_SERVERHOST, host);
rpt.setLongMetric(A_SERVERPORT, port);
rpt.setLongMetric(A_SENTBYTES, sentBytes.get());
rpt.setLongMetric(A_SENTBYTES, transport.getSentBytes());
return rpt;
}

@@ -116,6 +116,10 @@ public void open() throws IOException {
}
}

public long getSentBytes() {
return sentBytes.get();
}

@Override
public ReportEvent getMetrics() {
ReportEvent rpt = super.getMetrics();
6 changes: 6 additions & 0 deletions flume-core/src/main/java/com/cloudera/util/Benchmark.java
@@ -40,6 +40,7 @@
public class Benchmark {
long start;
long last;
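// delta, in ns, recorded by the most recent mark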
long lastDelta;
PrintWriter out;
PrintWriter log;
List<String> values = new ArrayList<String>();
@@ -125,6 +126,7 @@ void _mark(String s) {

// skip over gc time
last = System.nanoTime(); // don't count gc time.
lastDelta = delta;
}

/**
@@ -143,6 +145,10 @@ public PrintWriter getLog() {
return log;
}

public long getLastDelta() {
return lastDelta;
}

/**
* In case I want to print the csv report summary
*
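Together with the byte counters added to the sinks, getLastDelta() is
what makes the MB/s line in the DEVNOTES sample output possible. The
exact call site is not part of this excerpt, so the following is only a
sketch of the arithmetic, assuming the real code combines the two new
accessors in roughly this way:

----
import com.cloudera.flume.handlers.avro.AvroEventSink; // package assumed
import com.cloudera.util.Benchmark;

class ThroughputReport {
  // Hypothetical helper: bytes shipped by the sink divided by the
  // wall-clock delta of the most recent benchmark mark.
  static void report(AvroEventSink sink, Benchmark bench) {
    long bytes = sink.getSentBytes();     // accessor added in this commit
    long deltaNs = bench.getLastDelta();  // accessor added in this commit
    double mbPerSec = (bytes / 1e6) / (deltaNs / 1e9);
    bench.getLog().println("MB/s," + mbPerSec); // getLog() shown above
  }
}
----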
@@ -0,0 +1,79 @@
package org.apache.avro.ipc;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.avro.Protocol;

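/**
 * Transceiver decorator that counts the bytes written on the wire, so
 * that sinks can report accurate MB/s figures for batched data.
 */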
public class AccountingTransceiver extends Transceiver {
Transceiver xcvr;

AtomicLong bytesWritten = new AtomicLong();

public AccountingTransceiver(Transceiver xcvr) {
this.xcvr = xcvr;
}

@Override
public String getRemoteName() {
return xcvr.getRemoteName();
}

// transceive() is deliberately not overridden: the base implementation
// calls the readBuffers()/writeBuffers() overrides, so its traffic is
// still accounted for.

@Override
public List<ByteBuffer> readBuffers() throws IOException {
return xcvr.readBuffers();
}

@Override
public void writeBuffers(List<ByteBuffer> arg0) throws IOException {
long len = getLength(arg0); // must be done before writing them.
xcvr.writeBuffers(arg0);
bytesWritten.addAndGet(len);
}

@Override
public boolean isConnected() {
return xcvr.isConnected();
}

@Override
public void setRemote(Protocol protocol) {
xcvr.setRemote(protocol);
}

@Override
public Protocol getRemote() {
return xcvr.getRemote();
}

@Override
public void close() throws IOException {
xcvr.close();
}

public long getSentBytes() {
return bytesWritten.get();
}

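/** Approximates the framed wire length: a 4-byte length prefix per buffer plus a trailing 4-byte terminator. */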
static int getLength(List<ByteBuffer> buffers) {
int length = 0;
for (ByteBuffer buffer : buffers) {
length += 4;
length += buffer.remaining();
}
length += 4;
return length;
}

static long bufferLens(List<ByteBuffer> buffers) throws IOException {
long len = getLength(buffers);
return len + 4;
}

}
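
Since AccountingTransceiver is a plain decorator, it can wrap any
Transceiver, not just the HTTP one. A minimal usage sketch mirroring the
AvroEventSink change above (host and port are placeholders):

----
import java.net.URL;
import org.apache.avro.ipc.AccountingTransceiver;
import org.apache.avro.ipc.HttpTransceiver;

public class AccountingExample {
  public static void main(String[] args) throws Exception {
    // Wrap an HttpTransceiver so outgoing framed bytes are counted.
    HttpTransceiver http =
        new HttpTransceiver(new URL("http", "localhost", 12345, "/"));
    AccountingTransceiver acct = new AccountingTransceiver(http);
    // ... issue RPCs through a SpecificRequestor built on acct ...
    System.out.println("sent bytes: " + acct.getSentBytes());
  }
}
----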
@@ -50,7 +50,7 @@
import com.cloudera.flume.reporter.Reportable;
import com.cloudera.flume.reporter.aggregator.AccumulatorSink;
import com.cloudera.flume.reporter.aggregator.CounterSink;
import com.cloudera.util.BenchmarkHarness;
import com.cloudera.util.FlumeTestHarness;
import com.cloudera.util.FileUtil;

/**
@@ -208,7 +208,7 @@ public void run() {
public void doTestLogicalNodesConcurrentDFOMans(final int threads,
final int events, int timeout) throws IOException, InterruptedException,
FlumeSpecException {
BenchmarkHarness.setupLocalWriteDir();
FlumeTestHarness.setupLocalWriteDir();
FlumeMaster master = new FlumeMaster();
FlumeNode node = new FlumeNode(new DirectMasterRPC(master), false, false);
final Reportable[] dfos = new Reportable[threads];
@@ -248,7 +248,7 @@ public void doTestLogicalNodesConcurrentDFOMans(final int threads,
}
assertTrue("Counts did not line up", success);

BenchmarkHarness.cleanupLocalWriteDir();
FlumeTestHarness.cleanupLocalWriteDir();
}

/**
@@ -42,7 +42,7 @@
import com.cloudera.flume.reporter.ReportEvent;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.AccumulatorSink;
import com.cloudera.util.BenchmarkHarness;
import com.cloudera.util.FlumeTestHarness;
import com.cloudera.util.Clock;

/**
@@ -59,12 +59,12 @@ public class TestDiskFailoverBehavior {

@Before
public void setup() {
BenchmarkHarness.setupLocalWriteDir();
FlumeTestHarness.setupLocalWriteDir();
}

@After
public void teardown() throws IOException {
BenchmarkHarness.cleanupLocalWriteDir();
FlumeTestHarness.cleanupLocalWriteDir();
}

LogicalNode setupAgent(long count, String agentSink) throws IOException,
@@ -32,7 +32,7 @@
import com.cloudera.flume.handlers.debug.MemorySinkSource;
import com.cloudera.flume.reporter.ReportManager;
import com.cloudera.flume.reporter.aggregator.CounterSink;
import com.cloudera.util.BenchmarkHarness;
import com.cloudera.util.FlumeTestHarness;

/**
* This check to make sure basic benchmarking should work -- ie when closed all
@@ -43,7 +43,7 @@ public class TestDiskFailoverBenchmarking {
@Test
public void benchmarkBeforeFailover() throws FlumeSpecException, IOException,
InterruptedException {
BenchmarkHarness.setupLocalWriteDir();
FlumeTestHarness.setupLocalWriteDir();
// String spec =
// "{ benchinject => { benchreport(\"pre\") => { diskFailover => [console, counter(\"beforecount\")] } } }";
String spec = "{ benchinject => { benchreport(\"pre\") => { diskFailover => counter(\"beforecount\") } } }";
@@ -60,14 +60,14 @@ public void benchmarkBeforeFailover() throws FlumeSpecException, IOException,
CounterSink cnt = (CounterSink) ReportManager.get().getReportable(
"beforecount");
Assert.assertEquals(5, cnt.getCount());
BenchmarkHarness.cleanupLocalWriteDir();
FlumeTestHarness.cleanupLocalWriteDir();

}

@Test
public void benchmarkAfterFailover() throws FlumeSpecException, IOException,
InterruptedException {
BenchmarkHarness.setupLocalWriteDir();
FlumeTestHarness.setupLocalWriteDir();
String spec = "{ benchinject => { diskFailover => { benchreport(\"post\") => counter(\"beforecount\") } } }";
EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(
LogicalNodeContext.testingContext()), spec);
@@ -82,13 +82,13 @@ public void benchmarkAfterFailover() throws FlumeSpecException, IOException,
CounterSink cnt = (CounterSink) ReportManager.get().getReportable(
"beforecount");
Assert.assertEquals(5, cnt.getCount());
BenchmarkHarness.cleanupLocalWriteDir();
FlumeTestHarness.cleanupLocalWriteDir();
}

@Test
public void benchmarkBeforeWriteahead() throws FlumeSpecException,
IOException, InterruptedException {
BenchmarkHarness.setupLocalWriteDir();
FlumeTestHarness.setupLocalWriteDir();
String spec = "{ benchinject => { benchreport(\"pre\") => { diskFailover => counter(\"beforecount\") } } }";
EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(
LogicalNodeContext.testingContext()), spec);
@@ -103,13 +103,13 @@ public void benchmarkBeforeWriteahead() throws FlumeSpecException,
CounterSink cnt = (CounterSink) ReportManager.get().getReportable(
"beforecount");
Assert.assertEquals(5, cnt.getCount());
BenchmarkHarness.cleanupLocalWriteDir();
FlumeTestHarness.cleanupLocalWriteDir();
}

@Test
public void benchmarkAfterWriteahead() throws FlumeSpecException,
IOException, InterruptedException {
BenchmarkHarness.setupLocalWriteDir();
FlumeTestHarness.setupLocalWriteDir();

String spec = "{ benchinject => { ackedWriteAhead => { benchreport(\"post\") => counter(\"beforecount\") } } }";
EventSink snk = FlumeBuilder.buildSink(new ReportTestingContext(
Expand All @@ -127,7 +127,7 @@ public void benchmarkAfterWriteahead() throws FlumeSpecException,

// +2 because of wal ack begin and end messages.
Assert.assertEquals(5 + 2, cnt.getCount());
BenchmarkHarness.cleanupLocalWriteDir();
FlumeTestHarness.cleanupLocalWriteDir();

}
}
