Skip to content

Commit

Permalink
Merge pull request Netflix#1171 from mattrjacobs/reduce-lifetime-of-h…
Browse files Browse the repository at this point in the history
…dr-histogram

Do all calculation of histogram values upfront.
  • Loading branch information
mattrjacobs committed Apr 8, 2016
2 parents 316b7ea + 99abb20 commit 5593413
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,18 @@

import org.HdrHistogram.Histogram;

import java.util.concurrent.ConcurrentLinkedQueue;

public class CachedValuesHistogram {

private final Histogram underlying;
static int POOL_SIZE = 1000;
static ConcurrentLinkedQueue<Histogram> HISTOGRAM_POOL = new ConcurrentLinkedQueue<Histogram>();

static {
for (int i = 0; i < POOL_SIZE; i++) {
HISTOGRAM_POOL.add(new Histogram(3));
}
}

private final int mean;
private final int p0;
Expand Down Expand Up @@ -49,13 +58,13 @@ public class CachedValuesHistogram {
private final int p99_99;
private final int p100;

private final long totalCount;

public static CachedValuesHistogram backedBy(Histogram underlying) {
return new CachedValuesHistogram(underlying);
}

private CachedValuesHistogram(Histogram underlying) {
this.underlying = underlying;

/**
* Single thread calculates a variety of commonly-accessed quantities.
* This way, all threads can access the cached values without synchronization
Expand Down Expand Up @@ -89,10 +98,10 @@ private CachedValuesHistogram(Histogram underlying) {
p99_95 = (int) underlying.getValueAtPercentile(99.95);
p99_99 = (int) underlying.getValueAtPercentile(99.99);
p100 = (int) underlying.getValueAtPercentile(100);
}

public Histogram getUnderlying() {
return underlying;
totalCount = underlying.getTotalCount();

release(underlying);
}

/**
Expand Down Expand Up @@ -138,20 +147,24 @@ public int getValueAtPercentile(double percentile) {
case 9995: return p99_95;
case 9999: return p99_99;
case 10000: return p100;
default: return getArbitraryPercentile(percentile);
default: throw new IllegalArgumentException("Percentile (" + percentile + ") is not currently cached");
}
}

/**
* Since this can be accessed by any thread, need external synchronization
* @param percentile percentile of distribution
* @return value at percentile
*/
private synchronized int getArbitraryPercentile(double percentile) {
return (int) underlying.getValueAtPercentile(percentile);
public long getTotalCount() {
return totalCount;
}

public synchronized long getTotalCount() {
return underlying.getTotalCount();
private static void release(Histogram histogram) {
histogram.reset();
HISTOGRAM_POOL.offer(histogram);
}

public static Histogram getNewHistogram() {
Histogram histogram = HISTOGRAM_POOL.poll();
if (histogram == null) {
histogram = new Histogram(3);
}
return histogram;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@
import org.HdrHistogram.Histogram;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.Subscribers;
import rx.subjects.BehaviorSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -49,7 +46,7 @@
*/
public class RollingDistributionStream<Event extends HystrixEvent> {
private AtomicReference<Subscription> rollingDistributionSubscription = new AtomicReference<Subscription>(null);
private final BehaviorSubject<CachedValuesHistogram> rollingDistribution = BehaviorSubject.create(CachedValuesHistogram.backedBy(getNewHistogram()));
private final BehaviorSubject<CachedValuesHistogram> rollingDistribution = BehaviorSubject.create(CachedValuesHistogram.backedBy(CachedValuesHistogram.getNewHistogram()));
private final Observable<CachedValuesHistogram> rollingDistributionStream;

private static final Func2<Histogram, Histogram, Histogram> distributionAggregator = new Func2<Histogram, Histogram, Histogram>() {
Expand All @@ -67,15 +64,6 @@ public Observable<Histogram> call(Observable<Histogram> window) {
}
};

private static final Action1<List<CachedValuesHistogram>> releaseOlderOfTwoDistributions = new Action1<List<CachedValuesHistogram>>() {
@Override
public void call(List<CachedValuesHistogram> histograms) {
if (histograms != null && histograms.size() == 2) {
releaseHistogram(histograms.get(0).getUnderlying());
}
}
};

private static final Func1<Histogram, CachedValuesHistogram> cacheHistogramValues = new Func1<Histogram, CachedValuesHistogram>() {
@Override
public CachedValuesHistogram call(Histogram histogram) {
Expand All @@ -95,13 +83,13 @@ protected RollingDistributionStream(final HystrixEventStream<Event> stream, fina
final Func2<Histogram, Event, Histogram> addValuesToBucket) {
final List<Histogram> emptyDistributionsToStart = new ArrayList<Histogram>();
for (int i = 0; i < numBuckets; i++) {
emptyDistributionsToStart.add(getNewHistogram());
emptyDistributionsToStart.add(CachedValuesHistogram.getNewHistogram());
}

final Func1<Observable<Event>, Observable<Histogram>> reduceBucketToSingleDistribution = new Func1<Observable<Event>, Observable<Histogram>>() {
@Override
public Observable<Histogram> call(Observable<Event> bucket) {
return bucket.reduce(getNewHistogram(), addValuesToBucket);
return bucket.reduce(CachedValuesHistogram.getNewHistogram(), addValuesToBucket);
}
};

Expand Down Expand Up @@ -148,13 +136,6 @@ public void startCachingStreamValuesIfUnstarted() {
Subscription candidateSubscription = observe().subscribe(rollingDistribution);
if (rollingDistributionSubscription.compareAndSet(null, candidateSubscription)) {
//won the race to set the subscription

//as soon as subject receives a new Histogram, old one may be released
rollingDistribution
.window(2, 1) //subject is used as a single-value, but can be viewed as a stream. Here, get the latest 2 values of the subject
.flatMap(convertToList) //convert to list (of length 2)
.doOnNext(releaseOlderOfTwoDistributions) //if there are 2, then the oldest one will never be read, so we can reclaim its memory
.unsafeSubscribe(Subscribers.empty()); //no need to emit anywhere, this is side-effect only (release the reference to the old Histogram)
} else {
//lost the race to set the subscription, so we need to cancel this one
candidateSubscription.unsubscribe();
Expand All @@ -178,26 +159,4 @@ public void unsubscribe() {
rollingDistributionSubscription.compareAndSet(s, null);
}
}

private static Histogram getNewHistogram() {
Histogram histogram = HISTOGRAM_POOL.poll();
if (histogram == null) {
histogram = new Histogram(3);
}
return histogram;
}

private static void releaseHistogram(Histogram histogram) {
histogram.reset();
HISTOGRAM_POOL.offer(histogram);
}

static int POOL_SIZE = 1000;
static ConcurrentLinkedQueue<Histogram> HISTOGRAM_POOL = new ConcurrentLinkedQueue<Histogram>();

static {
for (int i = 0; i < POOL_SIZE; i++) {
HISTOGRAM_POOL.add(new Histogram(3));
}
}
}

0 comments on commit 5593413

Please sign in to comment.