Skip to content

Commit

Permalink
Merge branch 'master' into hotfix_default_server_mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Fury Zhu authored Mar 21, 2019
2 parents 81bbbe6 + c1d895a commit cde0928
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000
nacos.naming.data.warmup=false
nacos.naming.expireInstance=true
1 change: 1 addition & 0 deletions distribution/conf/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000
nacos.naming.data.warmup=true
nacos.naming.expireInstance=true
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
Expand All @@ -32,13 +33,15 @@
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.pojo.Record;
import org.apache.commons.lang3.StringUtils;
import org.javatuples.Pair;
import org.springframework.beans.factory.annotation.Autowired;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;

/**
* A consistency protocol algorithm called <b>Partition</b>
Expand All @@ -57,6 +60,18 @@
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);

t.setDaemon(true);
t.setName("com.alibaba.nacos.naming.distro.notifier");

return t;
}
});

@Autowired
private DistroMapper distroMapper;

Expand All @@ -83,7 +98,9 @@ public class DistroConsistencyServiceImpl implements EphemeralConsistencyService

private boolean initialized = false;

private volatile Map<String, List<RecordListener>> listeners = new ConcurrentHashMap<>();
public volatile Notifier notifier = new Notifier();

private volatile Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

@PostConstruct
public void init() throws Exception {
Expand All @@ -97,6 +114,8 @@ public void run() {
}
}
});

executor.submit(notifier);
}

public void load() throws Exception {
Expand Down Expand Up @@ -154,13 +173,8 @@ public void onPut(String key, Record value) {
if (!listeners.containsKey(key)) {
return;
}
for (RecordListener listener : listeners.get(key)) {
try {
listener.onChange(key, value);
} catch (Exception e) {
Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e);
}
}

notifier.addTask(key, ApplyAction.CHANGE);
}

public void onRemove(String key) {
Expand All @@ -170,13 +184,8 @@ public void onRemove(String key) {
if (!listeners.containsKey(key)) {
return;
}
for (RecordListener listener : listeners.get(key)) {
try {
listener.onDelete(key);
} catch (Exception e) {
Loggers.EPHEMERAL.error("notify " + listener + ", key:" + key + " failed.", e);
}
}

notifier.addTask(key, ApplyAction.DELETE);
}

public void onReceiveChecksums(Map<String, String> checksumMap, String server) {
Expand Down Expand Up @@ -288,7 +297,7 @@ public void processData(byte[] data) throws Exception {
@Override
public void listen(String key, RecordListener listener) throws NacosException {
if (!listeners.containsKey(key)) {
listeners.put(key, new ArrayList<>());
listeners.put(key, new CopyOnWriteArrayList<>());
}
listeners.get(key).add(listener);
}
Expand All @@ -314,4 +323,78 @@ public boolean isAvailable() {
public boolean isInitialized() {
return initialized || !globalConfig.isDataWarmup();
}

public class Notifier implements Runnable {

private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);

private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024);

public void addTask(String datumKey, ApplyAction action) {

if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) {
return;
}
if (action == ApplyAction.CHANGE) {
services.put(datumKey, StringUtils.EMPTY);
}
tasks.add(Pair.with(datumKey, action));
}

public int getTaskSize() {
return tasks.size();
}

@Override
public void run() {
Loggers.EPHEMERAL.info("distro notifier started");

while (true) {
try {

Pair pair = tasks.take();

if (pair == null) {
continue;
}

String datumKey = (String) pair.getValue0();
ApplyAction action = (ApplyAction) pair.getValue1();

services.remove(datumKey);

int count = 0;

if (!listeners.containsKey(datumKey)) {
continue;
}

for (RecordListener listener : listeners.get(datumKey)) {

count++;

try {
if (action == ApplyAction.CHANGE) {
listener.onChange(datumKey, dataStore.get(datumKey).value);
continue;
}

if (action == ApplyAction.DELETE) {
listener.onDelete(datumKey);
continue;
}
} catch (Throwable e) {
Loggers.EPHEMERAL.error("[NACOS-DISTRO] error while notifying listener of key: {} {}", datumKey, e);
}
}

if (Loggers.EPHEMERAL.isDebugEnabled()) {
Loggers.EPHEMERAL.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}", datumKey, count);
}
} catch (Throwable e) {
Loggers.EPHEMERAL.error("[NACOS-DISTRO] Error while handling notifying task", e);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ public void onThrowable(Throwable t) {
}
});
} catch (Exception e) {
Loggers.RAFT.error("VIPSRV error while sending heart-beat to peer: {} {}", server, e);
Loggers.RAFT.error("error while sending heart-beat to peer: {} {}", server, e);
MetricsMonitor.getLeaderSendBeatFailedException().increment();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public boolean matchUnlistenKey(String key) {
@Override
public void onChange(String key, Instances value) throws Exception {

Loggers.RAFT.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);

for (Instance ip : value.getInstanceList()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instance;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.misc.HttpClient;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.misc.*;
import com.alibaba.nacos.naming.push.PushService;
import com.ning.http.client.AsyncCompletionHandler;
import com.ning.http.client.Response;
Expand Down Expand Up @@ -57,6 +54,10 @@ public DistroMapper getDistroMapper() {
return SpringContext.getAppContext().getBean(DistroMapper.class);
}

public GlobalConfig getGlobalConfig() {
return SpringContext.getAppContext().getBean(GlobalConfig.class);
}

public String taskKey() {
return service.getName();
}
Expand Down Expand Up @@ -85,6 +86,10 @@ public void run() {
}
}

if (!getGlobalConfig().isExpireInstance()) {
return;
}

// then remove obsolete instances:
for (Instance instance : instances) {
if (System.currentTimeMillis() - instance.getLastBeat() > service.getIpDeleteTimeout()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class GlobalConfig {
@Value("${nacos.naming.data.warmup}")
private boolean dataWarmup = false;

@Value("${nacos.naming.expireInstance}")
private boolean expireInstance = true;

public int getTaskDispatchPeriod() {
return taskDispatchPeriod;
}
Expand All @@ -61,4 +64,8 @@ public int getTaskDispatchThreadCount() {
public boolean isDataWarmup() {
return dataWarmup;
}

public boolean isExpireInstance() {
return expireInstance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public static HttpResult request(String url, List<String> headers, Map<String, S

return getResult(conn);
} catch (Exception e) {
Loggers.SRV_LOG.warn("[VIPSRV] Exception while request: {}, caused: {}", url, e);
Loggers.SRV_LOG.warn("Exception while request: {}, caused: {}", url, e);
return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
} finally {
if (conn != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,9 @@ public void setSendBeatOnly(boolean sendBeatOnly) {
// the followings are not implemented

public String getName() {
return "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00";
return UtilsAndCommons.SWITCH_DOMAIN_NAME;
}


public void update(SwitchDomain domain) {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public class UtilsAndCommons {

public static final String IPADDRESS_DATA_ID_PRE = "com.alibaba.nacos.naming.iplist.";

public static final String SWITCH_DOMAIN_NAME = "00-00---000-VIPSRV_SWITCH_DOMAIN-000---00-00";
public static final String SWITCH_DOMAIN_NAME = "00-00---000-NACOS_SWITCH_DOMAIN-000---00-00";

public static final String CIDR_REGEX = "[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}\\.[0-9]{1,3}/[0-9]+";

Expand Down
1 change: 1 addition & 0 deletions naming/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000

nacos.naming.data.warmup=true
nacos.naming.expireInstance=true
1 change: 1 addition & 0 deletions test/src/test/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ nacos.naming.distro.batchSyncKeyCount=1000
nacos.naming.distro.initDataRatio=0.9
nacos.naming.distro.syncRetryDelay=5000
nacos.naming.data.warmup=false
nacos.naming.expireInstance=true

0 comments on commit cde0928

Please sign in to comment.