Skip to content

Commit

Permalink
Fix async tracer. (sofastack#739)
Browse files Browse the repository at this point in the history
  • Loading branch information
leizhiyuan committed Aug 21, 2019
1 parent 3b3fab1 commit 90ca2b5
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.alipay.sofa.rpc.tracer.Tracers;

/**
*
* @author <a href="mailto:[email protected]">zhanggeng</a>
*/
public class SofaTracerSubscriber extends Subscriber {
Expand All @@ -34,17 +33,15 @@ public void onEvent(Event originEvent) {
if (eventClass == ClientStartInvokeEvent.class) {
ClientStartInvokeEvent event = (ClientStartInvokeEvent) originEvent;
Tracers.startRpc(event.getRequest());
}

else if (eventClass == ClientBeforeSendEvent.class) {
} else if (eventClass == ClientBeforeSendEvent.class) {
ClientBeforeSendEvent event = (ClientBeforeSendEvent) originEvent;
Tracers.clientBeforeSend(event.getRequest());
} else if (eventClass == ClientAfterSendEvent.class) {
// 异步发送完毕
ClientAfterSendEvent event = (ClientAfterSendEvent) originEvent;
Tracers.clientAsyncAfterSend(event.getRequest());
}

// else if (eventClass == ClientAfterSendEvent.class) {
// // 异步发送完毕
// }

// else if (eventClass == ClientSyncReceiveEvent.class) {
// // 同步返回结果
// }
Expand All @@ -55,31 +52,23 @@ else if (eventClass == ClientAsyncReceiveEvent.class) {
Tracers.clientAsyncReceivedPrepare();
// 记录收到返回
Tracers.clientReceived(event.getRequest(), event.getResponse(), event.getThrowable());
}

else if (eventClass == ClientEndInvokeEvent.class) {
} else if (eventClass == ClientEndInvokeEvent.class) {
ClientEndInvokeEvent event = (ClientEndInvokeEvent) originEvent;
if (!event.getRequest().isAsync()) {
// 因为同步调用重试行为,需要放到最后才能算 received
Tracers.clientReceived(event.getRequest(), event.getResponse(), event.getThrowable());
}
// 检查下状态
Tracers.checkState();
}

else if (eventClass == ServerReceiveEvent.class) {
} else if (eventClass == ServerReceiveEvent.class) {
ServerReceiveEvent event = (ServerReceiveEvent) originEvent;
// 接到请求
Tracers.serverReceived(event.getRequest());
}

else if (eventClass == ServerSendEvent.class) {
} else if (eventClass == ServerSendEvent.class) {
// 发送响应
ServerSendEvent event = (ServerSendEvent) originEvent;
Tracers.serverSend(event.getRequest(), event.getResponse(), event.getThrowable());
}

else if (eventClass == ServerEndHandleEvent.class) {
} else if (eventClass == ServerEndHandleEvent.class) {
// 检查下状态
Tracers.checkState();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,28 +213,6 @@ public void clientBeforeSend(SofaRequest request) {
getEmptyStringIfNull(attachments, RpcSpanTags.REMOTE_IP));
request.addRequestProp(RemotingConstants.RPC_TRACE_NAME, oldTracerContext);
}

// 异步callback同步
if (request.isAsync()) {
//异步,这个时候除了缓存spanContext clientBeforeSendRequest() rpc 已经调用
//还需要这个时候需要还原回父 span
//弹出;不弹出的话当前线程就会一直是client了
clientSpan = sofaTraceContext.pop();
if (clientSpan != null) {
// Record client send event
clientSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
}
//将当前 span 缓存在 request 中,注意:这个只是缓存不需要序列化到服务端
rpcInternalContext.setAttachment(RpcConstants.INTERNAL_KEY_TRACER_SPAN, clientSpan);
if (clientSpan != null && clientSpan.getParentSofaTracerSpan() != null) {
//restore parent
sofaTraceContext.push(clientSpan.getParentSofaTracerSpan());
}
} else {
// Record client send event
clientSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
}

}

private String getEmptyStringIfNull(Map map, String key) {
Expand Down Expand Up @@ -569,7 +547,38 @@ private void generateServerErrorContext(Map<String, String> context, SofaRequest

@Override
public void clientAsyncAfterSend(SofaRequest request) {
//do nothing

//客户端的启动
SofaTraceContext sofaTraceContext = SofaTraceContextHolder.getSofaTraceContext();
//获取并不弹出
SofaTracerSpan clientSpan = sofaTraceContext.getCurrentSpan();
if (clientSpan == null) {
SelfLog.warn("ClientSpan is null.Before call interface=" + request.getInterfaceName() + ",method=" +
request.getMethodName());
return;
}
RpcInternalContext rpcInternalContext = RpcInternalContext.getContext();

// 异步callback同步
if (request.isAsync()) {
//异步,这个时候除了缓存spanContext clientBeforeSendRequest() rpc 已经调用
//还需要这个时候需要还原回父 span
//弹出;不弹出的话当前线程就会一直是client了
clientSpan = sofaTraceContext.pop();
if (clientSpan != null) {
// Record client send event
clientSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
}
//将当前 span 缓存在 request 中,注意:这个只是缓存不需要序列化到服务端
rpcInternalContext.setAttachment(RpcConstants.INTERNAL_KEY_TRACER_SPAN, clientSpan);
if (clientSpan != null && clientSpan.getParentSofaTracerSpan() != null) {
//restore parent
sofaTraceContext.push(clientSpan.getParentSofaTracerSpan());
}
} else {
// Record client send event
clientSpan.log(LogData.CLIENT_SEND_EVENT_VALUE);
}
}

@Override
Expand Down

0 comments on commit 90ca2b5

Please sign in to comment.