Skip to content

Commit

Permalink
Change the invalid compare in NotifySingleService (alibaba#5587)
Browse files Browse the repository at this point in the history
* optimize the compare in NotifySingleService.java

* delete the unused method setupNotifyExecutors
  • Loading branch information
brotherlu-xcq authored May 14, 2021
1 parent de070a5 commit c756da3
Showing 1 changed file with 2 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,12 @@

package com.alibaba.nacos.config.server.service.notify;

import com.alibaba.nacos.common.executor.ExecutorFactory;
import com.alibaba.nacos.common.executor.NameThreadFactory;
import com.alibaba.nacos.common.task.NacosTask;
import com.alibaba.nacos.config.server.utils.LogUtil;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -39,6 +30,7 @@
* @author Nacos
*/
public class NotifySingleService {
private static final Logger LOGGER = LogUtil.FATAL_LOG;

static class NotifyTaskProcessorWrapper extends NotifyTaskProcessor {

Expand Down Expand Up @@ -84,7 +76,7 @@ public void run() {
.error("[notify-exception] target:{} dataid:{} group:{} ts:{}", target, getDataId(), getGroup(),
getLastModified());
LogUtil.NOTIFY_LOG.debug("[notify-exception] target:{} dataid:{} group:{} ts:{}",
new Object[] {target, getDataId(), getGroup(), getLastModified()}, e);
target, getDataId(), getGroup(), getLastModified(), e);
}

if (!this.isSuccess) {
Expand All @@ -100,59 +92,4 @@ public void run() {
}
}
}

@Autowired
public NotifySingleService(ServerMemberManager memberManager) {
this.memberManager = memberManager;
setupNotifyExecutors();
}

/**
* When the system is started or when the cluster is expanded or offline: single-threaded setupNotifyExecutors
* executors use ConcurrentHashMap to ensure visibility.
*/
private void setupNotifyExecutors() {
Collection<Member> clusterIps = memberManager.allMembers();

for (Member member : clusterIps) {

final String address = member.getAddress();

/*
* Fixed number of threads, unbounded queue
* (based on assumption: thread pool throughput is good,
* there will be no continuous task accumulation,
* there is occasional instantaneous pressure)
*/
Executor executor = ExecutorFactory.newSingleScheduledExecutorService(
new NameThreadFactory("com.alibaba.nacos.config.NotifySingleServiceThread-" + address));

if (null == executors.putIfAbsent(address, executor)) {
LOGGER.warn("[notify-thread-pool] setup thread target ip {} ok.", address);
}
}

for (Map.Entry<String, Executor> entry : executors.entrySet()) {
String target = entry.getKey();

// The cluster node goes offline
if (!clusterIps.contains(target)) {
ThreadPoolExecutor executor = (ThreadPoolExecutor) entry.getValue();
executor.shutdown();
executors.remove(target);
LOGGER.warn("[notify-thread-pool] tear down thread target ip {} ok.", target);
}
}

}

private static final Logger LOGGER = LogUtil.FATAL_LOG;

private ServerMemberManager memberManager;

private ConcurrentHashMap<String, Executor> executors = new ConcurrentHashMap<String, Executor>();

public ConcurrentHashMap<String, Executor> getExecutors() {
return executors;
}
}

0 comments on commit c756da3

Please sign in to comment.