Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
qiangdavidliu committed Oct 7, 2016
1 parent 82cddcb commit e9c95ff
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public class EurekaNotificationServerListUpdater implements ServerListUpdater {

private static class LazyHolder {
private final static String CORE_THREAD = "EurekaNotificationServerListUpdater.ThreadPoolSize";
private final static String QUEUE_SIZE = "EurekaNotificationServerListUpdater.queueSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
private final static DynamicIntProperty queueSizeProp = new DynamicIntProperty(QUEUE_SIZE, 1000);

private static ThreadPoolExecutor DEFAULT_SERVER_LIST_UPDATE_EXECUTOR;
private static Thread SHUTDOWN_THREAD;
Expand All @@ -46,7 +48,7 @@ private static class LazyHolder {
corePoolSize * 5,
0,
TimeUnit.NANOSECONDS,
new ArrayBlockingQueue<Runnable>(1000),
new ArrayBlockingQueue<Runnable>(queueSizeProp.get()),
new ThreadFactoryBuilder()
.setNameFormat("EurekaNotificationServerListUpdater-%d")
.setDaemon(true)
Expand Down Expand Up @@ -91,6 +93,7 @@ public static ExecutorService getDefaultRefreshExecutor() {
return LazyHolder.DEFAULT_SERVER_LIST_UPDATE_EXECUTOR;
}

/* visible for testing */ final AtomicBoolean updateQueued = new AtomicBoolean(false);
private final AtomicBoolean isActive = new AtomicBoolean(false);
private final AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());
private final Provider<EurekaClient> eurekaClientProvider;
Expand Down Expand Up @@ -119,6 +122,11 @@ public synchronized void start(final UpdateAction updateAction) {
@Override
public void onEvent(EurekaEvent event) {
if (event instanceof CacheRefreshedEvent) {
if (!updateQueued.compareAndSet(false, true)) { // if an update is already queued
logger.info("an update action is already queued, returning as no-op");
return;
}

try {
refreshExecutor.submit(new Runnable() {
@Override
Expand All @@ -128,11 +136,14 @@ public void run() {
lastUpdated.set(System.currentTimeMillis());
} catch (Exception e) {
logger.warn("Failed to update serverList", e);
} finally {
updateQueued.set(false);
}
}
}); // fire and forget
} catch (Exception e) {
logger.warn("Error submitting update task to executor, skipping one round of updates", e);
updateQueued.set(false); // if submit fails, need to reset updateQueued to false
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

import javax.inject.Provider;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -45,18 +47,24 @@ public EurekaClient get() {

EasyMock.replay(eurekaClientMock);

final CountDownLatch updateCountLatch = new CountDownLatch(2);
final CountDownLatch firstLatch = new CountDownLatch(1);
final CountDownLatch secondLatch = new CountDownLatch(1);
serverListUpdater.start(new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateCountLatch.countDown();
if (firstLatch.getCount() == 0) {
secondLatch.countDown();
} else {
firstLatch.countDown();
}
}
});

eventListenerCapture.getValue().onEvent(new CacheRefreshedEvent());
eventListenerCapture.getValue().onEvent(new CacheRefreshedEvent());
Assert.assertTrue(firstLatch.await(1, TimeUnit.SECONDS));

Assert.assertTrue(updateCountLatch.await(2, TimeUnit.SECONDS));
eventListenerCapture.getValue().onEvent(new CacheRefreshedEvent());
Assert.assertTrue(secondLatch.await(1, TimeUnit.SECONDS));
} finally {
serverListUpdater.stop();

Expand Down Expand Up @@ -120,6 +128,90 @@ public void doUpdate() {
EasyMock.verify(eurekaClientMock2);
}

@Test
public void testTaskAlreadyQueued() throws Exception {
EurekaNotificationServerListUpdater serverListUpdater = new EurekaNotificationServerListUpdater(
new Provider<EurekaClient>() {
@Override
public EurekaClient get() {
return eurekaClientMock;
}
}
);

try {
Capture<EurekaEventListener> eventListenerCapture = new Capture<EurekaEventListener>();
eurekaClientMock.registerEventListener(EasyMock.capture(eventListenerCapture));

EasyMock.replay(eurekaClientMock);

final CountDownLatch countDownLatch = new CountDownLatch(1);
serverListUpdater.start(new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
if (countDownLatch.getCount() == 0) {
Assert.fail("should only countdown once");
}
countDownLatch.countDown();
}
});

eventListenerCapture.getValue().onEvent(new CacheRefreshedEvent());
eventListenerCapture.getValue().onEvent(new CacheRefreshedEvent());

Assert.assertTrue(countDownLatch.await(2, TimeUnit.SECONDS));
Thread.sleep(100); // sleep a bit more

Assert.assertFalse(serverListUpdater.updateQueued.get());
} finally {
serverListUpdater.stop();

EasyMock.verify(eurekaClientMock);
}
}

@Test
public void testSubmitExceptionClearQueued() {
ThreadPoolExecutor executorMock = EasyMock.createMock(ThreadPoolExecutor.class);
EasyMock.expect(executorMock.submit(EasyMock.isA(Runnable.class)))
.andThrow(new RejectedExecutionException("test exception"));

EurekaNotificationServerListUpdater serverListUpdater = new EurekaNotificationServerListUpdater(
new Provider<EurekaClient>() {
@Override
public EurekaClient get() {
return eurekaClientMock;
}
},
executorMock
);

try {
Capture<EurekaEventListener> eventListenerCapture = new Capture<EurekaEventListener>();
eurekaClientMock.registerEventListener(EasyMock.capture(eventListenerCapture));

EasyMock.replay(eurekaClientMock);
EasyMock.replay(executorMock);

serverListUpdater.start(new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
Assert.fail("should not reach here");
}
});

eventListenerCapture.getValue().onEvent(new CacheRefreshedEvent());

Assert.assertFalse(serverListUpdater.updateQueued.get());
} finally {
serverListUpdater.stop();

EasyMock.verify(executorMock);
EasyMock.verify(eurekaClientMock);
}

}

@Test(expected = IllegalStateException.class)
public void testFailIfDiscoveryIsNotAvailable() {
EurekaNotificationServerListUpdater serverListUpdater = new EurekaNotificationServerListUpdater(
Expand Down

0 comments on commit e9c95ff

Please sign in to comment.