Skip to content

Commit

Permalink
Merge pull request weibocom#286 from weibocom/feature/localMethodInvoke
Browse files Browse the repository at this point in the history
Feature/local method invoke
  • Loading branch information
rayzhang0603 authored Nov 25, 2016
2 parents f39e247 + fcc4983 commit 1ed0a4e
Show file tree
Hide file tree
Showing 19 changed files with 160 additions and 40 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/logs/
*/*/logs/
*/target/
bin/

# maven ignore
target/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
Expand All @@ -43,14 +44,13 @@
*/
@SpiMeta(name = "activeWeight")
public class ActiveWeightLoadBalance<T> extends AbstractLoadBalance<T> {
private static Random random = new Random();

@Override
protected Referer<T> doSelect(Request request) {
List<Referer<T>> referers = getReferers();

int refererSize = referers.size();
int startIndex = random.nextInt(refererSize);
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;

Expand Down Expand Up @@ -83,7 +83,7 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)
List<Referer<T>> referers = getReferers();

int refererSize = referers.size();
int startIndex = random.nextInt(refererSize);
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.CollectionUtil;
import com.weibo.api.motan.util.LoggerUtil;
import com.weibo.api.motan.util.MathUtil;

import org.apache.commons.lang3.StringUtils;

import java.util.*;
Expand Down Expand Up @@ -200,7 +202,7 @@ Referer<T> next() {
String group = randomKeyList.get(ThreadLocalRandom.current().nextInt(randomKeySize));
AtomicInteger ai = cursors.get(group);
List<Referer<T>> referers = groupReferers.get(group);
return referers.get(ai.getAndIncrement() % referers.size());
return referers.get(MathUtil.getPositive(ai.getAndIncrement()) % referers.size());
}

// 求最大公约数
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.MathUtil;

/**
*
Expand Down Expand Up @@ -82,11 +83,13 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)
}

private int getHash(Request request) {
int hashcode;
if (request.getArguments() == null || request.getArguments().length == 0) {
return 0x7fffffff & request.hashCode();
hashcode = request.hashCode();
} else {
return 0x7fffffff & Arrays.hashCode(request.getArguments());
hashcode = Arrays.hashCode(request.getArguments());
}
return MathUtil.getPositive(hashcode);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.weibo.api.motan.util.NetUtils;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

/**
* "本地服务优先" 负载均衡
Expand All @@ -42,7 +43,6 @@
@SpiMeta(name = "localFirst")
public class LocalFirstLoadBalance<T> extends AbstractLoadBalance<T> {
public static final int MAX_REFERER_COUNT = 10;
private static Random random = new Random();

public static long ipToLong(final String addr) {
final String[] addressBytes = addr.split("\\.");
Expand Down Expand Up @@ -107,7 +107,7 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)
}

int refererSize = referers.size();
int startIndex = random.nextInt(refererSize);
int startIndex = ThreadLocalRandom.current().nextInt(refererSize);
int currentCursor = 0;
int currentAvailableCursor = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.weibo.api.motan.cluster.loadbalance;

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
Expand All @@ -36,7 +37,7 @@ public class RandomLoadBalance<T> extends AbstractLoadBalance<T> {
protected Referer<T> doSelect(Request request) {
List<Referer<T>> referers = getReferers();

int idx = (int) (Math.random() * referers.size());
int idx = (int) (ThreadLocalRandom.current().nextDouble() * referers.size());
for (int i = 0; i < referers.size(); i++) {
Referer<T> ref = referers.get((i + idx) % referers.size());
if (ref.isAvailable()) {
Expand All @@ -50,7 +51,7 @@ protected Referer<T> doSelect(Request request) {
protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder) {
List<Referer<T>> referers = getReferers();

int idx = (int) (Math.random() * referers.size());
int idx = (int) (ThreadLocalRandom.current().nextDouble() * referers.size());
for (int i = 0; i < referers.size(); i++) {
Referer<T> referer = referers.get((i + idx) % referers.size());
if (referer.isAvailable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.weibo.api.motan.core.extension.SpiMeta;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Request;
import com.weibo.api.motan.util.MathUtil;

/**
*
Expand Down Expand Up @@ -65,6 +66,6 @@ protected void doSelectToHolder(Request request, List<Referer<T>> refersHolder)

// get positive int
private int getNextPositive() {
return 0x7fffffff & idx.incrementAndGet();
return MathUtil.getPositive(idx.incrementAndGet());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
*/
@SpiMeta(name = "statistic")
public class AccessStatisticFilter implements Filter {
protected static Application RPC_SERVICES = new Application(ApplicationInfo.STATISTIC, "rpc_service");

@Override
public Response filter(Caller<?> caller, Request request) {
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -68,7 +70,7 @@ public Response filter(Caller<?> caller, Request request) {
String statName =
caller.getUrl().getProtocol() + MotanConstants.PROTOCOL_SEPARATOR + MotanFrameworkUtil.getGroupMethodString(request);
if (caller instanceof Provider) {
application = new Application(ApplicationInfo.STATISTIC, "rpc_service");
application = RPC_SERVICES;
StatsUtil.accessStatistic(statName, application, end, end - start, bizProcessTime, accessStatus);
}
application = ApplicationInfo.getApplication(caller.getUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import com.weibo.api.motan.exception.MotanFrameworkException;
import com.weibo.api.motan.rpc.*;
import com.weibo.api.motan.util.ReflectUtil;

import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand Down Expand Up @@ -135,7 +137,8 @@ public Response filter(Caller<?> caller, Request request) {
return response;
}

private Object invoke(Object clz, Method method, Object[] args, MockInfo info) throws InterruptedException, InvocationTargetException, IllegalAccessException {
private Object invoke(Object clz, Method method, Object[] args, MockInfo info) throws InterruptedException, InvocationTargetException,
IllegalAccessException {

info.callNum.addAndGet(1);

Expand All @@ -151,7 +154,7 @@ private long caclSleepTime(MockInfo info) {

long sleepTime;

int n = new Random().nextInt(1000);
int n = ThreadLocalRandom.current().nextInt(1000);

long delta = (long) (rMean - info.mean + 1);
if (n < 900) {
Expand All @@ -175,7 +178,7 @@ private long caclSleepTime(MockInfo info) {
while (info.errorRate * rate < 1) {
rate *= 10;
}
if (new Random().nextInt(rate) == 1) {
if (ThreadLocalRandom.current().nextInt(rate) == 1) {
throw new RuntimeException();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.weibo.api.motan.exception.MotanServiceException;
import com.weibo.api.motan.rpc.ApplicationInfo;
import com.weibo.api.motan.rpc.DefaultRequest;
import com.weibo.api.motan.rpc.Referer;
import com.weibo.api.motan.rpc.Response;
import com.weibo.api.motan.switcher.Switcher;
import com.weibo.api.motan.switcher.SwitcherService;
Expand Down Expand Up @@ -75,6 +76,12 @@ private void init() {
}

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(isLocalMethod(method)){
if("toString".equals(method.getName())){
return clustersToString();
}
throw new MotanServiceException("can not invoke local method:" + method.getName());
}
DefaultRequest request = new DefaultRequest();

request.setRequestId(RequestIdGenerator.getRequestId());
Expand Down Expand Up @@ -136,6 +143,35 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
+ MotanFrameworkUtil.toString(request), MotanErrorMsgConstant.SERVICE_UNFOUND);

}

/**
* tostring,equals,hashCode,finalize等接口未声明的方法不进行远程调用
* @param method
* @return
*/
public boolean isLocalMethod(Method method){
if(method.getDeclaringClass().equals(Object.class)){
try{
Method interfaceMethod = clz.getDeclaredMethod(method.getName(), method.getParameterTypes());
return false;
}catch(Exception e){
return true;
}
}
return false;
}

private String clustersToString(){
StringBuilder sb = new StringBuilder();
for(Cluster<T> cluster: clusters){
sb.append("{protocol:").append(cluster.getUrl().getProtocol());
for(Referer<T> refer : (List<Referer<T>>)cluster.getReferers()){
sb.append("[").append(refer.getUrl().toSimpleString()).append(", available:").append(refer.isAvailable()).append("]");
}
sb.append("}");
}
return sb.toString();
}

private Object getDefaultReturnValue(Class<?> returnType) {
if (returnType != null && returnType.isPrimitive()) {
Expand Down
7 changes: 6 additions & 1 deletion motan-core/src/main/java/com/weibo/api/motan/rpc/URL.java
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,12 @@ public boolean canServe(URL refUrl) {
if (!version.equals(refVersion)) {
return false;
}

// check serialize
String serialize = getParameter(URLParamType.serialize.getName(), URLParamType.serialize.getValue());
String refSerialize = refUrl.getParameter(URLParamType.serialize.getName(), URLParamType.serialize.getValue());
if (!serialize.equals(refSerialize)) {
return false;
}
// 由于需要提供跨group访问rpc的能力,所以不再验证group是否一致。
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ProviderProtectedMessageRouter extends ProviderMessageRouter {
protected ConcurrentMap<String, AtomicInteger> requestCounters = new ConcurrentHashMap<String, AtomicInteger>();
protected AtomicInteger totalCounter = new AtomicInteger(0);

protected Object lock = new Object();


public ProviderProtectedMessageRouter() {
super();
Expand All @@ -73,11 +73,8 @@ protected Response call(Request request, Provider<?> provider) {

try {
int requestCounter = 0, totalCounter = 0;
synchronized (lock) {
requestCounter = incrRequestCounter(requestKey);
totalCounter = incrTotalCounter();
}

requestCounter = incrRequestCounter(requestKey);
totalCounter = incrTotalCounter();
if (isAllowRequest(requestCounter, totalCounter, maxThread, request)) {
return super.call(request, provider);
} else {
Expand All @@ -86,10 +83,8 @@ protected Response call(Request request, Provider<?> provider) {
}

} finally {
synchronized (lock) {
decrTotalCounter();
decrRequestCounter(requestKey);
}
decrTotalCounter();
decrRequestCounter(requestKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConcurrentHashSet<E> extends AbstractSet<E> implements Set<E>, java.io.Serializable {

private static final long serialVersionUID = -8672117787651310382L;

private static final Object PRESENT = new Object();

private final ConcurrentHashMap<E, Object> map;
private final ConcurrentMap<E, Object> map;

public ConcurrentHashSet() {
map = new ConcurrentHashMap<E, Object>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,13 @@ public static int parseInt(String intStr, int defaultValue) {
return defaultValue;
}
}

/**
* return positive int value of originValue
* @param originValue
* @return positive int
*/
public static int getPositive(int originValue){
return 0x7fffffff & originValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,17 @@ public static boolean isValidAddress(InetAddress address) {
String name = address.getHostAddress();
return (name != null && !ANYHOST.equals(name) && !LOCALHOST.equals(name) && IP_PATTERN.matcher(name).matches());
}

//return ip to avoid lookup dns
public static String getHostName(SocketAddress socketAddress) {
if (socketAddress == null) {
return null;
}

if (socketAddress instanceof InetSocketAddress) {
return ((InetSocketAddress) socketAddress).getHostName();
InetAddress addr = ((InetSocketAddress) socketAddress).getAddress();
if(addr != null){
return addr.getHostAddress();
}
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private void allAvailableCluster(int refererSize) {
if (refererSize <= ActiveWeightLoadBalance.MAX_REFERER_COUNT) {
Assert.assertEquals(referer.activeRefererCount(), lowActive);
} else {
Assert.assertTrue(refererSize - ActiveWeightLoadBalance.MAX_REFERER_COUNT >= referer.activeRefererCount());
Assert.assertTrue(refererSize - ActiveWeightLoadBalance.MAX_REFERER_COUNT - 1 >= referer.activeRefererCount());
}

List<Referer> referersHolder = new ArrayList<Referer>();
Expand Down Expand Up @@ -102,7 +102,7 @@ private void partOfUnAvailableCluster(int refererSize, int unAvailableSize) {
if (availableSize <= ActiveWeightLoadBalance.MAX_REFERER_COUNT) {
Assert.assertTrue(referer.activeRefererCount() - lowActive - unAvailableSize <= 0);
} else {
Assert.assertTrue(refererSize - ActiveWeightLoadBalance.MAX_REFERER_COUNT + unAvailableSize >= referer.activeRefererCount());
Assert.assertTrue(refererSize - ActiveWeightLoadBalance.MAX_REFERER_COUNT + unAvailableSize -1 >= referer.activeRefererCount());
}

List<Referer> referersHolder = new ArrayList<Referer>();
Expand Down
Loading

0 comments on commit 1ed0a4e

Please sign in to comment.