Skip to content

Commit

Permalink
Use multiple separate counters for the single service with multiple p…
Browse files Browse the repository at this point in the history
…rotocols. (sofastack#222)
  • Loading branch information
leizhiyuan authored and ujjboy committed Jul 13, 2018
1 parent 6f3dec6 commit b0b4176
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,36 +110,50 @@ private void doExport() {
if (exported) {
return;
}
String key = providerConfig.buildKey();
String appName = providerConfig.getAppName();

// 检查参数
checkParameters();
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
}

// 注意同一interface,同一uniqleId,不同server情况
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
if (cnt == null) { // 没有发布过
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
cnt.decrementAndGet();
// 超过最大数量,直接抛出异常
throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
+ " has been exported more than " + maxProxyCount + " times!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!");
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
String appName = providerConfig.getAppName();

//key is the protocol of server,for concurrent safe
Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
// 将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol();

String key = providerConfig.buildKey() + ":" + protocol;

if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
}

// 注意同一interface,同一uniqleId,不同server情况
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
if (cnt == null) { // 没有发布过
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
hasExportedInCurrent.put(serverConfig.getProtocol(), true);
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
decrementCounter(hasExportedInCurrent);
// 超过最大数量,直接抛出异常
throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
+ " has been exported more than " + maxProxyCount + " times!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!", key);
+ " Ignore this if you did that on purpose!");
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!", key);
}
}
}

}

try {
Expand All @@ -155,7 +169,6 @@ private void doExport() {
}
}
// 将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
Expand All @@ -164,6 +177,7 @@ private void doExport() {
if (serverConfig.isAutoStart()) {
server.start();
}

} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
Expand All @@ -176,7 +190,8 @@ private void doExport() {
providerConfig.setConfigListener(new ProviderAttributeListener());
register();
} catch (Exception e) {
cnt.decrementAndGet();
decrementCounter(hasExportedInCurrent);

if (e instanceof SofaRpcRuntimeException) {
throw (SofaRpcRuntimeException) e;
} else {
Expand All @@ -189,6 +204,22 @@ private void doExport() {
exported = true;
}

/**
* decrease counter
* @param hasExportedInCurrent
*/
private void decrementCounter(Map<String, Boolean> hasExportedInCurrent) {
//once error, we decrementAndGet the counter
for (Map.Entry<String, Boolean> entry : hasExportedInCurrent.entrySet()) {
String protocol = entry.getKey();
String key = providerConfig.buildKey() + ":" + protocol;
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
if (cnt != null && cnt.get() > 0) {
cnt.decrementAndGet();
}
}
}

/**
* for check fields and parameters of consumer config
*/
Expand Down Expand Up @@ -246,12 +277,16 @@ public void unExport() {
if (!exported) {
return;
}

String key = providerConfig.buildKey();
String appName = providerConfig.getAppName();
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Unexport provider config : {} {}", key, providerConfig.getId() != null
? "with bean id " + providerConfig.getId() : "");

List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol();
String key = providerConfig.buildKey() + ":" + protocol;
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Unexport provider config : {} {}", key, providerConfig.getId() != null
? "with bean id " + providerConfig.getId() : "");
}
}

// 取消注册到注册中心
Expand All @@ -260,7 +295,6 @@ public void unExport() {
providerProxyInvoker = null;

// 取消将处理器注册到server
List<ServerConfig> serverConfigs = providerConfig.getServer();
if (serverConfigs != null) {
for (ServerConfig serverConfig : serverConfigs) {
Server server = serverConfig.getServer();
Expand All @@ -281,10 +315,15 @@ public void unExport() {
providerConfig.setConfigListener(null);

// 清除缓存状态
AtomicInteger cnt = EXPORTED_KEYS.get(key);
if (cnt != null && cnt.decrementAndGet() <= 0) {
EXPORTED_KEYS.remove(key);
for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol();
String key = providerConfig.buildKey() + ":" + protocol;
AtomicInteger cnt = EXPORTED_KEYS.get(key);
if (cnt != null && cnt.decrementAndGet() <= 0) {
EXPORTED_KEYS.remove(key);
}
}

RpcRuntimeContext.invalidateProviderConfig(this);
exported = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ static List<String> convertProviderToUrls(ProviderConfig providerConfig) {
.append(
getKeyPairs(RpcConstants.CONFIG_KEY_DYNAMIC, providerConfig.isDynamic()))
.append(getKeyPairs(ProviderInfoAttrs.ATTR_WEIGHT, providerConfig.getWeight()))
.append(getKeyPairs(ProviderInfoAttrs.ATTR_WARMUP_TIME,
providerConfig.getParameter(ProviderInfoAttrs.ATTR_WARMUP_TIME)))
.append(getKeyPairs(ProviderInfoAttrs.ATTR_WARMUP_WEIGHT,
providerConfig.getParameter(ProviderInfoAttrs.ATTR_WARMUP_WEIGHT)))
.append(getKeyPairs("accepts", server.getAccepts()))
.append(getKeyPairs(ProviderInfoAttrs.ATTR_START_TIME, RpcRuntimeContext.now()))
.append(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.server.multi;

import com.alipay.sofa.rpc.bootstrap.DefaultProviderBootstrap;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.context.RpcRuntimeContext;
import com.alipay.sofa.rpc.server.rest.RestService;
import com.alipay.sofa.rpc.server.rest.RestServiceImpl;
import com.alipay.sofa.rpc.test.ActivelyDestroyTest;
import org.junit.Assert;
import org.junit.Test;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="mailto:[email protected]">GengZhang</a>
*/
public class MultiProtolServerTest extends ActivelyDestroyTest {

@Test
public void testMultiProtocol() {

try {
// 只有2个线程 执行
ServerConfig serverConfig = new ServerConfig()
.setStopTimeout(0)
.setPort(22222)
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
.setQueues(100).setCoreThreads(1).setMaxThreads(2);

// 发布一个服务,每个请求要执行1秒
ProviderConfig<RestService> providerConfig = new ProviderConfig<RestService>()
.setInterfaceId(RestService.class.getName())
.setRef(new RestServiceImpl())
.setServer(serverConfig)
.setRepeatedExportLimit(1)
.setRegister(false);
providerConfig.export();

ServerConfig serverConfig2 = new ServerConfig()
.setStopTimeout(0)
.setPort(22223)
.setProtocol(RpcConstants.PROTOCOL_TYPE_REST)
.setQueues(100).setCoreThreads(1).setMaxThreads(2);

// 发布一个服务,每个请求要执行1秒
ProviderConfig<RestService> providerConfig2 = new ProviderConfig<RestService>()
.setInterfaceId(RestService.class.getName())
.setRef(new RestServiceImpl())
.setServer(serverConfig2)
.setRepeatedExportLimit(1)
.setRegister(false);
providerConfig2.export();
} catch (Throwable e) {
Assert.fail();
}

}

@Test
public void testMultiProtocolExp() throws NoSuchFieldException {
try {
// 只有2个线程 执行
ServerConfig serverConfig = new ServerConfig()
.setStopTimeout(0)
.setPort(22222)
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
.setQueues(100).setCoreThreads(1).setMaxThreads(2);

// 发布一个服务,每个请求要执行1秒
ProviderConfig<RestService> providerConfig = new ProviderConfig<RestService>()
.setInterfaceId(RestService.class.getName())
.setRef(new RestServiceImpl())
.setServer(serverConfig)
.setRepeatedExportLimit(1)
.setRegister(false);
providerConfig.export();

ServerConfig serverConfig2 = new ServerConfig()
.setStopTimeout(0)
.setPort(22223)
.setProtocol(RpcConstants.PROTOCOL_TYPE_REST)
.setQueues(100).setCoreThreads(1).setMaxThreads(2);

// 发布一个服务,每个请求要执行1秒
ProviderConfig<RestService> providerConfig2 = new ProviderConfig<RestService>()
.setInterfaceId(RestService.class.getName())
.setRef(new RestServiceImpl())
.setServer(serverConfig2)
.setRepeatedExportLimit(1)
.setRegister(false);
providerConfig2.export();

ProviderConfig<RestService> providerConfig3 = new ProviderConfig<RestService>()
.setInterfaceId(RestService.class.getName())
.setRef(new RestServiceImpl())
.setServer(serverConfig2)
.setRepeatedExportLimit(1)
.setRegister(false);
providerConfig3.export();

Assert.fail();
} catch (Throwable e) {
//reflect to fetch export key
ConcurrentHashMap<String, AtomicInteger> map = null;

Field field = DefaultProviderBootstrap.class.getDeclaredField("EXPORTED_KEYS");
try {
field.setAccessible(true);
map = (ConcurrentHashMap<String, AtomicInteger>) field.get(null);
} catch (IllegalAccessException e1) {
e1.printStackTrace();
}

//two providers publish done, the third will false, and revert counter, export value is 1
for (Map.Entry<String, AtomicInteger> entry : map.entrySet()) {
AtomicInteger atomicInteger = entry.getValue();
Assert.assertEquals(1, atomicInteger.get());
}
}

}
}

0 comments on commit b0b4176

Please sign in to comment.