Skip to content

Commit

Permalink
do async notification if there are too many clients to notify
Browse files Browse the repository at this point in the history
  • Loading branch information
nobodyiam committed Jan 22, 2017
1 parent 35cde2b commit ad94cc7
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,11 @@ public TimeUnit releaseMessageCacheScanIntervalTimeUnit() {
return TimeUnit.SECONDS;
}

public int releaseMessageNotificationBatch() {
return getIntProperty("apollo.release-message.notification.batch", 100);
}

public int releaseMessageNotificationBatchIntervalInMilli() {
return getIntProperty("apollo.release-message.notification.batch.interval", 100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.ReleaseMessageListener;
import com.ctrip.framework.apollo.biz.message.Topics;
Expand All @@ -22,6 +23,7 @@
import com.ctrip.framework.apollo.configservice.util.WatchKeysUtil;
import com.ctrip.framework.apollo.core.ConfigConsts;
import com.ctrip.framework.apollo.core.dto.ApolloConfigNotification;
import com.ctrip.framework.apollo.core.utils.ApolloThreadFactory;
import com.ctrip.framework.apollo.tracer.Tracer;

import org.slf4j.Logger;
Expand All @@ -41,6 +43,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* @author Jason Song([email protected])
Expand All @@ -61,6 +66,8 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
new TypeToken<List<ApolloConfigNotification>>() {
}.getType();

private final ExecutorService largeNotificationBatchExecutorService;

@Autowired
private WatchKeysUtil watchKeysUtil;

Expand All @@ -76,6 +83,14 @@ public class NotificationControllerV2 implements ReleaseMessageListener {
@Autowired
private Gson gson;

@Autowired
private BizConfig bizConfig;

public NotificationControllerV2() {
largeNotificationBatchExecutorService = Executors.newSingleThreadExecutor(ApolloThreadFactory.create
("NotificationControllerV2", true));
}

@RequestMapping(method = RequestMethod.GET)
public DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(
@RequestParam(value = "appId") String appId,
Expand Down Expand Up @@ -220,6 +235,27 @@ public void handleMessage(ReleaseMessage message, String channel) {
//create a new list to avoid ConcurrentModificationException
List<DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>> results =
Lists.newArrayList(deferredResults.get(content));

//do async notification if too many clients
if (results.size() > bizConfig.releaseMessageNotificationBatch()) {
largeNotificationBatchExecutorService.submit(() -> {
logger.debug("Async notify {} clients for key {} with batch {}", results.size(), content,
bizConfig.releaseMessageNotificationBatch());
for (int i = 0; i < results.size(); i++) {
if (i > 0 && i % bizConfig.releaseMessageNotificationBatch() == 0) {
try {
TimeUnit.MILLISECONDS.sleep(bizConfig.releaseMessageNotificationBatchIntervalInMilli());
} catch (InterruptedException e) {
//ignore
}
}
logger.debug("Async notify {}", results.get(i));
results.get(i).setResult(notification);
}
});
return;
}

logger.debug("Notify {} clients for key {}", results.size(), content);

for (DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> result : results) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.google.common.collect.Sets;
import com.google.gson.Gson;

import com.ctrip.framework.apollo.biz.config.BizConfig;
import com.ctrip.framework.apollo.biz.entity.ReleaseMessage;
import com.ctrip.framework.apollo.biz.message.Topics;
import com.ctrip.framework.apollo.biz.utils.EntityManagerUtil;
Expand All @@ -27,6 +28,7 @@
import org.springframework.web.context.request.async.DeferredResult;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -57,6 +59,9 @@ public class NotificationControllerV2Test {
private NamespaceUtil namespaceUtil;
@Mock
private WatchKeysUtil watchKeysUtil;
@Mock
private BizConfig bizConfig;

private Gson gson;

private Multimap<String, DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>>
Expand All @@ -66,11 +71,16 @@ public class NotificationControllerV2Test {
public void setUp() throws Exception {
controller = new NotificationControllerV2();
gson = new Gson();

when(bizConfig.releaseMessageNotificationBatch()).thenReturn(100);
when(bizConfig.releaseMessageNotificationBatchIntervalInMilli()).thenReturn(5);

ReflectionTestUtils.setField(controller, "releaseMessageService", releaseMessageService);
ReflectionTestUtils.setField(controller, "entityManagerUtil", entityManagerUtil);
ReflectionTestUtils.setField(controller, "namespaceUtil", namespaceUtil);
ReflectionTestUtils.setField(controller, "watchKeysUtil", watchKeysUtil);
ReflectionTestUtils.setField(controller, "gson", gson);
ReflectionTestUtils.setField(controller, "bizConfig", bizConfig);

someAppId = "someAppId";
someCluster = "someCluster";
Expand Down Expand Up @@ -283,6 +293,48 @@ public void testPollNotificationWithMultipleNamespacesAndHandleMessage() throws
assertEquals(someId, notification.getNotificationId());
}

@Test
public void testPollNotificationWithHandleMessageInBatch() throws Exception {
String someWatchKey = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR)
.join(someAppId, someCluster, defaultNamespace);
int someBatch = 1;
int someBatchInterval = 10;

Multimap<String, String> watchKeysMap =
assembleMultiMap(defaultNamespace, Lists.newArrayList(someWatchKey));

String notificationAsString =
transformApolloConfigNotificationsToString(defaultNamespace, someNotificationId);

when(watchKeysUtil
.assembleAllWatchKeys(someAppId, someCluster, Sets.newHashSet(defaultNamespace),
someDataCenter)).thenReturn(watchKeysMap);

when(bizConfig.releaseMessageNotificationBatch()).thenReturn(someBatch);
when(bizConfig.releaseMessageNotificationBatchIntervalInMilli()).thenReturn(someBatchInterval);

DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
deferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);
DeferredResult<ResponseEntity<List<ApolloConfigNotification>>>
anotherDeferredResult = controller
.pollNotification(someAppId, someCluster, notificationAsString, someDataCenter,
someClientIp);

long someId = 1;
ReleaseMessage someReleaseMessage = new ReleaseMessage(someWatchKey);
someReleaseMessage.setId(someId);

controller.handleMessage(someReleaseMessage, Topics.APOLLO_RELEASE_TOPIC);

assertTrue(!anotherDeferredResult.hasResult());

TimeUnit.MILLISECONDS.sleep(someBatchInterval * 3);

assertTrue(anotherDeferredResult.hasResult());
}

private String transformApolloConfigNotificationsToString(
String namespace, long notificationId) {
List<ApolloConfigNotification> notifications =
Expand Down

0 comments on commit ad94cc7

Please sign in to comment.