Skip to content

Commit

Permalink
Fix zookeeper cas failed when create (apache#11036)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlbumenJ authored Nov 28, 2022
1 parent b860358 commit 4c52ac0
Show file tree
Hide file tree
Showing 9 changed files with 767 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void doClose() throws Exception {

@Override
protected boolean doPublishConfig(String pathKey, String content) throws Exception {
zkClient.create(pathKey, content, false);
zkClient.createOrUpdate(pathKey, content, false);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void doStoreConsumerMetadata(MetadataIdentifier consumerMetadataIdenti

@Override
protected void doSaveMetadata(ServiceMetadataIdentifier metadataIdentifier, URL url) {
zkClient.create(getNodePath(metadataIdentifier), URL.encode(url.toFullString()), false);
zkClient.createOrUpdate(getNodePath(metadataIdentifier), URL.encode(url.toFullString()), false);
}

@Override
Expand All @@ -116,7 +116,7 @@ protected List<String> doGetExportedURLs(ServiceMetadataIdentifier metadataIdent

@Override
protected void doSaveSubscriberData(SubscriberMetadataIdentifier subscriberMetadataIdentifier, String urls) {
zkClient.create(getNodePath(subscriberMetadataIdentifier), urls, false);
zkClient.createOrUpdate(getNodePath(subscriberMetadataIdentifier), urls, false);
}

@Override
Expand All @@ -130,7 +130,7 @@ public String getServiceDefinition(MetadataIdentifier metadataIdentifier) {
}

private void storeMetadata(MetadataIdentifier metadataIdentifier, String v) {
zkClient.create(getNodePath(metadataIdentifier), v, false);
zkClient.createOrUpdate(getNodePath(metadataIdentifier), v, false);
}

String getNodePath(BaseMetadataIdentifier metadataIdentifier) {
Expand All @@ -141,7 +141,7 @@ String getNodePath(BaseMetadataIdentifier metadataIdentifier) {
public void publishAppMetadata(SubscriberMetadataIdentifier identifier, MetadataInfo metadataInfo) {
String path = getNodePath(identifier);
if (StringUtils.isBlank(zkClient.getContent(path)) && StringUtils.isNotEmpty(metadataInfo.getContent())) {
zkClient.create(path, metadataInfo.getContent(), false);
zkClient.createOrUpdate(path, metadataInfo.getContent(), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private void checkDestroyed() {
public void doRegister(URL url) {
try {
checkDestroyed();
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true), false);
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
Expand Down Expand Up @@ -193,7 +193,7 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
}
});

zkClient.create(root, false);
zkClient.create(root, false, true);

List<String> services = zkClient.addChildListener(root, zkListener);
if (CollectionUtils.isNotEmpty(services)) {
Expand Down Expand Up @@ -227,7 +227,7 @@ public void doSubscribe(final URL url, final NotifyListener listener) {
}

// create "directories".
zkClient.create(path, false);
zkClient.create(path, false, true);

// Add children (i.e. service items).
List<String> children = zkClient.addChildListener(path, zkListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,8 @@ public void delete(String path) {
deletePath(path);
}


@Override
public void create(String path, boolean ephemeral) {
public void create(String path, boolean ephemeral, boolean faultTolerant) {
if (!ephemeral) {
if (persistentExistNodePath.contains(path)) {
return;
Expand All @@ -81,12 +80,12 @@ public void create(String path, boolean ephemeral) {
}
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
create(path.substring(0, i), false, true);
}
if (ephemeral) {
createEphemeral(path);
createEphemeral(path, faultTolerant);
} else {
createPersistent(path);
createPersistent(path, faultTolerant);
persistentExistNodePath.add(path);
}
}
Expand Down Expand Up @@ -160,32 +159,29 @@ public void close() {
closed = true;
try {
doClose();
} catch (Throwable t) {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", t.getMessage(), t);
} catch (Exception e) {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", e.getMessage(), e);
}
}

@Override
public void create(String path, String content, boolean ephemeral) {
if (checkExists(path)) {
delete(path);
}
public void createOrUpdate(String path, String content, boolean ephemeral) {
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
create(path.substring(0, i), false, true);
}
if (ephemeral) {
createEphemeral(path, content);
createOrUpdateEphemeral(path, content);
} else {
createPersistent(path, content);
createOrUpdatePersistent(path, content);
}
}

@Override
public void createOrUpdate(String path, String content, boolean ephemeral, int version) {
int i = path.lastIndexOf('/');
if (i > 0) {
create(path.substring(0, i), false);
create(path.substring(0, i), false, true);
}
if (ephemeral) {
createOrUpdateEphemeral(path, content, version);
Expand All @@ -212,16 +208,22 @@ protected void doClose() {
stateListeners.clear();
}

protected abstract void createPersistent(String path);
protected abstract void createPersistent(String path, boolean faultTolerant);

protected abstract void createEphemeral(String path);
protected abstract void createEphemeral(String path, boolean faultTolerant);

protected abstract void createPersistent(String path, String data);
protected abstract void createPersistent(String path, String data, boolean faultTolerant);

protected abstract void createEphemeral(String path, String data);
protected abstract void createEphemeral(String path, String data, boolean faultTolerant);

protected abstract void update(String path, String data, int version);

protected abstract void update(String path, String data);

protected abstract void createOrUpdatePersistent(String path, String data);

protected abstract void createOrUpdateEphemeral(String path, String data);

protected abstract void createOrUpdatePersistent(String path, String data, int version);

protected abstract void createOrUpdateEphemeral(String path, String data, int version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ public interface ZookeeperClient {
*
* @param path path to ZNode
* @param ephemeral specify create mode of ZNode creation. true - EPHEMERAL, false - PERSISTENT.
* @param faultTolerant specify fault tolerance of ZNode creation.
* true - ignore exception and recreate if is ephemeral, false - throw exception.
*/
void create(String path, boolean ephemeral);
void create(String path, boolean ephemeral, boolean faultTolerant);

/**
* Delete ZNode.
Expand Down Expand Up @@ -92,14 +94,22 @@ public interface ZookeeperClient {
URL getUrl();

/**
* Create ZNode in Zookeeper with content specified.
* Create or update ZNode in Zookeeper with content specified.
*
* @param path path to ZNode
* @param content the content of ZNode
* @param ephemeral specify create mode of ZNode creation. true - EPHEMERAL, false - PERSISTENT.
*/
void create(String path, String content, boolean ephemeral);
void createOrUpdate(String path, String content, boolean ephemeral);

/**
* CAS version to Create or update ZNode in Zookeeper with content specified.
*
* @param path path to ZNode
* @param content the content of ZNode
* @param ephemeral specify create mode of ZNode creation. true - EPHEMERAL, false - PERSISTENT.
* @param ticket origin content version, if current version is not the specified version, throw exception
*/
void createOrUpdate(String path, String content, boolean ephemeral, int ticket);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,60 +113,79 @@ public List<ACL> getAclForPath(String path) {
}

@Override
public void createPersistent(String path) {
public void createPersistent(String path, boolean faultTolerant) {
try {
client.create().forPath(path);
} catch (NodeExistsException e) {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "ZNode " + path + " already exists.", e);
if (!faultTolerant) {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "ZNode " + path + " already exists.", e);
throw new IllegalStateException(e.getMessage(), e);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

@Override
public void createEphemeral(String path) {
public void createEphemeral(String path, boolean faultTolerant) {
try {
client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (NodeExistsException e) {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
" may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
"we can just try to delete and create again.", e);
deletePath(path);
createEphemeral(path);
if (faultTolerant) {
logger.info("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
" may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
"we can just try to delete and create again.");
deletePath(path);
createEphemeral(path, true);
} else {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "ZNode " + path + " already exists.", e);
throw new IllegalStateException(e.getMessage(), e);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

@Override
protected void createPersistent(String path, String data) {
protected void createPersistent(String path, String data, boolean faultTolerant) {
byte[] dataBytes = data.getBytes(CHARSET);
try {
client.create().forPath(path, dataBytes);
} catch (NodeExistsException e) {
try {
client.setData().forPath(path, dataBytes);
} catch (Exception e1) {
throw new IllegalStateException(e.getMessage(), e1);
if (faultTolerant) {
logger.info("ZNode " + path + " already exists. Will be override with new data.");
try {
client.setData().forPath(path, dataBytes);
} catch (Exception e1) {
throw new IllegalStateException(e.getMessage(), e1);
}
} else {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "ZNode " + path + " already exists.", e);
throw new IllegalStateException(e.getMessage(), e);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

@Override
protected void createEphemeral(String path, String data) {
protected void createEphemeral(String path, String data, boolean faultTolerant) {
byte[] dataBytes = data.getBytes(CHARSET);
try {
client.create().withMode(CreateMode.EPHEMERAL).forPath(path, dataBytes);
} catch (NodeExistsException e) {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
" may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
"we can just try to delete and create again.", e);
deletePath(path);
createEphemeral(path, data);
if (faultTolerant) {
logger.info("ZNode " + path + " already exists, since we will only try to recreate a node on a session expiration" +
", this duplication might be caused by a delete delay from the zk server, which means the old expired session" +
" may still holds this ZNode and the server just hasn't got time to do the deletion. In this case, " +
"we can just try to delete and create again.");
deletePath(path);
createEphemeral(path, data, true);
} else {
logger.warn(REGISTRY_ZOOKEEPER_EXCEPTION, "", "", "ZNode " + path + " already exists.", e);
throw new IllegalStateException(e.getMessage(), e);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
Expand All @@ -182,13 +201,51 @@ protected void update(String path, String data, int version) {
}
}

@Override
protected void update(String path, String data) {
byte[] dataBytes = data.getBytes(CHARSET);
try {
client.setData().forPath(path, dataBytes);
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}

@Override
protected void createOrUpdatePersistent(String path, String data) {
try {
if (checkExists(path)) {
update(path, data);
} else {
createPersistent(path, data, true);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}

}

@Override
protected void createOrUpdateEphemeral(String path, String data) {
try {
if (checkExists(path)) {
update(path, data);
} else {
createEphemeral(path, data, true);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}

}

@Override
protected void createOrUpdatePersistent(String path, String data, int version) {
try {
if (checkExists(path)) {
update(path, data, version);
} else {
createPersistent(path, data);
createPersistent(path, data, false);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
Expand All @@ -201,7 +258,7 @@ protected void createOrUpdateEphemeral(String path, String data, int version) {
if (checkExists(path)) {
update(path, data, version);
} else {
createEphemeral(path, data);
createEphemeral(path, data, false);
}
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
Expand Down
Loading

0 comments on commit 4c52ac0

Please sign in to comment.