Skip to content

Commit

Permalink
triple async call support trace log (sofastack#1282)
Browse files Browse the repository at this point in the history
* triple async call support trace log

* review update

* review update

Co-authored-by: liujianjun.ljj <[email protected]>
  • Loading branch information
zhenjunMa and liujianjun.ljj authored Jan 5, 2023
1 parent 7809a25 commit e203f2b
Show file tree
Hide file tree
Showing 2 changed files with 152 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,26 @@
*/
package com.alipay.sofa.rpc.transport.triple;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.codec.Serializer;
import com.alipay.sofa.rpc.codec.SerializerFactory;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.utils.ClassLoaderUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.context.BaggageResolver;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.context.RpcRuntimeContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.event.ClientAsyncReceiveEvent;
import com.alipay.sofa.rpc.event.ClientEndInvokeEvent;
import com.alipay.sofa.rpc.event.EventBus;
import com.alipay.sofa.rpc.filter.FilterChain;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.message.ResponseFuture;
Expand Down Expand Up @@ -66,6 +77,8 @@ public class TripleClientInvoker implements TripleInvoker {

protected ConsumerConfig consumerConfig;

protected ProviderInfo providerInfo;

protected Method sofaStub;

protected boolean useGeneric;
Expand All @@ -75,9 +88,11 @@ public class TripleClientInvoker implements TripleInvoker {

private Map<String, Method> methodMap = new ConcurrentHashMap<>();

public TripleClientInvoker(ConsumerConfig consumerConfig, Channel channel) {
public TripleClientInvoker(ConsumerConfig consumerConfig, ProviderInfo providerInfo, Channel channel) {
this.channel = channel;
this.consumerConfig = consumerConfig;
this.providerInfo = providerInfo;

useGeneric = checkIfUseGeneric(consumerConfig);
cacheCommonData(consumerConfig);

Expand Down Expand Up @@ -142,6 +157,9 @@ public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws E
SofaResponseCallback sofaResponseCallback = sofaRequest.getSofaResponseCallback();
TripleResponseFuture future = new TripleResponseFuture(sofaRequest, timeout);

ClassLoader currentClassLoader = ClassLoaderUtils.getCurrentClassLoader();
RpcInternalContext context = RpcInternalContext.getContext();

if (!useGeneric) {
Method m = methodMap.get(sofaRequest.getMethodName());
if (m == null) {
Expand All @@ -166,25 +184,12 @@ public ResponseFuture asyncInvoke(SofaRequest sofaRequest, int timeout) throws E
m.invoke(stub, sofaRequest.getMethodArgs()[0], new StreamObserver<Object>() {
@Override
public void onNext(Object o) {
if (sofaResponseCallback != null) {
sofaResponseCallback.onAppResponse(o, sofaRequest.getMethodName(), sofaRequest);
} else {
future.setSuccess(o);
}
processSuccess(false, context, sofaRequest, o, sofaResponseCallback, future, currentClassLoader);
}

@Override
public void onError(Throwable throwable) {
if (sofaResponseCallback != null) {
Status status = Status.fromThrowable(throwable);
if (status.getCode() == Status.Code.UNKNOWN) {
sofaResponseCallback.onAppException(throwable, sofaRequest.getMethodName(), sofaRequest);
}else {
sofaResponseCallback.onSofaException(new SofaRpcException(RpcErrorType.UNKNOWN, status.getCause()), sofaRequest.getMethodName(), sofaRequest);
}
} else {
future.setFailure(throwable);
}
processError(context, sofaRequest, throwable, sofaResponseCallback, future, currentClassLoader);
}

@Override
Expand All @@ -198,35 +203,12 @@ public void onCompleted() {
ClientCalls.asyncUnaryCall(channel.newCall(methodDescriptor, buildCustomCallOptions(sofaRequest, timeout)), request, new StreamObserver<Object>() {
@Override
public void onNext(Object o) {
Object appResponse = null;
Response response = (Response) o;
byte[] responseDate = response.getData().toByteArray();
Class returnType = sofaRequest.getMethod().getReturnType();
if (returnType != void.class) {
if (responseDate != null && responseDate.length > 0) {
Serializer responseSerializer = SerializerFactory.getSerializer(response.getSerializeType());
appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null);
}
}
if (sofaResponseCallback != null) {
sofaResponseCallback.onAppResponse(appResponse, sofaRequest.getMethodName(), sofaRequest);
} else {
future.setSuccess(appResponse);
}
processSuccess(true, context, sofaRequest, o, sofaResponseCallback, future, currentClassLoader);
}

@Override
public void onError(Throwable throwable) {
if (sofaResponseCallback != null) {
Status status = Status.fromThrowable(throwable);
if (status.getCode() == Status.Code.UNKNOWN) {
sofaResponseCallback.onAppException(throwable, sofaRequest.getMethodName(), sofaRequest);
}else {
sofaResponseCallback.onSofaException(new SofaRpcException(RpcErrorType.UNKNOWN, status.getCause()), sofaRequest.getMethodName(), sofaRequest);
}
} else {
future.setFailure(throwable);
}
processError(context, sofaRequest, throwable, sofaResponseCallback, future, currentClassLoader);
}

@Override
Expand All @@ -238,6 +220,128 @@ public void onCompleted() {
return future;
}

private void processSuccess(boolean needDecode, RpcInternalContext context, SofaRequest sofaRequest, Object o, SofaResponseCallback sofaResponseCallback, TripleResponseFuture future, ClassLoader classLoader) {
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(classLoader);
RpcInternalContext.setContext(context);

SofaResponse sofaResponse = new SofaResponse();
sofaResponse.setAppResponse(o);
if (EventBus.isEnable(ClientAsyncReceiveEvent.class)) {
EventBus.post(new ClientAsyncReceiveEvent(consumerConfig, providerInfo,
sofaRequest, sofaResponse, null));
}

pickupBaggage(context, sofaResponse);

// do async filter after respond server
FilterChain chain = consumerConfig.getConsumerBootstrap().getCluster().getFilterChain();
if (chain != null) {
chain.onAsyncResponse(consumerConfig, sofaRequest, sofaResponse, null);
}

recordClientElapseTime(context);

if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
EventBus.post(new ClientEndInvokeEvent(sofaRequest, sofaResponse, null));
}

Object appResponse = o;
if (needDecode) {
Response response = (Response) o;
byte[] responseDate = response.getData().toByteArray();
Class returnType = sofaRequest.getMethod().getReturnType();
if (returnType != void.class) {
if (responseDate != null && responseDate.length > 0) {
Serializer responseSerializer = SerializerFactory.getSerializer(response.getSerializeType());
appResponse = responseSerializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null);
}
}
}

if (sofaResponseCallback != null) {
sofaResponseCallback.onAppResponse(appResponse, sofaRequest.getMethodName(), sofaRequest);
} else {
future.setSuccess(appResponse);
}
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
RpcInvokeContext.removeContext();
RpcInternalContext.removeAllContext();
}
}

private void processError(RpcInternalContext context, SofaRequest sofaRequest, Throwable throwable, SofaResponseCallback sofaResponseCallback, TripleResponseFuture future, ClassLoader classLoader) {
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(classLoader);
RpcInternalContext.setContext(context);

if (EventBus.isEnable(ClientAsyncReceiveEvent.class)) {
EventBus.post(new ClientAsyncReceiveEvent(consumerConfig, providerInfo,
sofaRequest, null, throwable));
}

// do async filter after respond server
FilterChain chain = consumerConfig.getConsumerBootstrap().getCluster().getFilterChain();
if (chain != null) {
chain.onAsyncResponse(consumerConfig, sofaRequest, null, throwable);
}

recordClientElapseTime(context);

if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
EventBus.post(new ClientEndInvokeEvent(sofaRequest, null, throwable));
}

if (sofaResponseCallback != null) {
Status status = Status.fromThrowable(throwable);
if (status.getCode() == Status.Code.UNKNOWN) {
sofaResponseCallback.onAppException(throwable, sofaRequest.getMethodName(), sofaRequest);
}else {
sofaResponseCallback.onSofaException(new SofaRpcException(RpcErrorType.UNKNOWN, status.getCause()), sofaRequest.getMethodName(), sofaRequest);
}
} else {
future.setFailure(throwable);
}
} finally {
Thread.currentThread().setContextClassLoader(oldCl);
RpcInvokeContext.removeContext();
RpcInternalContext.removeAllContext();
}
}

protected void recordClientElapseTime(RpcInternalContext context) {
if (context != null) {
Long startTime = (Long) context.removeAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME);
if (startTime != null) {
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, RpcRuntimeContext.now() - startTime);
}
}
}

protected void pickupBaggage(RpcInternalContext context, SofaResponse response) {
if (RpcInvokeContext.isBaggageEnable()) {
RpcInvokeContext old = null;
RpcInvokeContext newContext = null;
if (context != null) {
old = (RpcInvokeContext) context.getAttachment(RpcConstants.HIDDEN_KEY_INVOKE_CONTEXT);
}
if (old != null) {
RpcInvokeContext.setContext(old);
}
newContext = RpcInvokeContext.getContext();
BaggageResolver.pickupFromResponse(newContext, response);

if (old != null) {
old.getAllResponseBaggage().putAll(newContext.getAllResponseBaggage());
old.getAllRequestBaggage().putAll(newContext.getAllRequestBaggage());
}

}
}

private MethodDescriptor getMethodDescriptor(SofaRequest sofaRequest) {
String serviceName = sofaRequest.getInterfaceName();
String methodName = sofaRequest.getMethodName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@
package com.alipay.sofa.rpc.transport.triple;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.utils.ClassLoaderUtils;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.exception.SofaTimeOutException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;
import com.alipay.sofa.rpc.event.ClientAfterSendEvent;
import com.alipay.sofa.rpc.event.ClientBeforeSendEvent;
import com.alipay.sofa.rpc.event.ClientSyncReceiveEvent;
import com.alipay.sofa.rpc.event.EventBus;
Expand Down Expand Up @@ -101,7 +100,7 @@ public void connect() {
}

protected TripleClientInvoker buildClientInvoker() {
return new TripleClientInvoker(transportConfig.getConsumerConfig(), channel);
return new TripleClientInvoker(transportConfig.getConsumerConfig(), providerInfo, channel);
}

@Override
Expand Down Expand Up @@ -161,7 +160,6 @@ public int currentRequests() {

@Override
public ResponseFuture asyncSend(SofaRequest request, int timeout) throws SofaRpcException {
SofaResponse sofaResponse = null;
SofaRpcException throwable = null;

try {
Expand All @@ -178,9 +176,8 @@ public ResponseFuture asyncSend(SofaRequest request, int timeout) throws SofaRpc
throwable = convertToRpcException(e);
throw throwable;
} finally {
if (EventBus.isEnable(ClientSyncReceiveEvent.class)) {
EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(),
transportConfig.getProviderInfo(), request, sofaResponse, throwable));
if (EventBus.isEnable(ClientAfterSendEvent.class)) {
EventBus.post(new ClientAfterSendEvent(request));
}
}

Expand All @@ -194,9 +191,7 @@ public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcExc

try {
RpcInternalContext context = RpcInternalContext.getContext();

beforeSend(context, request);

RpcInvokeContext invokeContext = RpcInvokeContext.getContext();
invokeContext.put(TripleContants.SOFA_REQUEST_KEY, request);
invokeContext.put(TripleContants.SOFA_CONSUMER_CONFIG_KEY, transportConfig.getConsumerConfig());
Expand All @@ -206,6 +201,9 @@ public SofaResponse syncSend(SofaRequest request, int timeout) throws SofaRpcExc
throwable = convertToRpcException(e);
throw throwable;
} finally {
if (EventBus.isEnable(ClientAfterSendEvent.class)) {
EventBus.post(new ClientAfterSendEvent(request));
}
if (EventBus.isEnable(ClientSyncReceiveEvent.class)) {
EventBus.post(new ClientSyncReceiveEvent(transportConfig.getConsumerConfig(),
transportConfig.getProviderInfo(), request, sofaResponse, throwable));
Expand Down

0 comments on commit e203f2b

Please sign in to comment.