Skip to content

Commit

Permalink
Feat/triple unique (sofastack#981)
Browse files Browse the repository at this point in the history
* append uniqueId to path
  • Loading branch information
OrezzerO authored Sep 29, 2020
1 parent 08d1c98 commit 84d09c2
Show file tree
Hide file tree
Showing 10 changed files with 434 additions and 17 deletions.
2 changes: 1 addition & 1 deletion compiler/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-compiler</artifactId>
<version>0.0.2</version>
<version>0.0.3</version>

<packaging>jar</packaging>

Expand Down
28 changes: 24 additions & 4 deletions compiler/src/main/resources/SofaTripleStub.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

import java.util.concurrent.TimeUnit;

import static com.alipay.sofa.rpc.constant.TripleConstant.UNIQUE_ID;
import static {{packageName}}.{{serviceName}}Grpc.getServiceDescriptor;
import static io.grpc.stub.ClientCalls.blockingUnaryCall;
import static io.grpc.stub.ServerCalls.asyncUnaryCall;
import static io.grpc.stub.ServerCalls.asyncServerStreamingCall;
import static io.grpc.stub.ServerCalls.asyncClientStreamingCall;
Expand All @@ -28,10 +30,13 @@ protected int timeout;
protected {{serviceName}}Grpc.{{serviceName}}BlockingStub blockingStub;
protected {{serviceName}}Grpc.{{serviceName}}FutureStub futureStub;
protected {{serviceName}}Grpc.{{serviceName}}Stub stub;
protected io.grpc.CallOptions callOptions;
protected String uniqueId;

public Sofa{{serviceName}}Stub(io.grpc.Channel channel, io.grpc.CallOptions callOptions, int timeout) {
this.timeout = timeout;
this.callOptions = callOptions;
uniqueId= callOptions.getOption(UNIQUE_ID);
blockingStub = {{serviceName}}Grpc.newBlockingStub(channel).build(channel, callOptions);
futureStub = {{serviceName}}Grpc.newFutureStub(channel).build(channel, callOptions);
stub = {{serviceName}}Grpc.newStub(channel).build(channel, callOptions);
Expand All @@ -45,9 +50,11 @@ stub = {{serviceName}}Grpc.newStub(channel).build(channel, callOptions);
@java.lang.Deprecated
{{/deprecated}}
public {{outputType}} {{methodName}}({{inputType}} request) {
return blockingStub
.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS)
.{{methodName}}(request);
io.grpc.MethodDescriptor<{{inputType}}, {{outputType}}> method = {{serviceName}}Grpc.get{{methodNamePascalCase}}Method();
if(isNotEmpty(uniqueId)){
method = method.toBuilder().setFullMethodName(getFullNameWithUniqueId(method.getFullMethodName(), callOptions.getOption(UNIQUE_ID))).build();
}
return blockingUnaryCall(blockingStub.getChannel(),method,callOptions,request);
}

public com.google.common.util.concurrent.ListenableFuture<{{outputType}}> {{methodName}}Async({{inputType}} request) {
Expand All @@ -62,6 +69,19 @@ stub = {{serviceName}}Grpc.newStub(channel).build(channel, callOptions);
.{{methodName}}(request, responseObserver);
}

private boolean isNotEmpty(String uniqueId) {
return uniqueId != null && uniqueId.length() > 0;
}

private String getFullNameWithUniqueId(String fullMethodName, String uniqueId) {
int i = fullMethodName.indexOf("/");
if (i > 0 && isNotEmpty(uniqueId)) {
String[] split = fullMethodName.split("/");
fullMethodName = split[0] + "." + uniqueId + "/" + split[1];
}
return fullMethodName;
}

{{/unaryMethods}}
{{#serverStreamingMethods}}
{{#javaDoc}}
Expand Down
2 changes: 1 addition & 1 deletion example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<properties>
<protoc.version>3.11.0</protoc.version>
<protoc-gen-grpc-java.version>1.17.0</protoc-gen-grpc-java.version>
<sofa.rpc.compiler.version>0.0.2</sofa.rpc.compiler.version>
<sofa.rpc.compiler.version>0.0.3</sofa.rpc.compiler.version>
<grpc.version>1.28.0</grpc.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.constant;

import io.grpc.CallOptions;

/**
* @author zhaowang
* @version : TripleConstant.java, v 0.1 2020年09月01日 8:05 下午 zhaowang Exp $
*/
public class TripleConstant {
public static final CallOptions.Key<String> UNIQUE_ID = CallOptions.Key.createWithDefault(
"uniqueId", "");
public static final String TRIPLE_EXPOSE_OLD = "triple.use.old.path";
public static final String EXPOSE_OLD_UNIQUE_ID_SERVICE_KEY = "triple_expose_old_unique_id_service";
public static final Boolean EXPOSE_OLD_UNIQUE_ID_SERVICE = Boolean.valueOf(System.getProperty(
EXPOSE_OLD_UNIQUE_ID_SERVICE_KEY,
"false"));

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.rpc.server.triple;

import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
Expand Down Expand Up @@ -55,12 +56,16 @@
import triple.Response;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import static com.alipay.sofa.rpc.constant.TripleConstant.EXPOSE_OLD_UNIQUE_ID_SERVICE;
import static com.alipay.sofa.rpc.constant.TripleConstant.TRIPLE_EXPOSE_OLD;
import static com.alipay.sofa.rpc.utils.SofaProtoUtils.getFullNameWithUniqueId;
import static io.grpc.MethodDescriptor.generateFullMethodName;

/**
Expand All @@ -77,6 +82,12 @@ public class TripleServer implements Server {
*/
private static final Logger LOGGER = LoggerFactory
.getLogger(TripleServer.class);
/**
* The mapping relationship between BindableService and ServerServiceDefinition
*/
protected ConcurrentHashMap<ProviderConfig, ServerServiceDefinition> oldServiceInfo = new ConcurrentHashMap<ProviderConfig,
ServerServiceDefinition>();

/**
* server config
*/
Expand Down Expand Up @@ -229,7 +240,8 @@ public void stop() {
public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
Object ref = providerConfig.getRef();
try {
final ServerServiceDefinition serviceDef;
ServerServiceDefinition serviceDef;

if (SofaProtoUtils.isProtoClass(ref)) {
BindableService bindableService = (BindableService) providerConfig.getRef();
serviceDef = bindableService.bindService();
Expand All @@ -241,19 +253,74 @@ public void registerProcessor(ProviderConfig providerConfig, Invoker instance) {
genericService.setProxiedImpl(genericService);
serviceDef = buildSofaServiceDef(genericService, providerConfig);
}
ServerServiceDefinition oldServiceDef;
oldServiceDef = serviceDef;
if (StringUtils.isNotBlank(providerConfig.getUniqueId())) {
serviceDef = appendUniqueIdToServiceDef(providerConfig.getUniqueId(), serviceDef);
if (exposeOldUniqueIdService(providerConfig)) {
List<TripleServerInterceptor> interceptorList = buildInterceptorChain(oldServiceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
oldServiceDef, interceptorList);
oldServiceInfo.put(providerConfig, serviceDefinition);
ServerServiceDefinition ssd = handlerRegistry.addService(serviceDefinition);
if (ssd != null) {
throw new IllegalStateException("Can not expose service with same name:" +
serviceDefinition.getServiceDescriptor().getName());
}
}
}

List<TripleServerInterceptor> interceptorList = buildInterceptorChain(serviceDef);
ServerServiceDefinition serviceDefinition = ServerInterceptors.intercept(
serviceDef, interceptorList);
serviceInfo.put(providerConfig, serviceDefinition);
handlerRegistry.addService(serviceDefinition);
ServerServiceDefinition ssd = handlerRegistry.addService(serviceDefinition);
if (ssd != null) {
throw new IllegalStateException("Can not expose service with same name:" +
serviceDefinition.getServiceDescriptor().getName());
}
invokerCnt.incrementAndGet();
} catch (Exception e) {
LOGGER.error("Register triple service error", e);
String msg = "Register triple service error";
LOGGER.error(msg, e);
serviceInfo.remove(providerConfig);
throw new SofaRpcRuntimeException(msg, e);
}

}

private boolean exposeOldUniqueIdService(ProviderConfig providerConfig) {
//default false
String exposeOld = providerConfig.getParameter(TRIPLE_EXPOSE_OLD);
if (StringUtils.isBlank(exposeOld)) {
return EXPOSE_OLD_UNIQUE_ID_SERVICE;
} else {
return Boolean.parseBoolean(exposeOld);
}
}

private ServerServiceDefinition appendUniqueIdToServiceDef(String uniqueId, ServerServiceDefinition serviceDef) {
final String newServiceName = serviceDef.getServiceDescriptor().getName() + "." + uniqueId;

ServerServiceDefinition.Builder builder = ServerServiceDefinition.builder(newServiceName);

Collection<ServerMethodDefinition<?, ?>> methods = serviceDef.getMethods();
for (ServerMethodDefinition method : methods) {
MethodDescriptor<?, ?> methodDescriptor = method.getMethodDescriptor();
String fullMethodName = methodDescriptor.getFullMethodName();
MethodDescriptor<?, ?> methodDescriptorWithUniqueId = methodDescriptor
.toBuilder()
.setFullMethodName(
getFullNameWithUniqueId(fullMethodName, uniqueId))
.build();
ServerMethodDefinition<?, ?> newServerMethodDefinition = ServerMethodDefinition.create(
methodDescriptorWithUniqueId, method.getServerCallHandler());
builder.addMethod(newServerMethodDefinition);
}
serviceDef = builder.build();
return serviceDef;
}

private ServerServiceDefinition buildSofaServiceDef(GenericServiceImpl genericService,
ProviderConfig providerConfig) {
ServerServiceDefinition templateDefinition = genericService.bindService();
Expand Down Expand Up @@ -330,6 +397,12 @@ protected List<TripleServerInterceptor> buildInterceptorChain(ServerServiceDefin
public void unRegisterProcessor(ProviderConfig providerConfig, boolean closeIfNoEntry) {
try {
ServerServiceDefinition serverServiceDefinition = serviceInfo.get(providerConfig);
if (exposeOldUniqueIdService(providerConfig)) {
ServerServiceDefinition oldServiceDef = oldServiceInfo.get(providerConfig);
if (oldServiceDef != null) {
handlerRegistry.removeService(oldServiceDef);
}
}
handlerRegistry.removeService(serverServiceDefinition);
invokerCnt.decrementAndGet();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package com.alipay.sofa.rpc.transport.triple;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.codec.Serializer;
import com.alipay.sofa.rpc.codec.SerializerFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
Expand All @@ -38,7 +37,10 @@
import java.util.concurrent.TimeUnit;

import static com.alipay.sofa.rpc.common.RpcConstants.SERIALIZE_HESSIAN2;
import static com.alipay.sofa.rpc.constant.TripleConstant.TRIPLE_EXPOSE_OLD;
import static com.alipay.sofa.rpc.constant.TripleConstant.UNIQUE_ID;
import static com.alipay.sofa.rpc.utils.SofaProtoUtils.checkIfUseGeneric;
import static com.alipay.sofa.rpc.utils.SofaProtoUtils.getFullNameWithUniqueId;
import static io.grpc.MethodDescriptor.generateFullMethodName;

/**
Expand All @@ -62,11 +64,14 @@ public class TripleClientInvoker implements TripleInvoker {

private Serializer serializer;
private String serialization;
private boolean useOldPath;

public TripleClientInvoker(ConsumerConfig consumerConfig, Channel channel) {
this.channel = channel;
this.consumerConfig = consumerConfig;
useGeneric = checkIfUseGeneric(consumerConfig);
//default false
useOldPath = Boolean.parseBoolean(consumerConfig.getParameter(TRIPLE_EXPOSE_OLD));
cacheCommonData(consumerConfig);

if (!useGeneric) {
Expand Down Expand Up @@ -97,8 +102,8 @@ public SofaResponse invoke(SofaRequest sofaRequest, int timeout)
throws Exception {
if (!useGeneric) {
SofaResponse sofaResponse = new SofaResponse();
ProviderInfo providerInfo = null;
Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout), timeout);
Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout),
timeout);
final Method method = sofaRequest.getMethod();
Object appResponse = method.invoke(stub, sofaRequest.getMethodArgs()[0]);
sofaResponse.setAppResponse(appResponse);
Expand All @@ -110,9 +115,14 @@ public SofaResponse invoke(SofaRequest sofaRequest, int timeout)
MethodDescriptor.Marshaller<?> responseMarshaller = null;
requestMarshaller = io.grpc.protobuf.ProtoUtils.marshaller(Request.getDefaultInstance());
responseMarshaller = io.grpc.protobuf.ProtoUtils.marshaller(Response.getDefaultInstance());
MethodDescriptor methodDescriptor = io.grpc.MethodDescriptor.newBuilder()
String fullMethodName = generateFullMethodName(serviceName, methodName);
MethodDescriptor methodDescriptor = io.grpc.MethodDescriptor
.newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(generateFullMethodName(serviceName, methodName)).setSampledToLocalTracing(true)
.setFullMethodName(useOldPath ? fullMethodName :
getFullNameWithUniqueId(fullMethodName,
consumerConfig.getUniqueId()))
.setSampledToLocalTracing(true)
.setRequestMarshaller((MethodDescriptor.Marshaller<Object>) requestMarshaller)
.setResponseMarshaller((MethodDescriptor.Marshaller<Object>) responseMarshaller)
.build();
Expand Down Expand Up @@ -171,6 +181,9 @@ protected CallOptions buildCustomCallOptions(SofaRequest sofaRequest, int timeou
if (timeout >= 0) {
tripleCallOptions = tripleCallOptions.withDeadlineAfter(timeout, TimeUnit.MILLISECONDS);
}
if (StringUtils.isNotBlank(consumerConfig.getUniqueId())) {
tripleCallOptions = tripleCallOptions.withOption(UNIQUE_ID, consumerConfig.getUniqueId());
}
return tripleCallOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.rpc.utils;

import com.alipay.sofa.rpc.common.utils.ClassUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import io.grpc.BindableService;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -62,4 +63,13 @@ public static boolean checkIfUseGeneric(ConsumerConfig consumerConfig) {
return true;
}

public static String getFullNameWithUniqueId(String fullMethodName, String uniqueId) {
int i = fullMethodName.indexOf("/");
if (i > 0 && StringUtils.isNotBlank(uniqueId)) {
String[] split = fullMethodName.split("/");
fullMethodName = split[0] + "." + uniqueId + "/" + split[1];
}
return fullMethodName;
}

}
2 changes: 1 addition & 1 deletion test/test-integration/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<properties>
<protoc.version>3.11.0</protoc.version>
<protoc-gen-grpc-java.version>1.17.0</protoc-gen-grpc-java.version>
<sofa.rpc.compiler.version>0.0.2</sofa.rpc.compiler.version>
<sofa.rpc.compiler.version>0.0.3</sofa.rpc.compiler.version>
<grpc.version>1.28.0</grpc.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
</properties>
Expand Down
Loading

0 comments on commit 84d09c2

Please sign in to comment.