Skip to content

Commit

Permalink
Adjust the order of events and record elapsed time when async invoke. (
Browse files Browse the repository at this point in the history
  • Loading branch information
ujjboy authored May 17, 2018
1 parent def547f commit 1b74d34
Show file tree
Hide file tree
Showing 10 changed files with 273 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,16 @@ else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
request.setSofaResponseCallback(methodResponseCallback);
}
}
// 记录发送开始时间
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
// 开始调用
transport.asyncSend(request, timeout);
response = buildEmptyResponse(request);
}
// Future调用
else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
// 记录发送开始时间
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
// 开始调用
ResponseFuture future = transport.asyncSend(request, timeout);
// 放入线程上下文
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
throw e;
} finally {
// 产生调用结束事件
if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
EventBus.post(new ClientEndInvokeEvent(request, response, throwable));
if (!request.isAsync()) {
if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
EventBus.post(new ClientEndInvokeEvent(request, response, throwable));
}
}
}
// 包装响应
Expand Down
26 changes: 16 additions & 10 deletions core/api/src/main/java/com/alipay/sofa/rpc/common/RpcConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,11 @@ public class RpcConstants {
public static final String HIDDEN_KEY_DESTROY = HIDE_KEY_PREFIX + "destroy";

/**
* 内部使用的key:_app_name
* 内部使用的key:_app_name,string
*/
public static final String INTERNAL_KEY_APP_NAME = INTERNAL_KEY_PREFIX + "app_name";
/**
* 内部使用的key:_protocol_name
* 内部使用的key:_protocol_name,string
*/
public static final String INTERNAL_KEY_PROTOCOL_NAME = INTERNAL_KEY_PREFIX + "protocol_name";
/**
Expand Down Expand Up @@ -230,27 +230,33 @@ public class RpcConstants {
*/
public static final String INTERNAL_KEY_RESP_DESERIALIZE_TIME = INTERNAL_KEY_PREFIX + "resp_des_time";
/**
* 内部使用的key:_process_wait_time 在业务线程池里等待时间
* 内部使用的key:_process_wait_time 在业务线程池里等待时间,long
*/
public static final String INTERNAL_KEY_PROCESS_WAIT_TIME = INTERNAL_KEY_PREFIX + "process_wait_time";
/**
* 内部使用的key:_conn_create_time 长连接建立时间 需要一个 (long) 类型数据
* 内部使用的key:_conn_create_time 长连接建立时间long
*/
public static final String INTERNAL_KEY_CONN_CREATE_TIME = INTERNAL_KEY_PREFIX + "conn_create_time";
/**
* 内部使用的key:_impl_elapse 业务代码执行耗时
* 内部使用的key:_impl_elapse 业务代码执行耗时,long
*/
public static final String INTERNAL_KEY_IMPL_ELAPSE = INTERNAL_KEY_PREFIX + "impl_elapse";
/**
* 内部使用的key:_client_elapse 客户端总耗时
* 内部使用的key:_client_elapse 客户端总耗时,long
*/
public static final String INTERNAL_KEY_CLIENT_ELAPSE = INTERNAL_KEY_PREFIX + "client_elapse";
/**
* 内部使用的key:_router_record 路由记录
* 内部使用的key:_client_send_time 客户端发送时间戳,long
*
* @since 5.4.0
*/
public static final String INTERNAL_KEY_CLIENT_SEND_TIME = INTERNAL_KEY_PREFIX + "client_send_time";
/**
* 内部使用的key:_router_record 路由记录,string
*/
public static final String INTERNAL_KEY_ROUTER_RECORD = INTERNAL_KEY_PREFIX + "router_record";
/**
* 内部使用的key:_invoke_times 调用次数
* 内部使用的key:_invoke_times 调用次数,int
*/
public static final String INTERNAL_KEY_INVOKE_TIMES = INTERNAL_KEY_PREFIX + "invoke_times";
/**
Expand Down Expand Up @@ -285,7 +291,7 @@ public class RpcConstants {
*/
public static final String CONFIG_KEY_GENERIC = "generic";
/**
* 配置key:async
* 配置key:invokeType
*/
public static final String CONFIG_KEY_INVOKE_TYPE = "invokeType";
/**
Expand All @@ -304,7 +310,7 @@ public class RpcConstants {
public static final String CONFIG_KEY_CONCURRENTS = "concurrents";

/**
* 配置key:params
* 配置key:parameters
*/
public static final String CONFIG_KEY_PARAMS = "parameters";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* 基于ThreadLocal的内部使用的上下文传递。一般存在于:客户端请求线程、服务端业务线程池、客户端异步线程<br>
Expand Down Expand Up @@ -168,7 +168,7 @@ protected RpcInternalContext() {
*
* @see #ATTACHMENT_ENABLE
*/
private Map<String, Object> attachments = new HashMap<String, Object>();
private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();

/**
* The Stopwatch
Expand Down Expand Up @@ -339,7 +339,7 @@ public String getRemoteHostName() {
* @return attachment attachment
*/
public Object getAttachment(String key) {
return attachments.get(key);
return key == null ? null : attachments.get(key);
}

/**
Expand Down Expand Up @@ -376,11 +376,10 @@ public RpcInternalContext setAttachment(String key, Object value) {
* remove attachment.
*
* @param key the key
* @return context rpc context
* @return Old value
*/
public RpcInternalContext removeAttachment(String key) {
attachments.remove(key);
return this;
public Object removeAttachment(String key) {
return attachments.remove(key);
}

/**
Expand Down Expand Up @@ -434,7 +433,7 @@ public StopWatch getStopWatch() {
public void clear() {
this.setRemoteAddress(null).setLocalAddress(null).setFuture(null).setProviderSide(null)
.setProviderInfo(null).setInterfaceConfig(null);
this.attachments = new HashMap<String, Object>();
this.attachments = new ConcurrentHashMap<String, Object>();
this.stopWatch.reset();
}

Expand Down Expand Up @@ -484,7 +483,7 @@ public RpcInternalContext setInterfaceConfig(AbstractInterfaceConfig interfaceCo

@Override
public String toString() {
return "RpcInternalContext{" +
return super.toString() + "{" +
"future=" + future +
", localAddress=" + localAddress +
", remoteAddress=" + remoteAddress +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@
*/
package com.alipay.sofa.rpc.context;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.client.ProviderHelper;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.message.ResponseFuture;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -35,9 +39,18 @@
*/
public class RpcInternalContextTest {

@Before
public void before() {
RpcInternalContext.removeAllContext();
}

@After
public void after() {
RpcInternalContext.removeAllContext();
}

@Test
public void testPop() {

RpcInternalContext.pushContext();

RpcInternalContext.popContext();
Expand Down Expand Up @@ -73,6 +86,47 @@ public void testPop() {
Assert.assertEquals(RpcInternalContext.getContext().getRemoteAddress().toString(), "127.0.0.1:12200");
}

@Test
public void testAddress() {
RpcInternalContext context = RpcInternalContext.getContext();
context.setLocalAddress(null, 80);
context.setLocalAddress("127.0.0.1", -1);
context.setRemoteAddress(null, 80);
context.setRemoteAddress("127.0.0.1", -1);
Assert.assertTrue(context.getRemoteAddress().getPort() == 0);
Assert.assertEquals("127.0.0.1", context.getRemoteHostName());
}

@Test
public void testAttachment() {
Assert.assertTrue(RpcInternalContext.isAttachmentEnable());
RpcInternalContext context = RpcInternalContext.getContext();
boolean error = false;
try {
context.setAttachment("1", "1");
} catch (Exception e) {
error = true;
}
Assert.assertTrue(error);

Map<String, Object> map = new HashMap<String, Object>();
map.put("_11", "1111");
map.put("_22", "2222");
map.put(".33", "3333");
context.setAttachments(map);
Assert.assertEquals("1111", context.getAttachment("_11"));
context.setAttachment(null, "22222");
context.setAttachment("_22", null);
Assert.assertNull(context.getAttachment(null));
Assert.assertNull(context.getAttachment("_22"));

Assert.assertNull(context.removeAttachment("_33"));
Assert.assertEquals("3333", context.removeAttachment(".33"));

context.clearAttachments();
Assert.assertNull(context.removeAttachment("11"));
}

@Test
public void testClear() {
RpcInternalContext context = RpcInternalContext.getContext();
Expand Down Expand Up @@ -116,7 +170,7 @@ public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
}
});

context.setProviderInfo(ProviderInfo.valueOf("127.0.0.1:80"));
context.setProviderInfo(ProviderHelper.toProviderInfo("127.0.0.1:80"));
context.setInterfaceConfig(new ProviderConfig());
context.setAttachment("_xxxx", "yyyy");

Expand All @@ -129,7 +183,24 @@ public ResponseFuture addListener(SofaResponseCallback sofaResponseCallback) {
Assert.assertNull(context.getProviderInfo());
Assert.assertNull(context.getInterfaceConfig());
Assert.assertTrue(context.getAttachments().isEmpty());
Assert.assertNotNull(context.getStopWatch());
Assert.assertTrue(context.getStopWatch().read() == 0);

RpcInternalContext.removeAllContext();
Assert.assertNotNull(context.toString());
}

@Test
public void testKey() {
Assert.assertTrue(RpcInternalContext.isValidInternalParamKey("."));
Assert.assertTrue(RpcInternalContext.isValidInternalParamKey(".xx"));
Assert.assertTrue(RpcInternalContext.isValidInternalParamKey("_"));
Assert.assertTrue(RpcInternalContext.isValidInternalParamKey("_xx"));
Assert.assertFalse(RpcInternalContext.isHiddenParamKey("aaaa"));

Assert.assertTrue(RpcInternalContext.isHiddenParamKey("."));
Assert.assertTrue(RpcInternalContext.isHiddenParamKey(".xx"));
Assert.assertFalse(RpcInternalContext.isHiddenParamKey("_"));
Assert.assertFalse(RpcInternalContext.isHiddenParamKey("_xx"));
Assert.assertFalse(RpcInternalContext.isHiddenParamKey("aaaa"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.message.bolt;

import com.alipay.remoting.InvokeCallback;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.context.BaggageResolver;
import com.alipay.sofa.rpc.context.RpcInternalContext;
import com.alipay.sofa.rpc.context.RpcInvokeContext;
import com.alipay.sofa.rpc.context.RpcRuntimeContext;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.core.response.SofaResponse;

/**
* @author <a href="mailto:[email protected]">GengZhang</a>
* @since 5.4.0
*/
public abstract class AbstractInvokeCallback implements InvokeCallback {
/**
* 服务消费者配置
*/
protected final ConsumerConfig consumerConfig;
/**
* 服务提供者信息
*/
protected final ProviderInfo providerInfo;
/**
* 请求
*/
protected final SofaRequest request;
/**
* 请求运行时的ClassLoader
*/
protected ClassLoader classLoader;
/**
* 线程上下文
*/
protected RpcInternalContext context;

protected AbstractInvokeCallback(ConsumerConfig consumerConfig, ProviderInfo providerInfo, SofaRequest request,
RpcInternalContext context, ClassLoader classLoader) {
this.consumerConfig = consumerConfig;
this.providerInfo = providerInfo;
this.request = request;
this.context = context;
this.classLoader = classLoader;
}

protected void recordClientElapseTime() {
if (context != null) {
Long startTime = (Long) context.removeAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME);
if (startTime != null) {
context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, RpcRuntimeContext.now() - startTime);
}
}
}

protected void pickupBaggage(SofaResponse response) {
if (RpcInvokeContext.isBaggageEnable()) {
RpcInvokeContext invokeCtx = null;
if (context != null) {
invokeCtx = (RpcInvokeContext) context.getAttachment(RpcConstants.HIDDEN_KEY_INVOKE_CONTEXT);
}
if (invokeCtx == null) {
invokeCtx = RpcInvokeContext.getContext();
} else {
RpcInvokeContext.setContext(invokeCtx);
}
BaggageResolver.pickupFromResponse(invokeCtx, response);
}
}
}
Loading

0 comments on commit 1b74d34

Please sign in to comment.