Skip to content

Commit

Permalink
rename current server to local server
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Aug 15, 2024
1 parent b258c10 commit 432cdda
Show file tree
Hide file tree
Showing 42 changed files with 231 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
title: 'Version'
},
{
field: 'startupAt',
field: 'startupTime',
title: 'Worker启动时间'
}]
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
title: 'Version'
},
{
field: 'startupAt',
field: 'startupTime',
title: 'Worker启动时间'
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
title: 'Version'
},
{
field: 'startupAt',
field: 'startupTime',
title: 'Supervisor启动时间'
}]
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ public static boolean shutdown(ExecutorService executorService, int awaitSeconds
}

private static ThreadPoolExecutor makeThreadPoolExecutor() {
int poolSize = Numbers.toInt(SystemUtils.getConfig(DISJOB_COMMON_POOL_SIZE), Runtime.getRuntime().availableProcessors() * 4);
int poolSize = Numbers.toInt(SystemUtils.getConfig(DISJOB_COMMON_POOL_SIZE), Runtime.getRuntime().availableProcessors() * 8);
if (poolSize < 0 || poolSize > MAX_CAP) {
LOG.warn("Invalid disjob common pool size config value: {}", poolSize);
poolSize = Numbers.bound(poolSize, 1, MAX_CAP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ public final class JobConstants {
public static final String SPRING_BEAN_NAME_PREFIX = DISJOB_KEY_PREFIX + ".bean";

/**
* Current supervisor spring bean name
* Local supervisor spring bean name
*/
public static final String SPRING_BEAN_NAME_CURRENT_SUPERVISOR = SPRING_BEAN_NAME_PREFIX + ".current-supervisor";
public static final String SPRING_BEAN_NAME_LOCAL_SUPERVISOR = SPRING_BEAN_NAME_PREFIX + ".local-supervisor";

/**
* Timing wheel spring bean name
*/
public static final String SPRING_BEAN_NAME_TIMING_WHEEL = SPRING_BEAN_NAME_PREFIX + ".timing-wheel";

/**
* Current worker spring bean name
* Local worker spring bean name
*/
public static final String SPRING_BEAN_NAME_CURRENT_WORKER = SPRING_BEAN_NAME_PREFIX + ".current-worker";
public static final String SPRING_BEAN_NAME_LOCAL_WORKER = SPRING_BEAN_NAME_PREFIX + ".local-worker";

/**
* Rest template spring bean name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ public static Supervisor deserialize(String text) {
return new Supervisor(host, port);
}

public static Supervisor.Current current() {
return Current.instance;
public static Local local() {
return Local.instance;
}

@Override
Expand Down Expand Up @@ -122,25 +122,25 @@ public Supervisor deserialize(JsonParser p, DeserializationContext ctx) throws I
}
}

// -------------------------------------------------------------------------------current Supervisor
// -------------------------------------------------------------------------------local Supervisor

/**
* Supervisor.class.getDeclaredClasses()[0]
*/
@SuppressWarnings("serial")
public abstract static class Current extends Supervisor {
private static volatile Current instance = null;
public abstract static class Local extends Supervisor {
private static volatile Local instance = null;

private final LocalDateTime startupAt;
private final LocalDateTime startupTime;

private Current(String host, int port) {
private Local(String host, int port) {
super(host, port);
SingletonClassConstraint.constrain(Current.class);
this.startupAt = LocalDateTime.now();
SingletonClassConstraint.constrain(Local.class);
this.startupTime = LocalDateTime.now();
}

public final LocalDateTime getStartupAt() {
return startupAt;
public final LocalDateTime getStartupTime() {
return startupTime;
}

/**
Expand All @@ -151,13 +151,13 @@ public final LocalDateTime getStartupAt() {
*/
public abstract String getWorkerContextPath(String group);

private static synchronized Current create(final String host, final int port,
final UnaryOperator<String> workerContextPath) {
private static synchronized Local create(final String host, final int port,
final UnaryOperator<String> workerContextPath) {
if (instance != null) {
throw new Error("Current supervisor already created.");
throw new Error("Local supervisor already created.");
}

instance = new Current(host, port) {
instance = new Local(host, port) {
@Override
public String getWorkerContextPath(String group) {
return workerContextPath.apply(group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class SupervisorMetrics extends ToJsonString implements Serializable {
/**
* 启动时间
*/
private Date startupAt;
private Date startupTime;

/**
* 是否也是Worker角色
Expand Down
34 changes: 17 additions & 17 deletions disjob-core/src/main/java/cn/ponfee/disjob/core/base/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ public static Worker deserialize(String text) {
return new Worker(group, workerId, host, port);
}

public static Worker.Current current() {
return Current.instance;
public static Local local() {
return Local.instance;
}

// --------------------------------------------------------custom jackson serialize & deserialize
Expand Down Expand Up @@ -201,22 +201,22 @@ public Worker deserialize(JsonParser p, DeserializationContext ctx) throws IOExc
}
}

// -------------------------------------------------------------------------------current Worker
// -------------------------------------------------------------------------------local Worker

@SuppressWarnings("serial")
public abstract static class Current extends Worker {
private static volatile Current instance = null;
public abstract static class Local extends Worker {
private static volatile Local instance = null;

private final LocalDateTime startupAt;
private final LocalDateTime startupTime;

private Current(String group, String workerId, String host, int port) {
private Local(String group, String workerId, String host, int port) {
super(group, workerId, host, port);
SingletonClassConstraint.constrain(Current.class);
this.startupAt = LocalDateTime.now();
SingletonClassConstraint.constrain(Local.class);
this.startupTime = LocalDateTime.now();
}

public final LocalDateTime getStartupAt() {
return startupAt;
public final LocalDateTime getStartupTime() {
return startupTime;
}

/**
Expand Down Expand Up @@ -249,15 +249,15 @@ public final LocalDateTime getStartupAt() {

// need do reflection call
// use synchronized modify for help multiple thread read reference(write to main memory)
private static synchronized Current create(final String group, final String workerId,
final String host, final int port,
final String wToken, final String sToken,
final String supervisorContextPath) {
private static synchronized Local create(final String group, final String workerId,
final String host, final int port,
final String wToken, final String sToken,
final String supervisorContextPath) {
if (instance != null) {
throw new Error("Current worker already created.");
throw new Error("Local worker already created.");
}

instance = new Current(group, workerId, host, port) {
instance = new Local(group, workerId, host, port) {
private final String workerToken = StringUtils.trim(wToken);
private final String supervisorToken = StringUtils.trim(sToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class WorkerMetrics extends ToJsonString implements Serializable {
/**
* 启动时间
*/
private Date startupAt;
private Date startupTime;

/**
* 是否也是Supervisor角色
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package cn.ponfee.disjob.core.util;

import cn.ponfee.disjob.common.concurrent.ThreadPoolExecutors;
import cn.ponfee.disjob.common.concurrent.Threads;
import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable;
import cn.ponfee.disjob.common.spring.SpringContextHolder;
Expand Down Expand Up @@ -72,20 +73,6 @@ public static String getLocalHost(String specifiedHost) {
throw new Error("Not found available server host.");
}

private static boolean isValidHost(String host, String from) {
if (StringUtils.isBlank(host)) {
return false;
}
if (!NetUtils.isValidLocalHost(host)) {
LOG.warn("Invalid server host configured {}: {}", from, host);
return false;
}
if (!NetUtils.isReachableHost(host)) {
LOG.warn("Unreachable server host configured {}: {}", from, host);
}
return true;
}

public static void doInSynchronized(Long lock, ThrowingRunnable<?> action, Supplier<String> message) {
Throwable t = null;
try {
Expand All @@ -95,16 +82,34 @@ public static void doInSynchronized(Long lock, ThrowingRunnable<?> action, Suppl
LOG.error(message.get(), t);
} finally {
if (isCurrentThreadInterrupted(t)) {
try {
doInSynchronized(lock, action);
} catch (Throwable e) {
LOG.error("Retry error, " + message.get(), e);
}
ThreadPoolExecutors.commonThreadPool().execute(()-> {
try {
doInSynchronized(lock, action);
} catch (Throwable e) {
LOG.error("Retry error, " + message.get(), e);
}
});
}
Threads.interruptIfNecessary(t);
}
}

// ----------------------------------------------------------------------private methods

private static boolean isValidHost(String host, String from) {
if (StringUtils.isBlank(host)) {
return false;
}
if (!NetUtils.isValidLocalHost(host)) {
LOG.warn("Invalid server host configured {}: {}", from, host);
return false;
}
if (!NetUtils.isReachableHost(host)) {
LOG.warn("Unreachable server host configured {}: {}", from, host);
}
return true;
}

private static void doInSynchronized(Long lock, ThrowingRunnable<?> action) throws Throwable {
// Long.toString(lock).intern()
synchronized (INSTANCE_LOCK_POOL.intern(lock)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,8 +182,8 @@ private void doDispatch0(ExecuteTaskParam task) throws Exception {
}

boolean result;
if (taskReceiver != null && task.getWorker().matches(Worker.current())) {
// if current Supervisor also is a Worker role, then dispatch to this local worker
if (taskReceiver != null && task.getWorker().matches(Worker.local())) {
// if local Supervisor also is a Worker role, then dispatch to this local worker
log.info("Dispatching task to local worker {}, {}, {}", task.getTaskId(), task.getOperation(), task.getWorker());
result = taskReceiver.receive(task);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@
public abstract class TaskReceiver implements Startable {
protected final Logger log = LoggerFactory.getLogger(getClass());

private final Worker.Current currentWorker;
private final Worker.Local localWorker;
private final TimingWheel<ExecuteTaskParam> timingWheel;

protected TaskReceiver(Worker.Current currentWorker, TimingWheel<ExecuteTaskParam> timingWheel) {
protected TaskReceiver(Worker.Local localWorker, TimingWheel<ExecuteTaskParam> timingWheel) {
this.timingWheel = Objects.requireNonNull(timingWheel, "Timing wheel cannot be null.");
this.currentWorker = Objects.requireNonNull(currentWorker, "Current worker cannot be null.");
this.localWorker = Objects.requireNonNull(localWorker, "Local worker cannot be null.");
}

/**
Expand All @@ -60,18 +60,18 @@ protected final boolean doReceive(ExecuteTaskParam param) {
return false;
}

currentWorker.verifySupervisorAuthenticationToken(param);
localWorker.verifySupervisorAuthenticationToken(param);

Worker assignedWorker = param.getWorker();
if (!currentWorker.matches(assignedWorker)) {
log.error("Received unmatched worker task: {}, {}, {}", param.getTaskId(), currentWorker, assignedWorker);
if (!localWorker.matches(assignedWorker)) {
log.error("Received unmatched worker task: {}, {}, {}", param.getTaskId(), localWorker, assignedWorker);
return false;
}
if (!currentWorker.getWorkerId().equals(assignedWorker.getWorkerId())) {
if (!localWorker.getWorkerId().equals(assignedWorker.getWorkerId())) {
// 当Worker宕机后又快速启动(重启)的情况,Supervisor从本地缓存(或注册中心)拿到的仍是旧的workerId,但任务却Http方式派发给新的workerId(同机器同端口)
// 这种情况:1、可以剔除掉,等待Supervisor重新派发即可;2、也可以不剔除掉,短暂时间内该Worker的压力会是正常情况的2倍(注册中心还存有旧workerId);
log.warn("Received former worker task: {}, {}, {}", param.getTaskId(), currentWorker, assignedWorker);
param.setWorker(currentWorker);
log.warn("Received former worker task: {}, {}, {}", param.getTaskId(), localWorker, assignedWorker);
param.setWorker(localWorker);
}

boolean res = timingWheel.offer(param);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,19 @@ public RouteStrategy routeStrategy() {
@Override
protected void doRoute(List<ExecuteTaskParam> tasks, List<Worker> workers) {
// 查找workers列表中是否有当前的jvm worker
Worker worker = findLocal(workers, Worker.current());
Worker worker = findLocal(workers, Worker.local());
if (worker != null) {
tasks.forEach(task -> task.setWorker(worker));
} else {
outsiderRouter.route(tasks, workers);
}
}

private static Worker findLocal(List<Worker> workers, Worker current) {
if (current == null) {
private static Worker findLocal(List<Worker> workers, Worker localWorker) {
if (localWorker == null) {
return null;
}
return workers.stream().filter(current::equals).findAny().orElse(null);
return workers.stream().filter(localWorker::equals).findAny().orElse(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ public HttpTaskDispatcher(ApplicationEventPublisher eventPublisher,
HttpTaskReceiver httpTaskReceiver) {
super(eventPublisher, discoveryWorker, retryProperties, httpTaskReceiver);

Function<Worker, String> workerContextPath = worker -> Supervisor.current().getWorkerContextPath(worker.getGroup());
Function<Worker, String> workerContextPath = worker -> Supervisor.local().getWorkerContextPath(worker.getGroup());
RetryProperties retry = RetryProperties.of(0, 0);
// `TaskDispatcher#dispatch0`内部有处理本地worker的分派逻辑,这里不需要本地的`Controller`,所以传null
this.httpTaskReceiverClient = create(Controller.class, null, null, workerContextPath, restTemplate, retry);
//this.httpTaskReceiverClient = create(Controller.class, httpTaskReceiver, Worker.current(), workerContextPath, restTemplate, retry);
//this.httpTaskReceiverClient = create(Controller.class, httpTaskReceiver, Worker.local(), workerContextPath, restTemplate, retry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
@RpcController
public class HttpTaskReceiver extends TaskReceiver implements Controller {

public HttpTaskReceiver(Worker.Current currentWorker, TimingWheel<ExecuteTaskParam> timingWheel) {
super(currentWorker, timingWheel);
public HttpTaskReceiver(Worker.Local localWorker, TimingWheel<ExecuteTaskParam> timingWheel) {
super(localWorker, timingWheel);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,17 @@ public class HttpTaskDispatchingAutoConfiguration extends BaseTaskDispatchingAut
/**
* Configuration http task receiver.
*/
@ConditionalOnBean(Worker.Current.class)
@ConditionalOnBean(Worker.Local.class)
@Bean
public TaskReceiver taskReceiver(Worker.Current currentWorker,
public TaskReceiver taskReceiver(Worker.Local localWorker,
@Qualifier(JobConstants.SPRING_BEAN_NAME_TIMING_WHEEL) TimingWheel<ExecuteTaskParam> timingWheel) {
return new HttpTaskReceiver(currentWorker, timingWheel);
return new HttpTaskReceiver(localWorker, timingWheel);
}

/**
* Configuration http task dispatcher.
*/
@ConditionalOnBean(Supervisor.Current.class)
@ConditionalOnBean(Supervisor.Local.class)
@Bean
public TaskDispatcher taskDispatcher(ApplicationEventPublisher eventPublisher,
RetryProperties retry,
Expand Down
Loading

0 comments on commit 432cdda

Please sign in to comment.