Skip to content

Commit

Permalink
alibaba#502 Fix data inconsistent bug
Browse files Browse the repository at this point in the history
  • Loading branch information
nkorange committed Mar 26, 2019
1 parent e2a7206 commit 14ecd9c
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 45 deletions.
10 changes: 10 additions & 0 deletions api/src/main/java/com/alibaba/nacos/api/naming/NamingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,16 @@ public interface NamingService {
*/
void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException;

/**
* deregister instance with full instance information
*
* @param serviceName name of service
* @param groupName group of service
* @param instance instance information
* @throws NacosException
*/
void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException;

/**
* get all instances of a service
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,18 @@ public void deregisterInstance(String serviceName, String ip, int port, String c

@Override
public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException {
beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), ip, port);
serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), ip, port, clusterName);
Instance instance = new Instance();
instance.setIp(ip);
instance.setPort(port);
instance.setClusterName(clusterName);

deregisterInstance(serviceName, groupName, instance);
}

@Override
public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,17 +187,18 @@ public void registerService(String serviceName, String groupName, Instance insta

}

public void deregisterService(String serviceName, String ip, int port, String clusterName) throws NacosException {
public void deregisterService(String serviceName, Instance instance) throws NacosException {

NAMING_LOGGER.info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}:{}@{}",
namespaceId, serviceName, ip, port, clusterName);
NAMING_LOGGER.info("[DEREGISTER-SERVICE] {} deregistering service {} with instance: {}",
namespaceId, serviceName, instance);

final Map<String, String> params = new HashMap<String, String>(8);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put("ip", ip);
params.put("port", String.valueOf(port));
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.CLUSTER_NAME, clusterName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));

reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.DELETE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,9 @@ public Thread newThread(Runnable r) {

public volatile Notifier notifier = new Notifier();

private volatile Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();
private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>();

private volatile Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);

@PostConstruct
public void init() throws Exception {
Expand Down Expand Up @@ -200,52 +200,56 @@ public void onReceiveChecksums(Map<String, String> checksumMap, String server) {

syncChecksumTasks.put(server, "1");

List<String> toUpdateKeys = new ArrayList<>();
List<String> toRemoveKeys = new ArrayList<>();
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
// this key should not be sent from remote server:
Loggers.EPHEMERAL.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
// abort the procedure:
return;
}
if (!dataStore.contains(entry.getKey()) ||
dataStore.get(entry.getKey()).value == null ||
!dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
toUpdateKeys.add(entry.getKey());
try {

List<String> toUpdateKeys = new ArrayList<>();
List<String> toRemoveKeys = new ArrayList<>();
for (Map.Entry<String, String> entry : checksumMap.entrySet()) {
if (distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
// this key should not be sent from remote server:
Loggers.EPHEMERAL.error("receive responsible key timestamp of " + entry.getKey() + " from " + server);
// abort the procedure:
return;
}
if (!dataStore.contains(entry.getKey()) ||
dataStore.get(entry.getKey()).value == null ||
!dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
toUpdateKeys.add(entry.getKey());
}
}
}

for (String key : dataStore.keys()) {
for (String key : dataStore.keys()) {

if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue;
}
if (!server.equals(distroMapper.mapSrv(KeyBuilder.getServiceName(key)))) {
continue;
}

if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
if (!checksumMap.containsKey(key)) {
toRemoveKeys.add(key);
}
}
}

Loggers.EPHEMERAL.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);
Loggers.EPHEMERAL.info("to remove keys: {}, to update keys: {}, source: {}", toRemoveKeys, toUpdateKeys, server);

for (String key : toRemoveKeys) {
onRemove(key);
}
for (String key : toRemoveKeys) {
onRemove(key);
}

if (toUpdateKeys.isEmpty()) {
return;
}
if (toUpdateKeys.isEmpty()) {
return;
}

try {
byte[] result = NamingProxy.getData(toUpdateKeys, server);
processData(result);
} catch (Exception e) {
Loggers.EPHEMERAL.error("get data from " + server + " failed!", e);
try {
byte[] result = NamingProxy.getData(toUpdateKeys, server);
processData(result);
} catch (Exception e) {
Loggers.EPHEMERAL.error("get data from " + server + " failed!", e);
}
} finally {
// Remove this 'in process' flag:
syncChecksumTasks.remove(server);
}

// Remove this 'in process' flag:
syncChecksumTasks.remove(server);
}

public boolean syncAllDataFromRemote(Server server) {
Expand Down

0 comments on commit 14ecd9c

Please sign in to comment.