Skip to content

Commit

Permalink
客户端系统对服务端心跳探活.
Browse files Browse the repository at this point in the history
  • Loading branch information
magestacks committed Dec 8, 2021
1 parent e0af0ac commit d382802
Show file tree
Hide file tree
Showing 14 changed files with 295 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class Constants {

public static final String NULL = "";

public static final String UP = "UP";

public static final String ENCODE = "UTF-8";

public static final int CONFIG_LONG_POLL_TIMEOUT = 30000;
Expand All @@ -42,6 +44,8 @@ public class Constants {

public static final String MONITOR_PATH = BASE_PATH + "/monitor";

public static final String HEALTH_CHECK_PATH = BASE_PATH + "/health/check";

public static final String PROBE_MODIFY_REQUEST = "Listening-Configs";

public static final String LONG_PULLING_TIMEOUT = "Long-Pulling-Timeout";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package cn.hippo4j.console.controller;

import cn.hippo4j.common.constant.Constants;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.common.web.base.Results;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import static cn.hippo4j.common.constant.Constants.UP;

/**
* Health check controller.
*
* @author chen.ma
* @date 2021/12/8 21:02
*/
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping(Constants.BASE_PATH + "/health/check")
public class HealthCheckController {

@GetMapping
public Result<String> healthCheck() {
return Results.success(UP);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import cn.hippo4j.starter.monitor.MessageSender;
import cn.hippo4j.starter.monitor.ReportingEventExecutor;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.remote.HttpScheduledHealthCheck;
import cn.hippo4j.starter.remote.ServerHealthCheck;
import cn.hippo4j.starter.toolkit.IdentifyUtil;
import cn.hippo4j.starter.toolkit.inet.InetUtils;
import cn.hutool.core.util.IdUtil;
Expand Down Expand Up @@ -58,9 +60,9 @@ public ApplicationContextHolder hippo4JApplicationContextHolder() {

@Bean
@SuppressWarnings("all")
public ConfigService configService(HttpAgent httpAgent, InetUtils hippo4JInetUtils) {
public ConfigService configService(HttpAgent httpAgent, InetUtils hippo4JInetUtils, ServerHealthCheck serverHealthCheck) {
String identify = IdentifyUtil.generate(environment, hippo4JInetUtils);
return new ThreadPoolConfigService(httpAgent, identify);
return new ThreadPoolConfigService(httpAgent, identify, serverHealthCheck);
}

@Bean
Expand Down Expand Up @@ -92,8 +94,15 @@ public HttpMvcSender httpMvcSender(HttpAgent httpAgent) {
}

@Bean
public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, MessageSender messageSender) {
return new ReportingEventExecutor(properties, messageSender);
public ReportingEventExecutor reportingEventExecutor(BootstrapProperties properties, MessageSender messageSender,
ServerHealthCheck serverHealthCheck) {
return new ReportingEventExecutor(properties, messageSender, serverHealthCheck);
}

@Bean
@SuppressWarnings("all")
public ServerHealthCheck httpScheduledHealthCheck(HttpAgent httpAgent) {
return new HttpScheduledHealthCheck(httpAgent);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public HttpClientUtil hippo4JHttpClientUtil() {
}

@Bean
@SuppressWarnings("all")
public HttpAgent httpAgent(BootstrapProperties properties, HttpClientUtil hippo4JHttpClientUtil) {
return new ServerHttpAgent(properties, hippo4JHttpClientUtil);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import cn.hippo4j.common.toolkit.GroupKey;
import cn.hippo4j.common.web.base.Result;
import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.remote.ServerHealthCheck;
import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import lombok.SneakyThrows;
Expand All @@ -18,7 +20,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static cn.hippo4j.common.constant.Constants.*;

Expand All @@ -39,21 +40,20 @@ public class ClientWorker implements DisposableBean {

private final String identification;

private final ServerHealthCheck serverHealthCheck;

private final ScheduledExecutorService executor;

private final ScheduledExecutorService executorService;

private AtomicBoolean isHealthServer = new AtomicBoolean(true);

private AtomicBoolean isHealthServerTemp = new AtomicBoolean(true);

private final ConcurrentHashMap<String, CacheData> cacheMap = new ConcurrentHashMap(16);

@SuppressWarnings("all")
public ClientWorker(HttpAgent httpAgent, String identification) {
public ClientWorker(HttpAgent httpAgent, String identification, ServerHealthCheck serverHealthCheck) {
this.agent = httpAgent;
this.identification = identification;
this.timeout = CONFIG_LONG_POLL_TIMEOUT;
this.serverHealthCheck = serverHealthCheck;

this.executor = Executors.newScheduledThreadPool(1, r -> {
Thread t = new Thread(r);
Expand All @@ -62,13 +62,9 @@ public ClientWorker(HttpAgent httpAgent, String identification) {
return t;
});

int threadSize = Runtime.getRuntime().availableProcessors();
this.executorService = Executors.newScheduledThreadPool(threadSize, r -> {
Thread t = new Thread(r);
t.setName("client.long.polling.executor");
t.setDaemon(true);
return t;
});
this.executorService = Executors.newSingleThreadScheduledExecutor(
ThreadFactoryBuilder.builder().prefix("client-long-polling-executor").daemon(true).build()
);

log.info("Client identity :: {}", identification);

Expand Down Expand Up @@ -102,25 +98,10 @@ public void destroy() throws Exception {

class LongPollingRunnable implements Runnable {

@SneakyThrows
private void checkStatus() {
if (Objects.equals(isHealthServerTemp.get(), Boolean.FALSE)
&& Objects.equals(isHealthServer.get(), Boolean.TRUE)) {
isHealthServerTemp.set(Boolean.TRUE);
log.info("🚀 The client reconnects to the server successfully.");
}
// 服务端状态不正常睡眠 30s
if (!isHealthServer.get()) {
isHealthServerTemp.set(Boolean.FALSE);
log.error("[Check config] Error. exception message, Thread sleep 30 s.");
Thread.sleep(30000);
}
}

@Override
@SneakyThrows
public void run() {
checkStatus();
serverHealthCheck.isHealthStatus();

List<CacheData> cacheDataList = new ArrayList();
List<String> inInitializingCacheList = new ArrayList();
Expand Down Expand Up @@ -197,10 +178,7 @@ public List<String> checkUpdateTpIds(String probeUpdateString, boolean isInitial
long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
Result result = agent.httpPostByConfig(LISTENER_PATH, headers, params, readTimeoutMs);

// Server 端重启后会进入非健康状态, 不进入 catch 则为健康调用
isHealthServer.set(true);
if (result != null && result.isSuccess()) {
setHealthServer(true);
return parseUpdateDataIdResponse(result.getData().toString());
}
} catch (Exception ex) {
Expand Down Expand Up @@ -295,12 +273,8 @@ public CacheData addCacheDataIfAbsent(String namespace, String itemId, String tp
return lastCacheData;
}

public boolean isHealthServer() {
return this.isHealthServer.get();
}

private void setHealthServer(boolean isHealthServer) {
this.isHealthServer.set(isHealthServer);
this.serverHealthCheck.setHealthStatus(isHealthServer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,9 @@ public DiscoveryClient(HttpAgent httpAgent, InstanceInfo instanceInfo) {
.threadFactory("DiscoveryClient-HeartbeatExecutor", true)
.build();

this.scheduler = Executors.newScheduledThreadPool(2,
ThreadFactoryBuilder.builder()
.daemon(true)
.prefix("DiscoveryClient-Scheduler")
.build()
this.scheduler = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("DiscoveryClient-Scheduler").build()
);

register();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.hippo4j.starter.core;

import cn.hippo4j.starter.remote.HttpAgent;
import cn.hippo4j.starter.remote.ServerHealthCheck;

import java.util.Arrays;

Expand All @@ -14,8 +15,11 @@ public class ThreadPoolConfigService implements ConfigService {

private final ClientWorker clientWorker;

public ThreadPoolConfigService(HttpAgent httpAgent, String identification) {
clientWorker = new ClientWorker(httpAgent, identification);
private final ServerHealthCheck serverHealthCheck;

public ThreadPoolConfigService(HttpAgent httpAgent, String identification, ServerHealthCheck serverHealthCheck) {
this.serverHealthCheck = serverHealthCheck;
this.clientWorker = new ClientWorker(httpAgent, identification, serverHealthCheck);
}

@Override
Expand All @@ -25,7 +29,7 @@ public void addListener(String tenantId, String itemId, String tpId, Listener li

@Override
public String getServerStatus() {
if (clientWorker.isHealthServer()) {
if (serverHealthCheck.isHealthStatus()) {
return "UP";
} else {
return "DOWN";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import cn.hippo4j.starter.config.BootstrapProperties;
import cn.hippo4j.starter.core.GlobalThreadPoolManage;
import cn.hippo4j.starter.handler.AbstractThreadPoolRuntime;
import cn.hippo4j.starter.remote.ServerHealthCheck;
import cn.hippo4j.starter.toolkit.thread.ThreadFactoryBuilder;
import cn.hippo4j.starter.toolkit.thread.ThreadUtil;
import cn.hutool.core.bean.BeanUtil;
Expand Down Expand Up @@ -47,6 +48,9 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements
@NonNull
private final MessageSender messageSender;

@NonNull
private final ServerHealthCheck serverHealthCheck;

/**
* 数据采集的缓冲容器, 等待 ReportingEventExecutor 上报服务端
*/
Expand All @@ -57,7 +61,7 @@ public class ReportingEventExecutor extends AbstractThreadPoolRuntime implements
*/
private final ScheduledThreadPoolExecutor collectVesselExecutor = new ScheduledThreadPoolExecutor(
new Integer(1),
ThreadFactoryBuilder.builder().daemon(true).prefix("scheduled-collect-vessel").build()
ThreadFactoryBuilder.builder().daemon(true).prefix("collect-data-scheduled").build()
);

@SneakyThrows
Expand Down Expand Up @@ -91,6 +95,7 @@ public void destroy() {
* 采集动态线程池数据, 并添加缓冲队列
*/
private void runTimeGatherTask() {
serverHealthCheck.isHealthStatus();
Message message = collectMessage();
messageCollectVessel.offer(message);
}
Expand Down
Loading

0 comments on commit d382802

Please sign in to comment.