Skip to content

Commit

Permalink
Added the first of the perf tests
Browse files Browse the repository at this point in the history
  • Loading branch information
abersnaze committed Jan 30, 2014
1 parent 4db5d8e commit 40bd48b
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 0 deletions.
12 changes: 12 additions & 0 deletions rxjava-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
apply plugin: 'maven'
apply plugin: 'osgi'
apply plugin:'application'

sourceCompatibility = JavaVersion.VERSION_1_6
targetCompatibility = JavaVersion.VERSION_1_6
Expand Down Expand Up @@ -31,3 +32,14 @@ jar {
}
}

task time(type:JavaExec) {
//sourceSets.metaClass.methods.each { println it }
classpath = sourceSets.perf.runtimeClasspath
group 'Application'
description 'Execute the calipser benchmark timing of Rx'
main 'rx.operators.ObservableBenchmark'
// args '--print-config' // dump the caliper configuration before starting
// args '--verbose' // uncomment to have caliper log what it's doing
args '--trials=1' // only run the experiments once
//args '--time-limit=0' // experiments never timeout
}
115 changes: 115 additions & 0 deletions rxjava-core/src/perf/java/rx/operators/ObservableBenchmark.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package rx.operators;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.util.functions.Func1;

import com.google.caliper.Benchmark;
import com.google.caliper.runner.CaliperMain;

public class ObservableBenchmark extends Benchmark {
public void timeBaseline(int reps) {
for (int i = 0; i < reps; i++) {
observableOfInts.subscribe(newObserver());
}
awaitAllObservers();
}

public int timeMapIterate(long reps) {
int x = 0;
for (int i = 0; i < reps; i++) {
for (int j = 0; j < intValues.length; j++) {
// use hash code to make sure the JIT doesn't optimize too much and remove all of
// our code.
x |= ident.call(intValues[j]).hashCode();
}
}
return x;
}

public void timeMap(long reps) {
timeOperator(reps, new OperatorMap<Integer, Object>(ident));
}

/**************************************************************************
* Below is internal stuff to avoid object allocation and time overhead of anything that isn't
* being tested.
**************************************************************************/

public static void main(String[] args) {
CaliperMain.main(ObservableBenchmark.class, args);
}

private void timeOperator(long reps, Operator<Object, Integer> op) {
for (int i = 0; i < reps; i++) {
observableOfInts.lift(op).subscribe(newObserver());
}
awaitAllObservers();
}

private final static AtomicInteger outstanding = new AtomicInteger(0);
private final static CountDownLatch latch = new CountDownLatch(1);

private static <T> Observer<T> newObserver() {
outstanding.incrementAndGet();
return new Observer<T>() {
@Override
public void onCompleted() {
int left = outstanding.decrementAndGet();
if (left == 0) {
latch.countDown();
}
}

@Override
public void onError(Throwable e) {
int left = outstanding.decrementAndGet();
if (left == 0) {
latch.countDown();
}
}

@Override
public void onNext(T t) {
// do nothing
}
};
}

private static void awaitAllObservers() {
try {
latch.await();
} catch (InterruptedException e) {
return;
}
}

private static final Integer[] intValues = new Integer[1000];
static {
for (int i = 0; i < intValues.length; i++) {
intValues[i] = i;
}
}

private static final Observable<Integer> observableOfInts = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Observer<? super Integer> o) {
for (int i = 0; i < intValues.length; i++) {
if (o.isUnsubscribed())
return;
o.onNext(intValues[i]);
}
o.onCompleted();
}
});
private static final Func1<Integer, Object> ident = new Func1<Integer, Object>() {
@Override
public Object call(Integer t) {
return t;
}
};
}

0 comments on commit 40bd48b

Please sign in to comment.