Skip to content

Commit

Permalink
fix apache#466 add run oldest task reject policy (apache#467)
Browse files Browse the repository at this point in the history
* fix apache#466 add run oldest task reject policy

* fix apache#466 optimize policy when queue is full all the time
  • Loading branch information
github-ygy authored and slievrly committed Feb 27, 2019
1 parent 1ecc798 commit 5a45f5a
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.alibaba.fescar.common.thread;
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
* policys for RejectedExecutionHandler
*
* Created by guoyao on 2019/2/26.
*/
public final class RejectedPolicys {

/**
* when rejected happened ,add the new task and run the oldest task
* @return
*/
public static RejectedExecutionHandler runsOldestTaskPolicy() {
return new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
return;
}
BlockingQueue<Runnable> workQueue=executor.getQueue();
Runnable firstWork=workQueue.poll();
boolean newTaskAdd=workQueue.offer(r);
if (firstWork != null) {
firstWork.run();
}
if (!newTaskAdd) {
executor.execute(r);
}
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 1999-2018 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fescar.common.thread;

import org.junit.Test;
import org.testng.Assert;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by guoyao on 2019/2/26.
*/
public class RejectedPolicysTest {

private final int DEFAULT_CORE_POOL_SIZE=1;
private final int DEFAULT_KEEP_ALIVE_TIME=10;
private final int MAX_QUEUE_SIZE=1;

@Test
public void testRunsOldestTaskPolicy() throws Exception {
AtomicInteger atomicInteger=new AtomicInteger();
ThreadPoolExecutor poolExecutor=
new ThreadPoolExecutor(DEFAULT_CORE_POOL_SIZE, DEFAULT_CORE_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(MAX_QUEUE_SIZE),
new NamedThreadFactory("OldestRunsPolicy", DEFAULT_CORE_POOL_SIZE), RejectedPolicys.runsOldestTaskPolicy());
CountDownLatch downLatch1=new CountDownLatch(1);
CountDownLatch downLatch2=new CountDownLatch(1);
//task1
poolExecutor.execute(new Runnable() {
@Override
public void run() {
try {
//wait the oldest task of queue count down
downLatch1.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicInteger.getAndAdd(1);
}
});
Assert.assertEquals(atomicInteger.get(), 0);
//task2
poolExecutor.execute(new Runnable() {
@Override
public void run() {
// run second
atomicInteger.getAndAdd(2);
}
});
//task3
poolExecutor.execute(new Runnable() {
@Override
public void run() {
downLatch2.countDown();
atomicInteger.getAndAdd(3);
}
});
//only the task2 run which is the oldest task of queue
Assert.assertEquals(atomicInteger.get(), 2);
downLatch1.countDown();
downLatch2.await();
//run task3
Assert.assertEquals(atomicInteger.get(), 6);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,22 @@

package com.alibaba.fescar.core.rpc.netty;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alibaba.fescar.common.XID;
import com.alibaba.fescar.common.exception.FrameworkErrorCode;
import com.alibaba.fescar.common.exception.FrameworkException;
import com.alibaba.fescar.common.thread.NamedThreadFactory;
import com.alibaba.fescar.common.thread.RejectedPolicys;
import com.alibaba.fescar.common.util.NetUtil;
import com.alibaba.fescar.config.Configuration;
import com.alibaba.fescar.config.ConfigurationFactory;
import com.alibaba.fescar.core.context.RootContext;
import com.alibaba.fescar.core.protocol.AbstractMessage;
import com.alibaba.fescar.core.protocol.HeartbeatMessage;
import com.alibaba.fescar.core.protocol.RegisterTMRequest;
import com.alibaba.fescar.core.protocol.RegisterTMResponse;
import com.alibaba.fescar.core.protocol.ResultCode;
import com.alibaba.fescar.core.protocol.*;
import com.alibaba.fescar.core.protocol.transaction.GlobalBeginResponse;
import com.alibaba.fescar.core.rpc.netty.NettyPoolKey.TransactionRole;
import com.alibaba.fescar.core.service.ConfigurationKeys;
import com.alibaba.fescar.discovery.loadbalance.LoadBalanceFactory;
import com.alibaba.fescar.discovery.registry.RegistryFactory;
import com.alibaba.nacos.client.naming.utils.CollectionUtils;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -57,6 +42,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.alibaba.fescar.common.exception.FrameworkErrorCode.NoAvailableService;

/**
Expand Down Expand Up @@ -128,7 +118,7 @@ public static TmRpcClient getInstance() {
new LinkedBlockingQueue(MAX_QUEUE_SIZE),
new NamedThreadFactory(nettyClientConfig.getTmDispatchThreadPrefix(),
nettyClientConfig.getClientWorkerThreads()),
new ThreadPoolExecutor.CallerRunsPolicy());
RejectedPolicys.runsOldestTaskPolicy());
instance = new TmRpcClient(nettyClientConfig, null, threadPoolExecutor);
}
}
Expand Down

0 comments on commit 5a45f5a

Please sign in to comment.