Skip to content

Commit

Permalink
Fix unexpected provider operation cause by unordered zookeeper event. (
Browse files Browse the repository at this point in the history
  • Loading branch information
ujjboy committed Sep 17, 2018
1 parent 79e116a commit a8eec7b
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 165 deletions.
30 changes: 15 additions & 15 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,28 @@ language: java
addons:
apt:
packages:
- openjdk-6-jdk
- openjdk-6-jdk

jdk:
- oraclejdk8
- openjdk7
- openjdk6
- oraclejdk8
- openjdk7
- openjdk6

before_install:
- echo "Downloading Maven 3.2.5"
&& wget https://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.zip
&& unzip -q apache-maven-3.2.5-bin.zip
&& export M2_HOME=$PWD/apache-maven-3.2.5
&& export PATH=$M2_HOME/bin:$PATH
&& cp ./tools/ci/.travis.settings.xml $HOME/.m2/settings.xml
&& mvn -version
- echo "Downloading Maven 3.2.5"
&& wget https://archive.apache.org/dist/maven/maven-3/3.2.5/binaries/apache-maven-3.2.5-bin.zip
&& unzip -q apache-maven-3.2.5-bin.zip
&& export M2_HOME=$PWD/apache-maven-3.2.5
&& export PATH=$M2_HOME/bin:$PATH
&& cp ./tools/ci/.travis.settings.xml $HOME/.m2/settings.xml
&& mvn -version

install:
- mvn clean install -Pci-install -B -U -e
- sh ./tools/check_format.sh
- mvn clean install -Pci-install -B -U -e
- sh ./tools/check_format.sh

script:
- mvn clean package -Pci-test
- travis_retry mvn clean package -Pci-test

after_success:
- bash <(curl -s https://codecov.io/bash)
- bash <(curl -s https://codecov.io/bash)
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,15 @@ public void removeConfigListener(AbstractInterfaceConfig config) {
public void updateConfig(AbstractInterfaceConfig config, String configPath, ChildData data) {
if (data == null) {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data is null");
LOGGER.infoWithApp(config.getAppName(), "Receive update data is null");
}
} else {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data: path=[" + data.getPath() + "]"
LOGGER.infoWithApp(config.getAppName(), "Receive update data: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
List<ConfigListener> configListeners = configListenerMap.get(config);
if (CommonUtils.isNotEmpty(configListeners)) {
//转换子节点Data为接口级配置<配置属性名,配置属性值>,例如<timeout,200>
Map<String, String> attribute = ZookeeperRegistryHelper.convertConfigToAttribute(configPath, data,
false);
for (ConfigListener listener : configListeners) {
listener.configChanged(attribute);
}
}
notifyListeners(config, configPath, data, false);
}
}

Expand All @@ -108,13 +100,13 @@ public void updateConfig(AbstractInterfaceConfig config, String configPath, Chil
public void updateConfigAll(AbstractInterfaceConfig config, String configPath, List<ChildData> currentData) {
if (CommonUtils.isEmpty(currentData)) {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data is null");
LOGGER.infoWithApp(config.getAppName(), "Receive updateAll data is null");
}
} else {
if (LOGGER.isInfoEnabled(config.getAppName())) {
for (ChildData data : currentData) {
LOGGER.infoWithApp(config.getAppName(), "Receive data: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
LOGGER.infoWithApp(config.getAppName(), "Receive updateAll data: path=["
+ data.getPath() + "], data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
}
Expand All @@ -141,23 +133,15 @@ public void updateConfigAll(AbstractInterfaceConfig config, String configPath, L
public void removeConfig(AbstractInterfaceConfig config, String configPath, ChildData data) {
if (data == null) {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data is null");
LOGGER.infoWithApp(config.getAppName(), "Receive remove data is null");
}
} else {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data: path=[" + data.getPath() + "]"
LOGGER.infoWithApp(config.getAppName(), "Receive remove data: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
List<ConfigListener> configListeners = configListenerMap.get(config);
if (CommonUtils.isNotEmpty(configListeners)) {
//转换子节点Data为接口级配置<配置属性名,null>,例如<timeout,null>
Map<String, String> attribute = ZookeeperRegistryHelper
.convertConfigToAttribute(configPath, data, true);
for (ConfigListener listener : configListeners) {
listener.configChanged(attribute);
}
}
notifyListeners(config, configPath, data, true);
}
}

Expand All @@ -171,22 +155,26 @@ public void removeConfig(AbstractInterfaceConfig config, String configPath, Chil
public void addConfig(AbstractInterfaceConfig config, String configPath, ChildData data) {
if (data == null) {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data is null");
LOGGER.infoWithApp(config.getAppName(), "Receive add data is null");
}
} else {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data: path=[" + data.getPath() + "]"
LOGGER.infoWithApp(config.getAppName(), "Receive add data: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
List<ConfigListener> configListeners = configListenerMap.get(config);
if (CommonUtils.isNotEmpty(configListeners)) {
//转换子节点Data为接口级配置<配置属性名,配置属性值>,例如<timeout,200>
Map<String, String> attribute = ZookeeperRegistryHelper.convertConfigToAttribute(configPath, data,
false);
for (ConfigListener listener : configListeners) {
listener.configChanged(attribute);
}
notifyListeners(config, configPath, data, false);
}
}

private void notifyListeners(AbstractInterfaceConfig config, String configPath, ChildData data, boolean removeType) {
List<ConfigListener> configListeners = configListenerMap.get(config);
if (CommonUtils.isNotEmpty(configListeners)) {
//转换子节点Data为接口级配置<配置属性名,配置属性值>,例如<timeout,200>
Map<String, String> attribute = ZookeeperRegistryHelper.convertConfigToAttribute(configPath, data,
removeType);
for (ConfigListener listener : configListeners) {
listener.configChanged(attribute);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,15 @@ public void removeConfigListener(AbstractInterfaceConfig config) {
public void updateConfig(AbstractInterfaceConfig config, String overridePath, ChildData data) throws Exception {
if (data == null) {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data is null");
LOGGER.infoWithApp(config.getAppName(), "Receive update data is null");
}
} else {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data: path=[" + data.getPath() + "]"
LOGGER.infoWithApp(config.getAppName(), "Receive update data: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
List<ConfigListener> configListeners = configListenerMap.get(config);
if (CommonUtils.isNotEmpty(configListeners)) {
//转换子节点Data为IP级配置<配置属性名,配置属性值>,例如<timeout,200>
Map<String, String> attribute = ZookeeperRegistryHelper.convertOverrideToAttribute(overridePath, data,
false, null);
for (ConfigListener listener : configListeners) {
listener.attrUpdated(attribute);
}
}
notifyListeners(config, overridePath, data, false, null);
}
}

Expand All @@ -112,13 +104,13 @@ public void updateConfigAll(AbstractInterfaceConfig config, String overridePath,
throws UnsupportedEncodingException {
if (CommonUtils.isEmpty(currentData)) {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data is null");
LOGGER.infoWithApp(config.getAppName(), "Receive updateAll data is null");
}
} else {
if (LOGGER.isInfoEnabled(config.getAppName())) {
for (ChildData data : currentData) {
LOGGER.infoWithApp(config.getAppName(), "Receive data: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
LOGGER.infoWithApp(config.getAppName(), "Receive updateAll data: path=["
+ data.getPath() + "], data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
}
Expand Down Expand Up @@ -160,15 +152,7 @@ public void removeConfig(AbstractInterfaceConfig config, String overridePath, Ch
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
List<ConfigListener> configListeners = configListenerMap.get(config);
if (CommonUtils.isNotEmpty(configListeners)) {
//转换子节点Data为IP级配置<配置属性名,注册属性值>,例如<timeout,200>
Map<String, String> attribute = ZookeeperRegistryHelper.convertOverrideToAttribute(overridePath, data,
true, registerConfig);
for (ConfigListener listener : configListeners) {
listener.attrUpdated(attribute);
}
}
notifyListeners(config, overridePath, data, true, registerConfig);
}
}

Expand All @@ -187,18 +171,23 @@ public void addConfig(AbstractInterfaceConfig config, String overridePath, Child
}
} else {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive data: path=[" + data.getPath() + "]"
LOGGER.infoWithApp(config.getAppName(), "Receive add data: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
List<ConfigListener> configListeners = configListenerMap.get(config);
if (CommonUtils.isNotEmpty(configListeners)) {
//转换子节点Data为IP级配置<配置属性名,配置属性值>,例如<timeout,200>
Map<String, String> attribute = ZookeeperRegistryHelper.convertOverrideToAttribute(overridePath, data,
false, null);
for (ConfigListener listener : configListeners) {
listener.attrUpdated(attribute);
}
notifyListeners(config, overridePath, data, false, null);
}
}

private void notifyListeners(AbstractInterfaceConfig config, String overridePath, ChildData data,
boolean removeType, AbstractInterfaceConfig interfaceConfig) throws Exception {
List<ConfigListener> configListeners = configListenerMap.get(config);
if (CommonUtils.isNotEmpty(configListeners)) {
//转换子节点Data为IP级配置<配置属性名,配置属性值>,例如<timeout,200>
Map<String, String> attribute = ZookeeperRegistryHelper.convertOverrideToAttribute(overridePath, data,
removeType, interfaceConfig);
for (ConfigListener listener : configListeners) {
listener.attrUpdated(attribute);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -46,7 +45,7 @@ public class ZookeeperProviderObserver extends AbstractZookeeperObserver {
* slf4j Logger for this class
*/
private final static Logger LOGGER = LoggerFactory
.getLogger(ZookeeperConfigObserver.class);
.getLogger(ZookeeperProviderObserver.class);

/**
* The Provider add listener map.
Expand Down Expand Up @@ -74,61 +73,81 @@ public void removeProviderListener(ConsumerConfig consumerConfig) {
providerListenerMap.remove(consumerConfig);
}

public void updateProvider(ConsumerConfig config, String providerPath, ChildData data)
/**
* Update Provider
*
* @param config ConsumerConfig
* @param providerPath Provider path of zookeeper
* @param data Event data
* @param currentData provider data list
* @throws UnsupportedEncodingException decode error
*/
public void updateProvider(ConsumerConfig config, String providerPath, ChildData data, List<ChildData> currentData)
throws UnsupportedEncodingException {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive update provider: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
List<ProviderInfoListener> providerInfoListeners = providerListenerMap.get(config);
if (CommonUtils.isNotEmpty(providerInfoListeners)) {
List<ProviderInfo> providerInfos = Arrays.asList(
ZookeeperRegistryHelper.convertUrlToProvider(providerPath, data));
for (ProviderInfoListener listener : providerInfoListeners) {
List<ProviderInfo> providerInfosForProtocol = filterByProtocol(config, providerInfos);
listener.addProvider(new ProviderGroup(providerInfosForProtocol));
}
+ ", stat=[" + data.getStat() + "]" + ", list=[" + currentData.size() + "]");
}
notifyListeners(config, providerPath, currentData, false);
}

public void removeProvider(ConsumerConfig config, String providerPath, ChildData data)
/**
* Remove Provider
*
* @param config ConsumerConfig
* @param providerPath Provider path of zookeeper
* @param data Event data
* @param currentData provider data list
* @throws UnsupportedEncodingException decode error
*/
public void removeProvider(ConsumerConfig config, String providerPath, ChildData data, List<ChildData> currentData)
throws UnsupportedEncodingException {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive remove provider: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
List<ProviderInfoListener> providerInfoListeners = providerListenerMap.get(config);
if (CommonUtils.isNotEmpty(providerInfoListeners)) {
List<ProviderInfo> providerInfos = Arrays.asList(
ZookeeperRegistryHelper.convertUrlToProvider(providerPath, data));
List<ProviderInfo> providerInfosForProtocol = filterByProtocol(config, providerInfos);
for (ProviderInfoListener listener : providerInfoListeners) {
listener.removeProvider(new ProviderGroup(providerInfosForProtocol));
}
+ ", stat=[" + data.getStat() + "]" + ", list=[" + currentData.size() + "]");
}
notifyListeners(config, providerPath, currentData, false);
}

public void addProvider(ConsumerConfig config, String providerPath, ChildData data)
/**
* Add provider
*
* @param config ConsumerConfig
* @param providerPath Provider path of zookeeper
* @param data Event data
* @param currentData provider data list
* @throws UnsupportedEncodingException decode error
*/
public void addProvider(ConsumerConfig config, String providerPath, ChildData data, List<ChildData> currentData)
throws UnsupportedEncodingException {
if (LOGGER.isInfoEnabled(config.getAppName())) {
LOGGER.infoWithApp(config.getAppName(), "Receive add provider: path=[" + data.getPath() + "]"
+ ", data=[" + StringSerializer.decode(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
+ ", stat=[" + data.getStat() + "]" + ", list=[" + currentData.size() + "]");
}
notifyListeners(config, providerPath, currentData, true);
}

private void notifyListeners(ConsumerConfig config, String providerPath, List<ChildData> currentData, boolean add)
throws UnsupportedEncodingException {
List<ProviderInfoListener> providerInfoListeners = providerListenerMap.get(config);
if (CommonUtils.isNotEmpty(providerInfoListeners)) {
List<ProviderInfo> providerInfos = Arrays.asList(
ZookeeperRegistryHelper.convertUrlToProvider(providerPath, data));
List<ProviderInfo> providerInfos = ZookeeperRegistryHelper.convertUrlsToProviders(providerPath,
currentData);
List<ProviderInfo> providerInfosForProtocol = filterByProtocol(config, providerInfos);
for (ProviderInfoListener listener : providerInfoListeners) {
List<ProviderInfo> providerInfosForProtocol = filterByProtocol(config, providerInfos);
listener.addProvider(new ProviderGroup(providerInfosForProtocol));
if (add) {
listener.addProvider(new ProviderGroup(providerInfosForProtocol));
} else {
listener.updateProviders(new ProviderGroup(providerInfosForProtocol));
}
}
}
}

static List<ProviderInfo> filterByProtocol(ConsumerConfig consumerConfig, List<ProviderInfo> providerInfos) {
private List<ProviderInfo> filterByProtocol(ConsumerConfig consumerConfig, List<ProviderInfo> providerInfos) {
String protocol = consumerConfig.getProtocol();
List<ProviderInfo> result = new ArrayList<ProviderInfo>();
for (ProviderInfo providerInfo : providerInfos) {
Expand Down
Loading

0 comments on commit a8eec7b

Please sign in to comment.