Skip to content

Commit

Permalink
Merge pull request Netflix#599 from mattrjacobs/add-collapser-metrics…
Browse files Browse the repository at this point in the history
…-to-observable-collapser

Added metrics to HystrixObservableCollapser
  • Loading branch information
mattrjacobs committed Feb 2, 2015
2 parents 1465c0a + ba56ae8 commit 464da00
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -70,6 +72,7 @@ public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseTyp
private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;
private final HystrixRequestCache requestCache;
private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;
private final HystrixCollapserMetrics metrics;

/**
* The scope of request collapsing.
Expand Down Expand Up @@ -112,28 +115,41 @@ protected HystrixObservableCollapser(HystrixCollapserKey collapserKey) {
* Fluent interface for constructor arguments
*/
protected HystrixObservableCollapser(Setter setter) {
this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter);
this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter, null);
}

/* package for tests */HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) {
/* package for tests */HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder, HystrixCollapserMetrics metrics) {
if (collapserKey == null || collapserKey.name().trim().equals("")) {
String defaultKeyName = getDefaultNameFromClass(getClass());
collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);
}

this.collapserFactory = new RequestCollapserFactory<>(collapserKey, scope, timer, propertiesBuilder);
HystrixCollapserProperties properties = HystrixPropertiesFactory.getCollapserProperties(collapserKey, propertiesBuilder);
this.collapserFactory = new RequestCollapserFactory<>(collapserKey, scope, timer, properties);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());

if (metrics == null) {
this.metrics = HystrixCollapserMetrics.getInstance(collapserKey, properties);
} else {
this.metrics = metrics;
}

final HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> self = this;

/* strategy: HystrixMetricsPublisherCollapser */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForCollapser(collapserKey, this.metrics, properties);


/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
*/
collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {

@Override
public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
return self.shardRequests(requests);
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = self.shardRequests(requests);
self.metrics.markShards(shards.size());
return shards;
}

@Override
Expand All @@ -142,7 +158,7 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR

// mark the number of requests being collapsed together
command.markAsCollapsedCommand(requests.size());

self.metrics.markBatch(requests.size());
return command.toObservable();
}

Expand Down Expand Up @@ -223,6 +239,14 @@ public Scope getScope() {
return Scope.valueOf(collapserFactory.getScope().name());
}

/**
* Return the {@link HystrixCollapserMetrics} for this collapser
* @return {@link HystrixCollapserMetrics} for this collapser
*/
public HystrixCollapserMetrics getMetrics() {
return metrics;
}

/**
* The request arguments to be passed to the {@link HystrixCommand}.
* <p>
Expand Down Expand Up @@ -373,17 +397,14 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
if (getProperties().requestCacheEnabled().get()) {
Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
if (fromCache != null) {
/* mark that we received this response from cache */
// TODO Add collapser metrics so we can capture this information
// we can't add it to the command metrics because the command can change each time (dynamic key for example)
// and we don't have access to it when responding from cache
// collapserMetrics.markResponseFromCache();
metrics.markResponseFromCache();
return fromCache;
}
}

RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
metrics.markRequestBatched();
if (getProperties().requestCacheEnabled().get()) {
/*
* A race can occur here with multiple threads queuing but only one will be cached.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
package com.netflix.hystrix;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.hystrix.strategy.properties.HystrixPropertiesCollapserDefault;
import com.netflix.hystrix.util.HystrixRollingNumberEvent;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -38,11 +42,8 @@
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;

public class HystrixObservableCollapserTest {
static AtomicInteger counter = new AtomicInteger();

@Before
public void init() {
counter.set(0);
// since we're going to modify properties of the same class between tests, wipe the cache each time
HystrixCollapser.reset();
/* we must call this to simulate a new request lifecycle running and clearing caches */
Expand All @@ -61,57 +62,66 @@ public void cleanup() {
@Test
public void testTwoRequests() throws Exception {
TestCollapserTimer timer = new TestCollapserTimer();
Future<String> response1 = new TestRequestCollapser(timer, counter, 1).observe().toBlocking().toFuture();
Future<String> response2 = new TestRequestCollapser(timer, counter, 2).observe().toBlocking().toFuture();
HystrixObservableCollapser<String, String, String, String> collapser1 = new TestRequestCollapser(timer, 1);
HystrixObservableCollapser<String, String, String, String> collapser2 = new TestRequestCollapser(timer, 2);
Future<String> response1 = collapser1.observe().toBlocking().toFuture();
Future<String> response2 = collapser2.observe().toBlocking().toFuture();
timer.incrementTime(10); // let time pass that equals the default delay/period

assertEquals("1", response1.get());
assertEquals("2", response2.get());

assertEquals(1, counter.get());

assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());

HystrixCollapserMetrics metrics = collapser1.getMetrics();
assertSame(metrics, collapser2.getMetrics());
assertEquals(2L, metrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_REQUEST_BATCHED));
assertEquals(1L, metrics.getRollingCount(HystrixRollingNumberEvent.COLLAPSER_BATCH));
assertEquals(0L, metrics.getRollingCount(HystrixRollingNumberEvent.RESPONSE_FROM_CACHE));
}

private static class TestRequestCollapser extends HystrixObservableCollapser<String, String, String, String> {

private final AtomicInteger count;
private final String value;
private ConcurrentLinkedQueue<HystrixObservableCommand<String>> commandsExecuted;

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value) {
this(timer, counter, String.valueOf(value));
public TestRequestCollapser(TestCollapserTimer timer, int value) {
this(timer, String.valueOf(value));
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value) {
this(timer, counter, value, 10000, 10);
public TestRequestCollapser(TestCollapserTimer timer, String value) {
this(timer, value, 10000, 10);
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(timer, counter, value, 10000, 10, executionLog);
public TestRequestCollapser(TestCollapserTimer timer, String value, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(timer, value, 10000, 10, executionLog);
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(timer, counter, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds);
public TestRequestCollapser(TestCollapserTimer timer, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(timer, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds);
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
public TestRequestCollapser(TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
}

public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(scope, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
this(scope, timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
}

public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(Scope.REQUEST, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog);
public TestRequestCollapser(TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(Scope.REQUEST, timer, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog);
}

public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
private static HystrixCollapserMetrics createMetrics() {
HystrixCollapserKey key = HystrixCollapserKey.Factory.asKey("COLLAPSER_ONE");
return HystrixCollapserMetrics.getInstance(key, new HystrixPropertiesCollapserDefault(key, HystrixCollapserProperties.Setter()));
}

public TestRequestCollapser(Scope scope, TestCollapserTimer timer, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
// use a CollapserKey based on the CollapserTimer object reference so it's unique for each timer as we don't want caching
// of properties to occur and we're using the default HystrixProperty which typically does caching
super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds));
this.count = counter;
super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds), createMetrics());
this.value = value;
this.commandsExecuted = executionLog;
}
Expand All @@ -133,9 +143,6 @@ public HystrixObservableCommand<String> createCommand(final Collection<Collapsed

@Override
protected Func1<String, String> getBatchReturnTypeToResponseTypeMapper() {
// count how many times a batch is executed (this method is executed once per batch)
System.out.println("increment count: " + count.incrementAndGet());

return new Func1<String, String>() {

@Override
Expand Down

0 comments on commit 464da00

Please sign in to comment.