Skip to content

Commit

Permalink
feature: modify triple unique id logic (sofastack#1006)
Browse files Browse the repository at this point in the history
  • Loading branch information
hui-cha authored Feb 10, 2022
1 parent 8f2fae2 commit 443b1c6
Show file tree
Hide file tree
Showing 13 changed files with 428 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
import com.alipay.sofa.rpc.bootstrap.DefaultProviderBootstrap;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.proxy.ProxyFactory;
import com.alipay.sofa.rpc.server.ProviderProxyInvoker;

import java.lang.reflect.Method;

Expand All @@ -34,8 +30,6 @@
@Extension("tri")
public class TripleProviderBootstrap<T> extends DefaultProviderBootstrap<T> {

private static final Logger LOGGER = LoggerFactory.getLogger(TripleProviderBootstrap.class);

/**
* 构造函数
*
Expand All @@ -45,26 +39,6 @@ protected TripleProviderBootstrap(ProviderConfig<T> providerConfig) {
super(providerConfig);
}

@Override
protected void preProcessProviderTarget(ProviderConfig providerConfig, ProviderProxyInvoker providerProxyInvoker) {
Class<?> implClass = providerConfig.getRef().getClass();
try {
Method method = implClass.getMethod("setProxiedImpl", providerConfig.getProxyClass());
Object obj = ProxyFactory.buildProxy(providerConfig.getProxy(), providerConfig.getProxyClass(),
providerProxyInvoker);
method.invoke(providerConfig.getRef(), obj);
} catch (NoSuchMethodException e) {
LOGGER
.info(
"{} don't hava method setProxiedImpl, will treated as origin provider service instead of grpc service.",
implClass);
} catch (Exception e) {
throw new IllegalStateException(
"Failed to set sofa proxied service impl to stub, please make sure your stub "
+ "was generated by the sofa-protoc-compiler.", e);
}
}

@Override
public void export() {
Class enclosingClass = this.getProviderConfig().getProxyClass().getEnclosingClass();
Expand Down
2 changes: 1 addition & 1 deletion example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<properties>
<protoc.version>3.11.0</protoc.version>
<protoc-gen-grpc-java.version>1.17.0</protoc-gen-grpc-java.version>
<sofa.rpc.compiler.version>0.0.3</sofa.rpc.compiler.version>
<sofa.rpc.compiler.version>0.0.2</sofa.rpc.compiler.version>
<grpc.version>1.28.0</grpc.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,4 @@ public class TripleConstant {
public static final String TRIPLE_EXPOSE_OLD = "triple.use.old.path";
public static final Boolean EXPOSE_OLD_UNIQUE_ID_SERVICE = SofaConfigs
.getOrDefault(RpcConfigKeys.TRIPLE_EXPOSE_OLD_UNIQUE_ID_SERVICE);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,6 @@ public class TripleContants {

public static final String SOFA_CONSUMER_CONFIG_KEY = "_SOFA_CONSUMER_CONFIG";

public static final String SOFA_UNIQUE_ID = "_SOFA_UNIQUE_ID";

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.alipay.sofa.rpc.server.triple;

import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
Expand Down Expand Up @@ -54,17 +53,17 @@
import triple.Request;
import triple.Response;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import static com.alipay.sofa.rpc.constant.TripleConstant.EXPOSE_OLD_UNIQUE_ID_SERVICE;
import static com.alipay.sofa.rpc.constant.TripleConstant.TRIPLE_EXPOSE_OLD;
import static com.alipay.sofa.rpc.utils.SofaProtoUtils.getFullNameWithUniqueId;
import static io.grpc.MethodDescriptor.generateFullMethodName;

/**
Expand All @@ -81,12 +80,6 @@ public class TripleServer implements Server {
*/
private static final Logger LOGGER = LoggerFactory
.getLogger(TripleServer.class);
/**
* The mapping relationship between BindableService and ServerServiceDefinition
*/
protected ConcurrentHashMap<ProviderConfig, ServerServiceDefinition> oldServiceInfo = new ConcurrentHashMap<ProviderConfig,
ServerServiceDefinition>();

/**
* server config
*/
Expand All @@ -112,6 +105,10 @@ public class TripleServer implements Server {
*/
protected ConcurrentHashMap<ProviderConfig, ServerServiceDefinition> serviceInfo = new ConcurrentHashMap<ProviderConfig,
ServerServiceDefinition>();
/**
* The mapping relationship between service name and unique id invoker
*/
protected Map<String, UniqueIdInvoker> invokerMap = new ConcurrentHashMap<>();

/**
* invoker count
Expand All @@ -124,6 +121,11 @@ public class TripleServer implements Server {
*/
private ThreadPoolExecutor bizThreadPool;

/**
* lock
*/
private Lock lock;

@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
Expand All @@ -135,6 +137,7 @@ public void init(ServerConfig serverConfig) {
.executor(bizThreadPool)
.channelType(constructChannel())
.build();
this.lock = new ReentrantLock();
}

private Class<? extends ServerChannel> constructChannel() {
Expand Down Expand Up @@ -238,86 +241,75 @@ public void stop() {
@Override
public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
Object ref = providerConfig.getRef();
this.lock.lock();
try {
ServerServiceDefinition serviceDef;
// wrap invoker to support unique id
UniqueIdInvoker oldInvoker = this.invokerMap.putIfAbsent(providerConfig.getInterfaceId(), new UniqueIdInvoker());
if (null != oldInvoker) {
// this service has already registered into grpc server,
// we only need register given invoker into unique id invoker.
if (!oldInvoker.registerInvoker(providerConfig, instance)) {
throw new IllegalStateException("Can not expose service with interface:" + providerConfig.getInterfaceId() + " and unique id: " + providerConfig.getUniqueId());
}
return;
}

UniqueIdInvoker uniqueIdInvoker = this.invokerMap.get(providerConfig.getInterfaceId());
if (!uniqueIdInvoker.registerInvoker(providerConfig, instance)) {
throw new IllegalStateException("Can not expose service with interface:" + providerConfig.getInterfaceId() + " and unique id: " + providerConfig.getUniqueId());
}

// create service definition
ServerServiceDefinition serviceDef;
if (SofaProtoUtils.isProtoClass(ref)) {
// refer is BindableService
this.setBindableProxiedImpl(providerConfig, uniqueIdInvoker);
BindableService bindableService = (BindableService) providerConfig.getRef();
serviceDef = bindableService.bindService();

} else {
Object obj = ProxyFactory.buildProxy(providerConfig.getProxy(), providerConfig.getProxyClass(),
instance);
// normal class
Object obj = ProxyFactory.buildProxy(providerConfig.getProxy(), providerConfig.getProxyClass(), uniqueIdInvoker);
GenericServiceImpl genericService = new GenericServiceImpl(obj, providerConfig.getProxyClass());
genericService.setProxiedImpl(genericService);
serviceDef = buildSofaServiceDef(genericService, providerConfig);
}
ServerServiceDefinition oldServiceDef;
oldServiceDef = serviceDef;
if (StringUtils.isNotBlank(providerConfig.getUniqueId())) {
serviceDef = appendUniqueIdToServiceDef(providerConfig.getUniqueId(), serviceDef);
if (exposeOldUniqueIdService(providerConfig)) {
List<TripleServerInterceptor> interceptorList = buildInterceptorChain(oldServiceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
oldServiceDef, interceptorList);
oldServiceInfo.put(providerConfig, serviceDefinition);
ServerServiceDefinition ssd = handlerRegistry.addService(serviceDefinition);
if (ssd != null) {
throw new IllegalStateException("Can not expose service with same name:" +
serviceDefinition.getServiceDescriptor().getName());
}
}
}

List<TripleServerInterceptor> interceptorList = buildInterceptorChain(serviceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
serviceDef, interceptorList);
serviceInfo.put(providerConfig, serviceDefinition);
ServerServiceDefinition ssd = handlerRegistry.addService(serviceDefinition);
this.serviceInfo.put(providerConfig, serviceDefinition);
ServerServiceDefinition ssd = this.handlerRegistry.addService(serviceDefinition);
if (ssd != null) {
throw new IllegalStateException("Can not expose service with same name:" +
serviceDefinition.getServiceDescriptor().getName());
}
invokerCnt.incrementAndGet();
this.invokerCnt.incrementAndGet();
} catch (Exception e) {
String msg = "Register triple service error";
LOGGER.error(msg, e);
serviceInfo.remove(providerConfig);
this.serviceInfo.remove(providerConfig);
throw new SofaRpcRuntimeException(msg, e);
}

}

private boolean exposeOldUniqueIdService(ProviderConfig providerConfig) {
//default false
String exposeOld = providerConfig.getParameter(TRIPLE_EXPOSE_OLD);
if (StringUtils.isBlank(exposeOld)) {
return EXPOSE_OLD_UNIQUE_ID_SERVICE;
} else {
return Boolean.parseBoolean(exposeOld);
} finally {
this.lock.unlock();
}
}

private ServerServiceDefinition appendUniqueIdToServiceDef(String uniqueId, ServerServiceDefinition serviceDef) {
final String newServiceName = serviceDef.getServiceDescriptor().getName() + "." + uniqueId;

ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(newServiceName);

Collection<ServerMethodDefinition<?, ?>> methods = serviceDef.getMethods();
for (ServerMethodDefinition method : methods) {
MethodDescriptor<?, ?> methodDescriptor = method.getMethodDescriptor();
String fullMethodName = methodDescriptor.getFullMethodName();
MethodDescriptor<?, ?> methodDescriptorWithUniqueId = methodDescriptor
.toBuilder()
.setFullMethodName(
getFullNameWithUniqueId(fullMethodName, uniqueId))
.build();
ServerMethodDefinition<?, ?> newServerMethodDefinition = ServerMethodDefinition.create(
methodDescriptorWithUniqueId, method.getServerCallHandler());
builder.addMethod(newServerMethodDefinition);
private void setBindableProxiedImpl(ProviderConfig providerConfig, Invoker invoker) {
Class<?> implClass = providerConfig.getRef().getClass();
try {
Method method = implClass.getMethod("setProxiedImpl", providerConfig.getProxyClass());
Object obj = ProxyFactory.buildProxy(providerConfig.getProxy(), providerConfig.getProxyClass(), invoker);
method.invoke(providerConfig.getRef(), obj);
} catch (NoSuchMethodException e) {
LOGGER
.info(
"{} don't hava method setProxiedImpl, will treated as origin provider service instead of grpc service.",
implClass);
} catch (Exception e) {
throw new IllegalStateException(
"Failed to set sofa proxied service impl to stub, please make sure your stub "
+ "was generated by the sofa-protoc-compiler.", e);
}
serviceDef = builder.build();
return serviceDef;
}

private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericService,
Expand All @@ -332,15 +324,12 @@ private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericSe
templateDefinition, providerConfig, methodDescriptor));
for (ServerMethodDefinition<Request, Response> methodDef : methodDefs) {
builder.addMethod(methodDef);

}
return builder.build();

}

private List<ServerMethodDefinition<Request, Response>> getMethodDefinitions(ServerCallHandler<Request, Response> templateHandler,List<MethodDescriptor<Request, Response>> methodDescriptors) {
List<ServerMethodDefinition<Request, Response>> result = new ArrayList<>();

for (MethodDescriptor<Request, Response> methodDescriptor : methodDescriptors) {
ServerMethodDefinition<Request, Response> serverMethodDefinition = ServerMethodDefinition.create(methodDescriptor, templateHandler);
result.add(serverMethodDefinition);
Expand All @@ -360,9 +349,8 @@ private ServiceDescriptor getServiceDescriptor(ServerServiceDefinition template,

}

private List<MethodDescriptor<Request, Response>> getMethodDescriptor( ProviderConfig providerConfig) {
private List<MethodDescriptor<Request, Response>> getMethodDescriptor(ProviderConfig providerConfig) {
List<MethodDescriptor<Request, Response>> result = new ArrayList<>();

Set<String> methodNames = SofaProtoUtils.getMethodNames(providerConfig.getInterfaceId());
for (String name : methodNames) {
MethodDescriptor<Request, Response> methodDescriptor = MethodDescriptor.<Request, Response>newBuilder()
Expand All @@ -376,7 +364,6 @@ private List<MethodDescriptor<Request, Response>> getMethodDescriptor( ProviderC
.build();
result.add(methodDescriptor);
}

return result;
}

Expand All @@ -394,18 +381,24 @@ protected List<TripleServerInterceptor> buildInterceptorChain(ServerServiceDefin

@Override
public void unRegisterProcessor(ProviderConfig providerConfig, boolean closeIfNoEntry) {
this.lock.lock();
try {
ServerServiceDefinition serverServiceDefinition = serviceInfo.get(providerConfig);
if (exposeOldUniqueIdService(providerConfig)) {
ServerServiceDefinition oldServiceDef = oldServiceInfo.get(providerConfig);
if (oldServiceDef != null) {
handlerRegistry.removeService(oldServiceDef);
ServerServiceDefinition serverServiceDefinition = this.serviceInfo.get(providerConfig);
UniqueIdInvoker uniqueIdInvoker = this.invokerMap.get(providerConfig.getInterfaceId());
if (null != uniqueIdInvoker) {
uniqueIdInvoker.unRegisterInvoker(providerConfig);
if (!uniqueIdInvoker.hasInvoker()) {
this.invokerMap.remove(providerConfig.getInterfaceId());
this.handlerRegistry.removeService(serverServiceDefinition);
}
} else {
this.handlerRegistry.removeService(serverServiceDefinition);
}
handlerRegistry.removeService(serverServiceDefinition);
invokerCnt.decrementAndGet();
} catch (Exception e) {
LOGGER.error("Unregister triple service error", e);
} finally {
this.lock.unlock();
}
if (closeIfNoEntry && invokerCnt.get() == 0) {
stop();
Expand Down
Loading

0 comments on commit 443b1c6

Please sign in to comment.