Skip to content

Commit

Permalink
triple invoke support callback&future (sofastack#1249)
Browse files Browse the repository at this point in the history
* triple invoke support callback

Co-authored-by: liujianjun.ljj <[email protected]>
  • Loading branch information
zhenjunMa and liujianjun.ljj authored Nov 9, 2022
1 parent 1d0dbe2 commit 2a0556d
Show file tree
Hide file tree
Showing 9 changed files with 396 additions and 27 deletions.
12 changes: 11 additions & 1 deletion all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
<bolt.swagger.version>1.5.18</bolt.swagger.version>
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.13</httpcore.version>
<grpc.version>1.33.0</grpc.version>
<grpc.version>1.33.1</grpc.version>
<guava.version>27.0-jre</guava.version>
<transmittable.version>2.12.1</transmittable.version>
</properties>
Expand Down Expand Up @@ -383,6 +383,16 @@
<artifactId>httpmime</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
Expand Down
12 changes: 11 additions & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<jackson.databind.version>2.9.10.8</jackson.databind.version>
<msgpack.version>0.6.12</msgpack.version>
<protostuff.version>1.5.9</protostuff.version>
<grpc.version>1.33.0</grpc.version>
<grpc.version>1.33.1</grpc.version>

<!--common-->
<httpcore.version>4.4.13</httpcore.version>
Expand Down Expand Up @@ -464,6 +464,16 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<protoc.version>3.11.0</protoc.version>
<protoc-gen-grpc-java.version>1.17.0</protoc-gen-grpc-java.version>
<sofa.rpc.compiler.version>0.0.3</sofa.rpc.compiler.version>
<grpc.version>1.33.0</grpc.version>
<grpc.version>1.33.1</grpc.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.message.triple;

import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.message.AbstractResponseFuture;

import java.util.List;
import java.util.concurrent.ExecutionException;

/**
* @author gujin
* Created on 2022/9/27 5:00 下午
*/
public class TripleResponseFuture<V> extends AbstractResponseFuture<V> {

/**
* sofa请求
*/
protected final SofaRequest request;

/**
* 构造函数
*/
public TripleResponseFuture(SofaRequest request, int timeout) {
super(timeout);
this.request = request;
}

@Override
protected V getNow() throws ExecutionException {
if (cause != null) {
throw new ExecutionException(cause);
} else {
return (V) result;
}
}

@Override
protected void releaseIfNeed(Object result) {

}

@Override
public TripleResponseFuture addListeners(List<SofaResponseCallback> list) {
throw new UnsupportedOperationException("Not supported, Please use callback function");
}

@Override
public TripleResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
throw new UnsupportedOperationException("Not supported, Please use callback function");
}

@Override
public void notifyListeners() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,30 @@
import com.alipay.sofa.rpc.codec.SerializerFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.message.ResponseFuture;
import com.alipay.sofa.rpc.message.triple.TripleResponseFuture;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.protobuf.ProtoUtils;
import io.grpc.stub.ClientCalls;
import io.grpc.stub.StreamObserver;
import triple.Request;
import triple.Response;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static com.alipay.sofa.rpc.common.RpcConstants.SERIALIZE_HESSIAN2;
Expand Down Expand Up @@ -63,6 +73,8 @@ public class TripleClientInvoker implements TripleInvoker {
private Serializer serializer;
private String serialization;

private Map<String, Method> methodMap = new ConcurrentHashMap<>();

public TripleClientInvoker(ConsumerConfig consumerConfig, Channel channel) {
this.channel = channel;
this.consumerConfig = consumerConfig;
Expand Down Expand Up @@ -104,24 +116,8 @@ public SofaResponse invoke(SofaRequest sofaRequest, int timeout)
sofaResponse.setAppResponse(appResponse);
return sofaResponse;
} else {
String serviceName = sofaRequest.getInterfaceName();
String methodName = sofaRequest.getMethodName();
MethodDescriptor.Marshaller<?> requestMarshaller = null;
MethodDescriptor.Marshaller<?> responseMarshaller = null;
requestMarshaller = io.grpc.protobuf.ProtoUtils.marshaller(Request.getDefaultInstance());
responseMarshaller = io.grpc.protobuf.ProtoUtils.marshaller(Response.getDefaultInstance());
String fullMethodName = generateFullMethodName(serviceName, methodName);
MethodDescriptor methodDescriptor = io.grpc.MethodDescriptor
.newBuilder()
.setType(io.grpc.MethodDescriptor.MethodType.UNARY)
.setFullMethodName(fullMethodName)
.setSampledToLocalTracing(true)
.setRequestMarshaller((MethodDescriptor.Marshaller<Object>) requestMarshaller)
.setResponseMarshaller((MethodDescriptor.Marshaller<Object>) responseMarshaller)
.build();

MethodDescriptor methodDescriptor = getMethodDescriptor(sofaRequest);
Request request = getRequest(sofaRequest, serialization, serializer);

Response response = (Response) ClientCalls.blockingUnaryCall(channel, methodDescriptor,
buildCustomCallOptions(sofaRequest, timeout), request);

Expand All @@ -131,9 +127,7 @@ public SofaResponse invoke(SofaRequest sofaRequest, int timeout)
if (returnType != void.class) {
if (responseDate != null && responseDate.length > 0) {
Serializer responseSerializer = SerializerFactory.getSerializer(response.getSerializeType());
Object appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate),
returnType,
null);
Object appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null);
sofaResponse.setAppResponse(appResponse);
}
}
Expand All @@ -143,6 +137,124 @@ public SofaResponse invoke(SofaRequest sofaRequest, int timeout)

}

@Override
public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws Exception {
SofaResponseCallback sofaResponseCallback = sofaRequest.getSofaResponseCallback();
TripleResponseFuture future = new TripleResponseFuture(sofaRequest, timeout);

if (!useGeneric) {
Method m = methodMap.get(sofaRequest.getMethodName());
if (m == null) {
synchronized (this) {
m = methodMap.get(sofaRequest.getMethodName());
if (m == null) {
Class<?> clazz = Class.forName(sofaRequest.getInterfaceName());
Method[] declaredMethods = clazz.getDeclaredMethods();
for (Method tempM : declaredMethods) {
if (StringUtils.equals(tempM.getName(), sofaRequest.getMethodName()) && tempM.getParameterCount() == 2
&& StringUtils.equals(tempM.getParameterTypes()[1].getCanonicalName(), StreamObserver.class.getCanonicalName())) {
m = tempM;
methodMap.put(sofaRequest.getMethodName(), m);
break;
}
}
}
}
}
Object stub = sofaStub.invoke(null, channel, buildCustomCallOptions(sofaRequest, timeout),
null, consumerConfig, timeout);
m.invoke(stub, sofaRequest.getMethodArgs()[0], new StreamObserver<Object>() {
@Override
public void onNext(Object o) {
if (sofaResponseCallback != null) {
sofaResponseCallback.onAppResponse(o, sofaRequest.getMethodName(), sofaRequest);
} else {
future.setSuccess(o);
}
}

@Override
public void onError(Throwable throwable) {
if (sofaResponseCallback != null) {
Status status = Status.fromThrowable(throwable);
if (status.getCode() == Status.Code.UNKNOWN) {
sofaResponseCallback.onAppException(throwable, sofaRequest.getMethodName(), sofaRequest);
}else {
sofaResponseCallback.onSofaException(new SofaRpcException(RpcErrorType.UNKNOWN, status.getCause()), sofaRequest.getMethodName(), sofaRequest);
}
} else {
future.setFailure(throwable);
}
}

@Override
public void onCompleted() {

}
});
} else {
MethodDescriptor methodDescriptor = getMethodDescriptor(sofaRequest);
Request request = getRequest(sofaRequest, serialization, serializer);
ClientCalls.asyncUnaryCall(channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout)), request, new StreamObserver<Object>() {
@Override
public void onNext(Object o) {
Object appResponse = null;
Response response = (Response) o;
byte[] responseDate = response.getData().toByteArray();
Class returnType = sofaRequest.getMethod().getReturnType();
if (returnType != void.class) {
if (responseDate != null && responseDate.length > 0) {
Serializer responseSerializer = SerializerFactory.getSerializer(response.getSerializeType());
appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null);
}
}
if (sofaResponseCallback != null) {
sofaResponseCallback.onAppResponse(appResponse, sofaRequest.getMethodName(), sofaRequest);
} else {
future.setSuccess(appResponse);
}
}

@Override
public void onError(Throwable throwable) {
if (sofaResponseCallback != null) {
Status status = Status.fromThrowable(throwable);
if (status.getCode() == Status.Code.UNKNOWN) {
sofaResponseCallback.onAppException(throwable, sofaRequest.getMethodName(), sofaRequest);
}else {
sofaResponseCallback.onSofaException(new SofaRpcException(RpcErrorType.UNKNOWN, status.getCause()), sofaRequest.getMethodName(), sofaRequest);
}
} else {
future.setFailure(throwable);
}
}

@Override
public void onCompleted() {

}
});
}
return future;
}

private MethodDescriptor getMethodDescriptor(SofaRequest sofaRequest) {
String serviceName = sofaRequest.getInterfaceName();
String methodName = sofaRequest.getMethodName();
MethodDescriptor.Marshaller<?> requestMarshaller = ProtoUtils.marshaller(Request.getDefaultInstance());
MethodDescriptor.Marshaller<?> responseMarshaller = ProtoUtils.marshaller(Response.getDefaultInstance());
String fullMethodName = generateFullMethodName(serviceName, methodName);
MethodDescriptor methodDescriptor = MethodDescriptor
.newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName(fullMethodName)
.setSampledToLocalTracing(true)
.setRequestMarshaller((MethodDescriptor.Marshaller<Object>) requestMarshaller)
.setResponseMarshaller((MethodDescriptor.Marshaller<Object>) responseMarshaller)
.build();
return methodDescriptor;
}

public static Request getRequest(SofaRequest sofaRequest, String serialization, Serializer serializer) {
Request.Builder builder = Request.newBuilder();
builder.setSerializeType(serialization);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
package com.alipay.sofa.rpc.transport.triple;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.utils.ClassLoaderUtils;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.event.ClientBeforeSendEvent;
Expand Down Expand Up @@ -158,8 +160,31 @@ public int currentRequests() {
}

@Override
public ResponseFuture asyncSend(SofaRequest message, int timeout) throws SofaRpcException {
throw new UnsupportedOperationException("Not supported");
public ResponseFuture asyncSend(SofaRequest request, int timeout) throws SofaRpcException {
SofaResponse sofaResponse = null;
SofaRpcException throwable = null;

try {
RpcInternalContext context = RpcInternalContext.getContext();
beforeSend(context, request);
RpcInvokeContext invokeContext = RpcInvokeContext.getContext();
invokeContext.put(TripleContants.SOFA_REQUEST_KEY, request);
invokeContext.put(TripleContants.SOFA_CONSUMER_CONFIG_KEY, transportConfig.getConsumerConfig());
ResponseFuture responseFuture = tripleClientInvoker.asyncInvoke(request, timeout);
if (request.getSofaResponseCallback() == null) {
return responseFuture;
}
} catch (Exception e) {
throwable = convertToRpcException(e);
throw throwable;
} finally {
if (EventBus.isEnable(ClientSyncReceiveEvent.class)) {
EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(),
transportConfig.getProviderInfo(), request, sofaResponse, throwable));
}
}

return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@

import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.message.ResponseFuture;

public interface TripleInvoker {

public SofaResponse invoke(SofaRequest sofaRequest, int timeout) throws Exception;

public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws Exception;

}
Loading

0 comments on commit 2a0556d

Please sign in to comment.