Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
赵禹光 committed May 19, 2019
1 parent c26c61a commit ba6b33b
Show file tree
Hide file tree
Showing 12 changed files with 320 additions and 243 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.ke.schedule.client.spring.starter;

import com.ke.schedule.client.spring.annotation.Kob;
import com.ke.schedule.client.spring.annotation.KobSchedule;
import com.ke.schedule.client.spring.core.ClientProcessor;
import com.ke.schedule.client.spring.startup.AbstractAutoConfiguration;
import com.ke.schedule.client.spring.startup.ClientProperties;
Expand All @@ -9,8 +9,6 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;

import java.util.Map;

@Configuration
public class KobClientAutoConfiguration extends AbstractAutoConfiguration {

Expand All @@ -24,6 +22,6 @@ public ClientProperties prop() {
@Bean(name = "kobScheduleProcessor", initMethod = "init", destroyMethod = "destroy")
public ClientProcessor scheduleProcessor() {
ClientProperties prop = (ClientProperties) applicationContext.getBean("kobClientProperties");
return new ClientProcessor(prop.build(), applicationContext.getBeansWithAnnotation(Kob.class));
return new ClientProcessor(prop.build(), applicationContext.getBeansWithAnnotation(KobSchedule.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface Kob {
public @interface KobSchedule {
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
public @interface Task {
String key();

String cn();
String remark();
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package com.ke.schedule.client.spring.core;

import com.alibaba.fastjson.JSON;
import com.ke.schedule.basic.model.*;
import com.ke.schedule.client.spring.constant.ClientConstant;
import com.ke.schedule.client.spring.constant.ClientLogConstant;
import com.ke.schedule.client.spring.startup.ClientProperties;
import com.ke.schedule.basic.constant.ZkPathConstant;
import com.ke.schedule.basic.model.ClientData;
import com.ke.schedule.basic.model.TaskBaseContext;
import com.ke.schedule.basic.model.TaskResult;
import com.ke.schedule.basic.model.ZkAuthInfo;
import com.ke.schedule.basic.support.IpUtils;
import com.ke.schedule.basic.support.KobUtils;
import com.ke.schedule.basic.support.NamedThreadFactory;
import com.ke.schedule.basic.support.UuidUtils;
import com.ke.schedule.client.spring.annotation.Task;
import com.ke.schedule.client.spring.constant.ClientConstant;
import com.ke.schedule.client.spring.constant.ClientLogConstant;
import com.ke.schedule.client.spring.startup.ClientProperties;
import javafx.util.Pair;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.OkHttpClient;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.exception.ZkTimeoutException;
Expand All @@ -25,45 +25,32 @@
import org.springframework.util.CollectionUtils;

import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* @Author: zhaoyuguang
* @Date: 2018/7/31 下午9:42
*/

public @Slf4j @NoArgsConstructor class ClientContext {
private @Getter
ClientProperties prop;
private Map<String, Object> beans;
public @Slf4j
@NoArgsConstructor
class ClientContext {
private @Getter @Setter Long expireRecyclingTime = 20L * 60L * 1000L;
private @Getter
ClientData client;
private @Getter ClientData data;
private @Getter ZkClient zkClient;
private @Getter String clientTaskPath;
private @Getter String clientNodePath;
private @Getter String pathClientInfoLocal;
private @Getter Map<String, Pair<String, Function<TaskBaseContext, TaskResult>>> runner;
private @Getter ThreadPoolExecutor pool;
private @Getter
TaskDispatcher2 dispatcher = new TaskDispatcher2(this);
private @Getter String adminUrl;
private @Getter @Setter String processorPath;
private String taskLogPath;
private String serviceLogPath;
private Integer clientWorks;
private Long initialDelay;


public ClientContext(ClientProperties prop, Map<String, Object> beans) {
this.prop = prop;
this.beans = beans;
this.client = new ClientData();
this.runner = new ConcurrentHashMap<>();
}
private @Getter @Setter String adminUrl;
private @Getter @Setter String zkConnect;

static class Builder {
private ClientContext context;
Expand All @@ -78,6 +65,7 @@ public ClientContext build() {
}

Builder zk(String zkServers, Integer sessionTimeout, Integer connectionTimeout, List<ZkAuthInfo> auths) {
context.zkConnect = zkServers;
try {
context.zkClient = new ZkClient(zkServers, sessionTimeout, connectionTimeout, new ZkSerializer() {
@Override
Expand Down Expand Up @@ -112,7 +100,7 @@ Builder runner(Map<String, Object> beans) {
for (final Method method : methods) {
Task task = AnnotationUtils.findAnnotation(method, Task.class);
if (task != null) {
context.runner.put(task.key(), new Pair<>(task.cn(), TaskRunnerBuilder.build(v, method)));
context.runner.put(task.key(), new Pair<>(task.remark(), TaskRunnerBuilder.build(v, method)));
}
}
}
Expand All @@ -125,35 +113,32 @@ Builder pool(Integer threads) {
context.pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(threads);
return this;
}
}

public boolean checkProperties() {
if (prop == null) {
log.error(ClientLogConstant.error500("spring配置文件中未找到kob.client属性"));
return false;
}
if (KobUtils.isEmpty(prop.getZkPrefix())) {
log.error(ClientLogConstant.error500("spring配置文件中未找到kob.client.cluster属性"));
return false;
}
if (KobUtils.isEmpty(prop.getProjectCode())) {
log.error(ClientLogConstant.error500("spring配置文件中未找到kob.client.project_code属性"));
return false;
}
if (KobUtils.isEmpty(prop.getZkConnectString())) {
log.error(ClientLogConstant.error500("spring配置文件中未找到kob.client.zk_servers属性"));
return false;
Builder path(String zkPrefix, String projectCode) {
context.clientTaskPath = ZkPathConstant.clientTaskPath(zkPrefix, projectCode);
context.clientNodePath = ZkPathConstant.clientNodePath(zkPrefix, projectCode);
return this;
}
if (KobUtils.isEmpty(prop.getAdminUrl())) {
log.error(ClientLogConstant.error500("spring配置文件中未找到kob.client.admin_url"));
return false;

Builder client(String projectCode, Integer threads) {
context.data = new ClientData();
context.data.setVersion(ClientConstant.VERSION);
context.data.setIp(IpUtils.getLocalAddress());
context.data.setIdentification(context.data.getIp() + ZkPathConstant.HYPHEN + UuidUtils.builder(UuidUtils.AbbrType.CI));
context.data.setProjectCode(projectCode);
context.data.setCreated(System.currentTimeMillis());
context.data.setThreads(threads);
context.data.setTasks(new HashMap<>());
context.runner.forEach((k,v) -> context.data.getTasks().put(k, v.getKey()));
return this;
}
if (KobUtils.isEmpty(beans)) {
log.error(ClientLogConstant.error500("spring IOC容器未找到有@Kob注解标记的Bean,请检查使用spring scan注解是否有误"));
return false;

Builder admin(String adminUrl) {
context.adminUrl = adminUrl;
return this;
}
return true;
}

//
// public void build() {
// if (prop.getZkSessionTimeout() == null) {
Expand Down Expand Up @@ -192,31 +177,31 @@ public boolean checkProperties() {
// clientNodePath = ZkPathConstant.clientNodePath(prop.getZkPrefix(), prop.getProjectCode());
// buildKobRunner(beans);
// buildClientInfo(prop);
// ClientPath clientPathLocal = new ClientPath(client.getIp(),client.getIdentification(), client.getProjectCode(), client.getTasks());
// ClientPath clientPathLocal = new ClientPath(data.getIp(),data.getIdentification(), data.getProjectCode(), data.getTasks());
// pathClientInfoLocal = clientNodePath + ZkPathConstant.BACKSLASH + JSON.toJSONString(clientPathLocal);
// pool = new ThreadPoolExecutor(prop.getThreads(), prop.getThreads(),0L, TimeUnit.MILLISECONDS,
// new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("kob-schedule-performing"));
// }

private void buildClientInfo(ClientProperties prop) {
client.setVersion(ClientConstant.VERSION);
String ip = IpUtils.getLocalAddress();
client.setIp(ip);
String uuid = UuidUtils.builder(UuidUtils.AbbrType.CI);
client.setIdentification(ip + ZkPathConstant.HYPHEN + uuid);
client.setProjectCode(prop.getProjectCode());
Long now = System.currentTimeMillis();
client.setCreated(now);
client.setModified(now);
this.adminUrl = prop.getAdminUrl();
this.taskLogPath = ClientConstant.DEFAULT_TASK_LOG_PATH;
this.serviceLogPath = ClientConstant.DEFAULT_SERVICE_LOG_PATH;
this.clientWorks = prop.getThreads();
client.setLogWarnEnable(prop.getLogWarnEnable());
client.setExpireRecyclingTime(prop.getExpireRecyclingSec());
client.setLoadFactor(prop.getLoadFactor());
client.setThreads(prop.getThreads());
this.initialDelay = prop.getInitialDelay();
client.setHeartbeatPeriod(prop.getHeartbeatPeriod());
}
// private void buildClientInfo(ClientProperties prop) {
// data.setVersion(ClientConstant.VERSION);
// String ip = IpUtils.getLocalAddress();
// data.setIp(ip);
// String uuid = UuidUtils.builder(UuidUtils.AbbrType.CI);
// data.setIdentification(ip + ZkPathConstant.HYPHEN + uuid);
// data.setProjectCode(prop.getProjectCode());
// Long now = System.currentTimeMillis();
// data.setCreated(now);
// data.setModified(now);
// this.adminUrl = prop.getAdminUrl();
// this.taskLogPath = ClientConstant.DEFAULT_TASK_LOG_PATH;
// this.serviceLogPath = ClientConstant.DEFAULT_SERVICE_LOG_PATH;
// this.clientWorks = prop.getThreads();
// data.setLogWarnEnable(prop.getLogWarnEnable());
// data.setExpireRecyclingTime(prop.getExpireRecyclingSec());
// data.setLoadFactor(prop.getLoadFactor());
// data.setThreads(prop.getThreads());
// this.initialDelay = prop.getInitialDelay();
// data.setHeartbeatPeriod(prop.getHeartbeatPeriod());
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Predicate<Object[]> tryToExclusionNodeHasMe() {
TaskContext.Path path = (TaskContext.Path) objects[0];
ClientContext client = (ClientContext) objects[1];
return !StringUtils.isEmpty(path.getTryToExclusionNode())
&& path.getTryToExclusionNode().contains(client.getClient().getIdentification());
&& path.getTryToExclusionNode().contains(client.getData().getIdentification());
};
}

Expand All @@ -76,10 +76,10 @@ public Predicate<ClientContext> checkPoolSize() {
int active = context.getPool().getActiveCount();
int max = context.getPool().getMaximumPoolSize();
double value = new BigDecimal((float) active / max).setScale(2, BigDecimal.ROUND_HALF_UP).doubleValue();
double loadFactor = context.getClient().getLoadFactor();
double loadFactor = context.getData().getLoadFactor();
if (value > loadFactor) {
if (context.getClient().getLogWarnEnable()) {
log.warn(ClientLogConstant.warn404(loadFactor, context.getClient().getIdentification(), active, max));
if (context.getData().getLogWarnEnable()) {
log.warn(ClientLogConstant.warn404(loadFactor, context.getData().getIdentification(), active, max));
}
return true;
}
Expand Down
Loading

0 comments on commit ba6b33b

Please sign in to comment.