Skip to content

Commit

Permalink
upgrade docker image version
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Aug 10, 2024
1 parent a99fe2d commit f320acc
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public static void main(String[] args) {
Runtime.getRuntime().addShutdownHook(new Thread(consulContainer::close));
System.out.println("Embedded docker consul server starting...");
consulContainer.start();
Assertions.assertThat(consulContainer.isCreated()).isTrue();
Assertions.assertThat(consulContainer.isRunning()).isTrue();
Assertions.assertThat(consulContainer.getPortBindings()).hasSameElementsAs(PORT_BINDINGS);
Assertions.assertThat(consulContainer.getExposedPorts()).hasSameElementsAs(Arrays.asList(8500, 8502));
Assertions.assertThat(consulContainer.execInContainer("consul", "kv", "get", key).getStdout().trim()).isEqualTo(val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class EtcdClient implements Closeable {
*/
private final LoopThread healthCheckThread;

private final Map<String, Pair<Watch.Watcher, ChildChangedListener>> childWatchers = new ConcurrentHashMap<>();
private final Map<String, Pair<Watch.Watcher, EventListener>> watchers = new ConcurrentHashMap<>();

private final Set<ConnectionStateListener<EtcdClient>> connectionStateListeners = ConcurrentHashMap.newKeySet();

Expand Down Expand Up @@ -219,26 +219,27 @@ public List<String> getKeyChildren(String parentKey) throws Exception {

// ----------------------------------------------------------------watch

public synchronized void watchChildChanged(String parentKey, CountDownLatch latch, Consumer<List<String>> listener) {
if (childWatchers.containsKey(parentKey)) {
public synchronized void watch(String parentKey, Consumer<List<String>> listener) throws Exception {
if (watchers.containsKey(parentKey)) {
throw new IllegalStateException("Parent key already watched: " + parentKey);
}

childWatchers.computeIfAbsent(parentKey, key -> {
ChildChangedListener innerListener = new ChildChangedListener(key, latch, listener);
Watch.Watcher watcher = client.getWatchClient().watch(utf8(key), WATCH_PREFIX_OPTION, innerListener);
return Pair.of(watcher, innerListener);
});
CountDownLatch latch = new CountDownLatch(1);
try {
EventListener eventListener = new EventListener(parentKey, latch, listener);
Watch.Watcher watcher = client.getWatchClient().watch(utf8(parentKey), WATCH_PREFIX_OPTION, eventListener);
listener.accept(getKeyChildren(parentKey));
watchers.put(parentKey, Pair.of(watcher, eventListener));
} finally {
latch.countDown();
}
}

public synchronized boolean unwatchChildChanged(String parentKey) {
Pair<Watch.Watcher, ChildChangedListener> pair = childWatchers.remove(parentKey);
public synchronized void unwatch(String parentKey) {
Pair<Watch.Watcher, EventListener> pair = watchers.remove(parentKey);
if (pair != null) {
pair.getLeft().close();
pair.getRight().close();
return true;
} else {
return false;
ThrowingRunnable.doCaught(() -> pair.getLeft().close());
ThrowingRunnable.doCaught(() -> pair.getRight().close());
}
}

Expand All @@ -258,9 +259,10 @@ public boolean isConnected() {

@Override
public synchronized void close() {
new ArrayList<>(childWatchers.keySet()).forEach(this::unwatchChildChanged);
new ArrayList<>(watchers.keySet()).forEach(this::unwatch);
ThrowingRunnable.doCaught(() -> client.getWatchClient().close());
healthCheckThread.terminate();
client.close();
ThrowingRunnable.doCaught(client::close);
}

// ----------------------------------------------------------------private static classes & methods
Expand All @@ -269,10 +271,10 @@ private static ByteSequence utf8(String key) {
return ByteSequence.from(key, UTF_8);
}

private class ChildChangedListener implements Consumer<WatchResponse>, Closeable {
private class EventListener implements Consumer<WatchResponse> {
private final String parentKey;
private final CountDownLatch latch;
private final Consumer<List<String>> processor;
private final Consumer<List<String>> listener;

private final ThreadPoolExecutor asyncExecutor = ThreadPoolExecutors.builder()
.corePoolSize(1)
Expand All @@ -282,15 +284,14 @@ private class ChildChangedListener implements Consumer<WatchResponse>, Closeable
.rejectedHandler(ThreadPoolExecutors.DISCARD)
.build();

public ChildChangedListener(String parentKey, CountDownLatch latch, Consumer<List<String>> processor) {
EventListener(String parentKey, CountDownLatch latch, Consumer<List<String>> listener) {
this.parentKey = parentKey;
this.latch = latch;
this.processor = processor;
this.listener = listener;
}

@PreDestroy
@Override
public void close() {
void close() {
ThreadPoolExecutors.shutdown(asyncExecutor, 1);
}

Expand All @@ -303,13 +304,14 @@ public void accept(WatchResponse response) {
return;
}
if (events.stream().allMatch(e -> parentKey.equals(e.getKeyValue().getKey().toString(UTF_8)))) {
// 如果只有父节点的事件变化则跳过
return;
}

asyncExecutor.submit(() -> {
try {
List<String> children = getKeyChildren(parentKey);
processor.accept(children);
listener.accept(children);
} catch (Throwable t) {
LOG.error("Get key '" + parentKey + "' children occur error.", t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -76,15 +75,14 @@ protected EtcdServerRegistry(EtcdRegistryProperties config) {
super(config, '/');
this.ttl = config.getSessionTimeoutMs() / 2000;

CountDownLatch latch = new CountDownLatch(1);
EtcdClient client0 = null;
try {
client0 = new EtcdClient(config);
this.client = client0;

client.createPersistentKey(registryRootPath, PLACEHOLDER_VALUE);
createLeaseIdAndKeepAlive();
client.watchChildChanged(discoveryRootPath, latch, this::doRefreshDiscoveryServers);
client.watch(discoveryRootPath, this::doRefreshDiscoveryServers);

long periodMs = Math.max(ttl / 4, 1) * 1000;
this.keepAliveCheckThread = LoopThread.createStarted("etcd_keep_alive_check", periodMs, periodMs, this::keepAliveCheck);
Expand All @@ -99,8 +97,6 @@ protected EtcdServerRegistry(EtcdRegistryProperties config) {
client0.close();
}
throw new RegistryException("Etcd registry init error: " + config, e);
} finally {
latch.countDown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,21 @@
import java.util.concurrent.CountDownLatch;

/**
* <pre>
* Embedded etcd server base testcontainers and docker.
*
* io.etcd:jetcd-launcher:0.7.7
*
* docker pull gcr.io/etcd-development/etcd:v3.5.15
* </pre>
*
* <a href="https://github.com/etcd-io/etcd/releases">github官网查看版本</a>
*
* @author Ponfee
*/
public final class EmbeddedEtcdServerTestcontainers {

private static final String ETCD_DOCKER_IMAGE_NAME = "gcr.io/etcd-development/etcd:v3.5.12";
private static final String ETCD_DOCKER_IMAGE_NAME = "gcr.io/etcd-development/etcd:v3.5.15";
private static final List<String> PORT_BINDINGS = Arrays.asList("2379:2379/tcp", "2380:2380/tcp", "8080:8080/tcp");

public static void main(String[] args) throws Exception {
Expand All @@ -54,6 +58,8 @@ public static void main(String[] args) throws Exception {
System.out.println("Embedded docker etcd server starting...");
etcd.start();
Assertions.assertThat(etcd.containers()).hasSize(1);
Assertions.assertThat(etcd.containers().get(0).isCreated()).isTrue();
Assertions.assertThat(etcd.containers().get(0).isRunning()).isTrue();
Assertions.assertThat(etcd.containers().get(0).getPortBindings()).hasSameElementsAs(PORT_BINDINGS);
System.out.println("Embedded docker etcd server started!");

Expand Down
2 changes: 1 addition & 1 deletion disjob-registry/disjob-registry-nacos/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>2.3.3</version>
<version>2.4.0</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright 2022-2024 Ponfee (http://www.ponfee.cn/)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.ponfee.disjob.registry.nacos;

import cn.ponfee.disjob.common.exception.Throwables.ThrowingRunnable;
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.Event;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;

/**
* Nacos client
*
* @author Ponfee
*/
public class NacosClient {

private final NamingService namingService;
private final String groupName;
private final Map<String, Watcher> watchers = new ConcurrentHashMap<>();

public NacosClient(Properties config, String groupName) throws NacosException {
this.namingService = NacosFactory.createNamingService(config);
this.groupName = groupName;
}

public synchronized void watch(String serviceName, Consumer<List<Instance>> listener) throws NacosException {
if (watchers.containsKey(serviceName)) {
throw new IllegalStateException("Service name already watched: " + serviceName);
}

CountDownLatch latch = new CountDownLatch(1);
try {
Watcher watcher = new Watcher(listener, latch);
namingService.subscribe(serviceName, groupName, watcher);
listener.accept(namingService.selectInstances(serviceName, groupName, true));
watchers.put(serviceName, watcher);
} finally {
latch.countDown();
}
}

public synchronized void unwatch(String serviceName) {
Watcher watcher = watchers.remove(serviceName);
if (watcher != null) {
ThrowingRunnable.doCaught(() -> namingService.unsubscribe(serviceName, groupName, watcher));
}
}

public String getServerStatus() {
return namingService.getServerStatus();
}

public void registerInstance(String serviceName, Instance instance) throws NacosException {
namingService.registerInstance(serviceName, groupName, instance);
}

public void deregisterInstance(String serviceName, Instance instance) throws NacosException {
namingService.deregisterInstance(serviceName, groupName, instance);
}

public List<Instance> getAllInstances(String serviceName) throws NacosException {
return namingService.getAllInstances(serviceName, groupName);
}

public synchronized void close() {
new ArrayList<>(watchers.keySet()).forEach(this::unwatch);
ThrowingRunnable.doCaught(namingService::shutDown);
}

private static class Watcher implements EventListener {
private final Consumer<List<Instance>> listener;
private final CountDownLatch latch;

Watcher(Consumer<List<Instance>> listener, CountDownLatch latch) {
this.listener = listener;
this.latch = latch;
}

@Override
public void onEvent(Event event) {
if (event instanceof NamingEvent) {
ThrowingRunnable.doCaught(latch::await);
listener.accept(((NamingEvent) event).getInstances());
}
}
}

}
Loading

0 comments on commit f320acc

Please sign in to comment.