Skip to content

Commit

Permalink
Fix reconnect of bolt/rest/h2c . (sofastack#180)
Browse files Browse the repository at this point in the history
  • Loading branch information
ujjboy authored Jun 14, 2018
1 parent b1232c7 commit 53569c0
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void init(ServerConfig serverConfig) {
// 启动线程池
this.bizThreadPool = initThreadPool(serverConfig);
// 服务端处理器
this.serverHandler = new HttpServerHandler(bizThreadPool);
this.serverHandler = new HttpServerHandler();

// set default transport config
this.serverTransportConfig.setContainer(container);
Expand Down Expand Up @@ -118,6 +118,9 @@ public void start() {
return;
}
try {
// 启动线程池
this.bizThreadPool = initThreadPool(serverConfig);
this.serverHandler.setBizThreadPool(bizThreadPool);
serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
started = serverTransport.start();

Expand Down Expand Up @@ -156,6 +159,14 @@ public void stop() {
// 关闭端口,不关闭线程池
serverTransport.stop();
serverTransport = null;

// 关闭线程池
if (bizThreadPool != null) {
bizThreadPool.shutdown();
bizThreadPool = null;
serverHandler.setBizThreadPool(null);
}

started = false;

if (EventBus.isEnable(ServerStoppedEvent.class)) {
Expand Down Expand Up @@ -216,12 +227,6 @@ public void destroy() {
}

stop();

// 关闭线程池
if (bizThreadPool != null) {
bizThreadPool.shutdown();
}

serverHandler = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ public AtomicInteger getProcessingCount() {
return processingCount;
}

public HttpServerHandler(ThreadPoolExecutor bizThreadPool) {
this.bizThreadPool = bizThreadPool;
}

@Override
public void registerChannel(AbstractChannel nettyChannel) {

Expand Down Expand Up @@ -139,4 +135,9 @@ public boolean checkService(String serviceName, String methodName) {
public ThreadPoolExecutor getBizThreadPool() {
return bizThreadPool;
}

public HttpServerHandler setBizThreadPool(ThreadPoolExecutor bizThreadPool) {
this.bizThreadPool = bizThreadPool;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,14 @@ public AbstractHttp2ClientTransport(ClientTransportConfig transportConfig) {
* 服务端提供者信息
*/
protected final ProviderInfo providerInfo;
/**
* Start from 3 (because 1 is setting stream)
*/
private final static int START_STREAM_ID = 3;
/**
* StreamId, start from 3 (because 1 is setting stream)
*/
protected final AtomicInteger streamId = new AtomicInteger(3);
protected final AtomicInteger streamId = new AtomicInteger();
/**
* 正在发送的调用数量
*/
Expand All @@ -131,6 +135,9 @@ public AbstractHttp2ClientTransport(ClientTransportConfig transportConfig) {

@Override
public void connect() {
if (isAvailable()) {
return;
}
EventLoopGroup workerGroup = NettyHelper.getClientIOEventLoopGroup();
Http2ClientInitializer initializer = new Http2ClientInitializer(transportConfig);
try {
Expand All @@ -152,6 +159,8 @@ public void connect() {
http2SettingsHandler.awaitSettings(transportConfig.getConnectTimeout(), TimeUnit.MILLISECONDS);

responseChannelHandler = initializer.responseHandler();
// RESET streamId
streamId.set(START_STREAM_ID);
} catch (Exception e) {
throw new SofaRpcException(RpcErrorType.CLIENT_NETWORK, e);
}
Expand Down Expand Up @@ -295,8 +304,8 @@ protected void doSend(final SofaRequest request, AbstractHttpClientHandler callb
TIMEOUT_TIMER.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
Map.Entry<ChannelFuture, AbstractHttpClientHandler> entry =
responseChannelHandler.removePromise(requestId);
Map.Entry<ChannelFuture, AbstractHttpClientHandler> entry = responseChannelHandler
.removePromise(requestId);
if (entry != null) {
ClientHandler handler = entry.getValue();
Exception e = timeoutException(request, timeoutMills, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class RestServer implements Server {
@Override
public void init(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
httpServer = buildServer();
}

private SofaNettyJaxrsServer buildServer() {
Expand Down Expand Up @@ -136,7 +137,6 @@ public void start() {
}
// 绑定到端口
try {
httpServer = buildServer();
httpServer.start();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Start the http rest server at port {}", serverConfig.getPort());
Expand Down Expand Up @@ -170,7 +170,6 @@ public void stop() {
LOGGER.info("Stop the http rest server at port {}", serverConfig.getPort());
}
httpServer.stop();
httpServer = null;
} catch (Exception e) {
LOGGER.error("Stop the http rest server at port " + serverConfig.getPort() + " error !", e);
}
Expand Down Expand Up @@ -222,6 +221,7 @@ public void unRegisterProcessor(ProviderConfig providerConfig, boolean closeIfNo
@Override
public void destroy() {
stop();
httpServer = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ServerConfig;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
Expand Down Expand Up @@ -64,7 +63,7 @@
public class SofaNettyJaxrsServer implements EmbeddedJaxrsServer {

private final ServerConfig serverConfig;
protected ServerBootstrap bootstrap = new ServerBootstrap();
protected ServerBootstrap bootstrap = null;
protected String hostname = null;
protected int port = 8080;
protected ResteasyDeployment deployment = new SofaResteasyDeployment(); // CHANGE: 使用sofa的类
Expand Down Expand Up @@ -224,7 +223,7 @@ public void start() {
serverConfig.isDaemon()));
}
// Configure the server.
bootstrap
bootstrap = new ServerBootstrap()
.group(eventLoopGroup)
.channel(
(serverConfig != null && serverConfig.isEpoll()) ? EpollServerSocketChannel.class
Expand Down Expand Up @@ -293,5 +292,6 @@ public void stop() {
eventExecutor.shutdownGracefully().sync();
} catch (Exception ignore) { // NOPMD
}
bootstrap = null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.test;

import java.util.concurrent.Callable;

/**
* @author <a href="mailto:[email protected]">GengZhang</a>
*/
public class TestUtils {

public static <T> T delayGet(Callable<T> callable, T expect, int period, int times) {
T result = null;
int i = 0;
while (i++ < times) {
try {
Thread.sleep(period);
result = callable.call();
if (result != null && result.equals(expect)) {
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.client.bolt;

import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ApplicationConfig;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.test.ActivelyDestroyTest;
import com.alipay.sofa.rpc.test.HelloService;
import com.alipay.sofa.rpc.test.HelloServiceImpl;
import com.alipay.sofa.rpc.test.TestUtils;
import org.junit.Assert;

import java.util.concurrent.Callable;

/**
* @author <a href="mailto:[email protected]">GengZhang</a>
*/
public class BoltDirectUrlTest extends ActivelyDestroyTest {

// @Test
// FIXME 目前bolt的IO线程关闭时未释放,暂不支持本测试用例
public void testAll() {
// 只有2个线程 执行
ServerConfig serverConfig = new ServerConfig()
.setPort(12300)
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
.setDaemon(true);

// 发布一个服务,每个请求要执行1秒
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setRef(new HelloServiceImpl())
.setBootstrap("bolt")
.setApplication(new ApplicationConfig().setAppName("serverApp"))
.setServer(serverConfig)
.setRegister(false);
providerConfig.export();

final ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName())
.setDirectUrl("bolt://127.0.0.1:12300")
.setProtocol(RpcConstants.PROTOCOL_TYPE_BOLT)
.setBootstrap("bolt")
.setApplication(new ApplicationConfig().setAppName("clientApp"))
.setReconnectPeriod(1000);

HelloService helloService = consumerConfig.refer();

Assert.assertNotNull(helloService.sayHello("xx", 22));

serverConfig.getServer().stop();

// 关闭后再调用一个抛异常
try {
helloService.sayHello("xx", 22);
} catch (Exception e) {
// 应该抛出异常
Assert.assertTrue(e instanceof SofaRpcException);
}

Assert.assertTrue(TestUtils.delayGet(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return CommonUtils.isEmpty(consumerConfig.getConsumerBootstrap()
.getCluster().getConnectionHolder().getAvailableConnections());
}
}, true, 50, 40));

serverConfig.getServer().start();
// 等待客户端重连服务端
Assert.assertTrue(TestUtils.delayGet(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return CommonUtils.isNotEmpty(consumerConfig.getConsumerBootstrap()
.getCluster().getConnectionHolder().getAvailableConnections());
}
}, true, 50, 60));

Assert.assertNotNull(helloService.sayHello("xx", 22));
}
}
Loading

0 comments on commit 53569c0

Please sign in to comment.