Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Jan 23, 2017
2 parents e5ff572 + b4179cc commit 80c5fea
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions mpush-zk/src/main/java/com/mpush/zk/ZKClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public class ZKClient extends BaseService {
private ZKConfig zkConfig;
private CuratorFramework client;
private TreeCache cache;
private Map<String, String> ephemeralNodes = new LinkedHashMap<>();
private Map<String, String> ephemeralNodes = new LinkedHashMap<>(4);
private Map<String, String> ephemeralSequentialNodes = new LinkedHashMap<>(1);

private synchronized static ZKClient I() {
return I == null ? new ZKClient() : I;
Expand Down Expand Up @@ -138,7 +139,8 @@ public List<ACL> getAclForPath(final String path) {
private void addConnectionStateListener() {
client.getConnectionStateListenable().addListener((cli, newState) -> {
if (newState == ConnectionState.RECONNECTED) {
ephemeralNodes.forEach(this::reRegisterEphemeralSequential);
ephemeralNodes.forEach(this::reRegisterEphemeral);
ephemeralSequentialNodes.forEach(this::reRegisterEphemeralSequential);
}
Logs.RSD.warn("zk connection state changed new state={}, isConnected={}", newState, newState.isConnected());
});
Expand Down Expand Up @@ -262,18 +264,39 @@ public void update(final String key, final String value) {
* @param key
* @param value
*/
public void registerEphemeral(final String key, final String value) {
public void registerEphemeral(final String key, final String value, boolean cacheNode) {
try {
if (isExisted(key)) {
client.delete().deletingChildrenIfNeeded().forPath(key);
}
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(key, value.getBytes(Constants.UTF_8));
if (cacheNode) ephemeralNodes.put(key, value);
} catch (Exception ex) {
Logs.RSD.error("persistEphemeral:{},{}", key, value, ex);
throw new ZKException(ex);
}
}

/**
* 注册临时数据
*
* @param key
* @param value
*/
public void reRegisterEphemeral(final String key, final String value) {
registerEphemeral(key, value, false);
}

/**
* 注册临时数据
*
* @param key
* @param value
*/
public void registerEphemeral(final String key, final String value) {
registerEphemeral(key, value, true);
}

/**
* 注册临时顺序数据
*
Expand All @@ -284,7 +307,7 @@ public void registerEphemeral(final String key, final String value) {
private void registerEphemeralSequential(final String key, final String value, boolean cacheNode) {
try {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(key, value.getBytes());
if (cacheNode) ephemeralNodes.put(key, value);
if (cacheNode) ephemeralSequentialNodes.put(key, value);
} catch (Exception ex) {
Logs.RSD.error("persistEphemeralSequential:{},{}", key, value, ex);
throw new ZKException(ex);
Expand Down

0 comments on commit 80c5fea

Please sign in to comment.