Skip to content

Commit

Permalink
Update curator & guava version (apache#6760)
Browse files Browse the repository at this point in the history
* Update curator & guava version

* Remove deprecated api for curator
  • Loading branch information
menghaoranss authored Aug 11, 2020
1 parent e42a68d commit 5857260
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 36 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.locale>zh_CN</project.build.locale>

<guava.version>18.0</guava.version>
<guava.version>29.0-jre</guava.version>
<gson.version>2.8.6</gson.version>
<slf4j.version>1.7.7</slf4j.version>

Expand All @@ -76,7 +76,7 @@
<jboss-logging.version>3.2.1.Final</jboss-logging.version>
<btm.version>2.1.3</btm.version>

<curator.version>2.10.0</curator.version>
<curator.version>5.1.0</curator.version>
<opentracing.version>0.30.0</opentracing.version>
<apollo.client.version>1.5.0</apollo.client.version>
<nacos.client.verison>1.1.4</nacos.client.verison>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@
<artifactId>shardingsphere-orchestration-repository-etcd</artifactId>
<name>${project.artifactId}</name>

<properties>
<guava.version>20.0</guava.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.shardingsphere.orchestration.repository.api.ConfigurationRepository;
Expand All @@ -41,12 +42,14 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

Expand All @@ -55,7 +58,7 @@
*/
public final class CuratorZookeeperRepository implements ConfigurationRepository, RegistryRepository {

private final Map<String, TreeCache> caches = new HashMap<>();
private final Map<String, CuratorCache> caches = new HashMap<>();

private CuratorFramework client;

Expand Down Expand Up @@ -120,18 +123,18 @@ private void initCuratorClient(final ZookeeperProperties zookeeperProperties) {

@Override
public String get(final String key) {
TreeCache cache = findTreeCache(key);
CuratorCache cache = findTreeCache(key);
if (null == cache) {
return getDirectly(key);
}
ChildData resultInCache = cache.getCurrentData(key);
if (null != resultInCache) {
return null == resultInCache.getData() ? null : new String(resultInCache.getData(), Charsets.UTF_8);
Optional<ChildData> resultInCache = cache.get(key);
if (resultInCache.isPresent()) {
return null == resultInCache.get().getData() ? null : new String(resultInCache.get().getData(), Charsets.UTF_8);
}
return getDirectly(key);
}

private TreeCache findTreeCache(final String key) {
private CuratorCache findTreeCache(final String key) {
return caches.entrySet().stream().filter(entry -> key.startsWith(entry.getKey())).findFirst().map(Entry::getValue).orElse(null);
}

Expand Down Expand Up @@ -166,7 +169,8 @@ public void persist(final String key, final String value) {

private void update(final String key, final String value) {
try {
client.inTransaction().check().forPath(key).and().setData().forPath(key, value.getBytes(Charsets.UTF_8)).and().commit();
TransactionOp transactionOp = client.transactionOp();
client.transaction().forOperations(transactionOp.check().forPath(key), transactionOp.setData().forPath(key, value.getBytes(StandardCharsets.UTF_8)));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
Expand Down Expand Up @@ -229,21 +233,19 @@ public void watch(final String key, final DataChangedEventListener listener) {
if (!caches.containsKey(path)) {
addCacheData(key);
}
TreeCache cache = caches.get(path);
cache.getListenable().addListener((client, event) -> {
ChildData data = event.getData();
if (null == data || null == data.getPath()) {
return;
}
DataChangedEvent.ChangedType changedType = getChangedType(event);
CuratorCache cache = caches.get(path);
cache.listenable().addListener((type, oldData, data) -> {
String eventPath = CuratorCacheListener.Type.NODE_DELETED == type ? oldData.getPath() : data.getPath();
byte[] eventDataByte = CuratorCacheListener.Type.NODE_DELETED == type ? oldData.getData() : data.getData();
DataChangedEvent.ChangedType changedType = getChangedType(type);
if (ChangedType.IGNORED != changedType) {
listener.onChange(new DataChangedEvent(data.getPath(), null == data.getData() ? null : new String(data.getData(), Charsets.UTF_8), changedType));
listener.onChange(new DataChangedEvent(eventPath, null == eventDataByte ? null : new String(eventDataByte, Charsets.UTF_8), changedType));
}
});
}

private void addCacheData(final String cachePath) {
TreeCache cache = new TreeCache(client, cachePath);
CuratorCache cache = CuratorCache.build(client, cachePath);
try {
cache.start();
// CHECKSTYLE:OFF
Expand All @@ -254,13 +256,13 @@ private void addCacheData(final String cachePath) {
caches.put(cachePath + "/", cache);
}

private ChangedType getChangedType(final TreeCacheEvent event) {
switch (event.getType()) {
case NODE_ADDED:
private ChangedType getChangedType(final CuratorCacheListener.Type type) {
switch (type) {
case NODE_CREATED:
return ChangedType.ADDED;
case NODE_UPDATED:
case NODE_CHANGED:
return ChangedType.UPDATED;
case NODE_REMOVED:
case NODE_DELETED:
return ChangedType.DELETED;
default:
return ChangedType.IGNORED;
Expand All @@ -269,7 +271,7 @@ private ChangedType getChangedType(final TreeCacheEvent event) {

@Override
public void close() {
caches.values().forEach(TreeCache::close);
caches.values().forEach(CuratorCache::close);
waitForCacheClose();
CloseableUtils.closeQuietly(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void assertGetChildrenKeys() {
public void assertWatchUpdatedChangedType() throws InterruptedException {
REPOSITORY.persist("/test/children_updated/1", "value1");
AtomicReference<DataChangedEvent> dataChangedEventActual = new AtomicReference<>();
REPOSITORY.watch("/test/children_updated", dataChangedEventActual::set);
REPOSITORY.watch("/test/children_updated/1", dataChangedEventActual::set);
REPOSITORY.persist("/test/children_updated/1", "value2");
Thread.sleep(50L);
DataChangedEvent dataChangedEvent = dataChangedEventActual.get();
Expand All @@ -94,13 +94,13 @@ public void assertWatchUpdatedChangedType() throws InterruptedException {

@Test
public void assertWatchDeletedChangedType() throws Exception {
AtomicReference<DataChangedEvent> dataChangedEventActual = new AtomicReference<>();
REPOSITORY.watch("/test/children_deleted", dataChangedEventActual::set);
REPOSITORY.persist("/test/children_deleted/5", "value5");
Field field = CuratorZookeeperRepository.class.getDeclaredField("client");
field.setAccessible(true);
CuratorFramework client = (CuratorFramework) field.get(REPOSITORY);
AtomicReference<DataChangedEvent> dataChangedEventActual = new AtomicReference<>();
REPOSITORY.watch("/test/children_deleted/5", dataChangedEventActual::set);
client.delete().forPath("/test/children_deleted/5");
client.delete().deletingChildrenIfNeeded().forPath("/test/children_deleted/5");
Thread.sleep(50L);
DataChangedEvent dataChangedEvent = dataChangedEventActual.get();
assertNotNull(dataChangedEvent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Future<?> submit(final ShardingScalingExecutor shardingScalingExecutor) {
*/
public Future<?> submit(final ShardingScalingExecutor shardingScalingExecutor, final ExecuteCallback executeCallback) {
ListenableFuture<?> result = executorService.submit(shardingScalingExecutor);
Futures.addCallback(result, new ExecuteFutureCallback<>(executeCallback));
Futures.addCallback(result, new ExecuteFutureCallback<>(executeCallback), executorService);
return result;
}

Expand All @@ -79,7 +79,7 @@ public Future<?> submitAll(final Collection<? extends ShardingScalingExecutor> s
listenableFutures.add(listenableFuture);
}
ListenableFuture result = Futures.allAsList(listenableFutures);
Futures.addCallback(result, new ExecuteFutureCallback<Collection<Object>>(executeCallback));
Futures.addCallback(result, new ExecuteFutureCallback<Collection<Object>>(executeCallback), executorService);
return result;
}

Expand Down

0 comments on commit 5857260

Please sign in to comment.