Skip to content

Commit

Permalink
SAMZA-834: fix perf degradation due to frequent update of Timer metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyuiscool authored and nickpan47 committed Dec 8, 2015
1 parent d859378 commit adf4f39
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
public class SlidingTimeWindowReservoir implements Reservoir {

/**
* Allow this amount of values to have the same updating time.
* default collision buffer
*/
private static final int TIME_COLLISION_BUFFER = 256;
private static final int DEFAULT_TIME_COLLISION_BUFFER = 1;

/**
* Run {@link #removeExpireValues} once every this amount of {@link #update}s
Expand All @@ -45,9 +45,14 @@ public class SlidingTimeWindowReservoir implements Reservoir {
*/
private static final int DEFAULT_WINDOW_SIZE_MS = 300000;

/**
* Allow this amount of values to have the same updating time.
*/
private final int collisionBuffer;

/**
* Size of the window. The unit is millisecond. It is as
* <code>TIME_COLLISION_BUFFER</code> times big as the original window size.
* <code>collisionBuffer</code> times big as the original window size.
*/
private final long windowMs;

Expand Down Expand Up @@ -93,11 +98,16 @@ public long currentTimeMillis() {
}

public SlidingTimeWindowReservoir(long windowMs, Clock clock) {
this.windowMs = windowMs * TIME_COLLISION_BUFFER;
this(windowMs, DEFAULT_TIME_COLLISION_BUFFER, clock);
}

public SlidingTimeWindowReservoir(long windowMs, int collisionBuffer, Clock clock) {
this.windowMs = windowMs * collisionBuffer;
this.storage = new ConcurrentSkipListMap<Long, Long>();
this.count = new AtomicLong();
this.lastUpdatingTime = new AtomicLong();
this.clock = clock;
this.collisionBuffer = collisionBuffer;
}

@Override
Expand Down Expand Up @@ -126,15 +136,19 @@ private void removeExpireValues() {
* value's, use the last updating time + 1 as the new updating time. This
* operation guarantees all the updating times in the <code>storage</code>
* strictly increment. No override happens before reaching the
* <code>TIME_COLLISION_BUFFER</code>.
* <code>collisionBuffer</code>.
*
* @return the updating time
*/
private long getUpdatingTime() {
while (true) {
long oldTime = lastUpdatingTime.get();
long newTime = clock.currentTimeMillis() * TIME_COLLISION_BUFFER;
long newTime = clock.currentTimeMillis() * collisionBuffer;
long updatingTime = newTime > oldTime ? newTime : oldTime + 1;
// make sure the updateTime doesn't overflow to the next millisecond
if (updatingTime == newTime + collisionBuffer) {
--updatingTime;
}
// make sure no other threads modify the lastUpdatingTime
if (lastUpdatingTime.compareAndSet(oldTime, updatingTime)) {
return updatingTime;
Expand Down
12 changes: 12 additions & 0 deletions samza-api/src/main/java/org/apache/samza/metrics/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ public Timer(String name, long windowMs, Clock clock) {
this(name, new SlidingTimeWindowReservoir(windowMs, clock));
}

/**
* Construct a {@link Timer} with given window size and collision buffer
*
* @param name name of this timer
* @param windowMs the window size. unit is millisecond
* @param collisionBuffer amount of collisions allowed in one millisecond.
* @param clock the clock for the reservoir
*/
public Timer(String name, long windowMs, int collisionBuffer, Clock clock) {
this(name, new SlidingTimeWindowReservoir(windowMs, collisionBuffer, clock));
}

/**
* Construct a {@link Timer} with given {@link Reservoir}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class TestSlidingTimeWindowReservoir {
@Test
public void testUpdateSizeSnapshot() {
SlidingTimeWindowReservoir slidingTimeWindowReservoir =
new SlidingTimeWindowReservoir(300, clock);
new SlidingTimeWindowReservoir(300, 8, clock);

when(clock.currentTimeMillis()).thenReturn(0L);
slidingTimeWindowReservoir.update(1L);
Expand All @@ -55,20 +55,26 @@ public void testUpdateSizeSnapshot() {
@Test
public void testDuplicateTime() {
SlidingTimeWindowReservoir slidingTimeWindowReservoir =
new SlidingTimeWindowReservoir(300, clock);
when(clock.currentTimeMillis()).thenReturn(0L);
new SlidingTimeWindowReservoir(300, 2, clock);
when(clock.currentTimeMillis()).thenReturn(1L);
slidingTimeWindowReservoir.update(1L);
slidingTimeWindowReservoir.update(2L);

Snapshot snapshot = slidingTimeWindowReservoir.getSnapshot();
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 2L)));
assertEquals(2, snapshot.getSize());

// update causes collision, will override the last update
slidingTimeWindowReservoir.update(3L);
snapshot = slidingTimeWindowReservoir.getSnapshot();
assertTrue(snapshot.getValues().containsAll(Arrays.asList(1L, 3L)));
assertEquals(2, snapshot.getSize());
}

@Test
public void testRemoveExpiredValues() {
SlidingTimeWindowReservoir slidingTimeWindowReservoir =
new SlidingTimeWindowReservoir(300, clock);
new SlidingTimeWindowReservoir(300, 8, clock);
when(clock.currentTimeMillis()).thenReturn(0L);
slidingTimeWindowReservoir.update(1L);

Expand All @@ -85,4 +91,5 @@ public void testRemoveExpiredValues() {
assertTrue(snapshot.getValues().containsAll(Arrays.asList(3L, 4L)));
assertEquals(2, snapshot.getSize());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public long currentTimeMillis() {

@Test
public void testDefaultTimerUpdateAndGetSnapshot() {
Timer timer = new Timer("test");
Timer timer = new Timer("test", 300, clock);
timer.update(1L);
timer.update(2L);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

package org.apache.samza.container


import org.apache.samza.metrics.{Timer, SlidingTimeWindowReservoir, MetricsRegistryMap}
import org.apache.samza.util.Clock
import org.junit.Test
import org.junit.Assert._
import org.mockito.Matchers
import org.mockito.Mockito._
import org.mockito.internal.util.reflection.Whitebox
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.junit.AssertionsForJUnit
Expand Down Expand Up @@ -183,7 +187,19 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
var now = 0L
val consumers = mock[SystemConsumers]
when(consumers.choose).thenReturn(envelope0)
val testMetrics = new SamzaContainerMetrics
val clock = new Clock {
var c = 0L
def currentTimeMillis: Long = {
c += 1L
c
}
}
val testMetrics = new SamzaContainerMetrics("test", new MetricsRegistryMap() {
override def newTimer(group: String, name: String) = {
newTimer(group, new Timer(name, new SlidingTimeWindowReservoir(300000, clock)))
}
})

val runLoop = new RunLoop(
taskInstances = getMockTaskInstances,
consumerMultiplexer = consumers,
Expand Down

0 comments on commit adf4f39

Please sign in to comment.