Skip to content

Commit

Permalink
use extra scheduler to handle health check.
Browse files Browse the repository at this point in the history
  • Loading branch information
coding4m committed Aug 14, 2019
1 parent cb0a571 commit 0fce096
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 6 deletions.
19 changes: 17 additions & 2 deletions src/main/java/fastdfs/client/FastdfsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
Expand All @@ -32,6 +33,8 @@ public final class FastdfsClient implements Closeable {
public static final long DEFAULT_CHECK_INTERVAL = 10000;

private final FastdfsExecutor executor;
private final FastdfsScheduler scheduler;

private final TrackerClient trackerClient;
private final StorageClient storageClient;

Expand All @@ -46,7 +49,8 @@ private FastdfsClient(Builder builder) {
);

this.executor = new FastdfsExecutor(settings);
this.trackerClient = new TrackerClient(executor, builder.selector, builder.trackers, builder.fall, builder.rise, builder.checkTimeout, builder.checkInterval);
this.scheduler = new FastdfsScheduler(new HashSet<>(builder.trackers).size());
this.trackerClient = new TrackerClient(executor, scheduler, builder.selector, builder.trackers, builder.fall, builder.rise, builder.checkTimeout, builder.checkInterval);
this.storageClient = new StorageClient(executor);
}

Expand Down Expand Up @@ -711,7 +715,18 @@ public CompletableFuture<FileInfo> infoGet(FileId fileId) {

@Override
public void close() throws IOException {
executor.close();
try {
executor.close();
} catch (Exception e) {
// do nothing;
}

try {
scheduler.close();
} catch (Exception e) {
// do nothing;
}

trackerClient.close();
}

Expand Down
47 changes: 47 additions & 0 deletions src/main/java/fastdfs/client/FastdfsScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package fastdfs.client;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

class FastdfsScheduler implements Closeable {
private final ScheduledExecutorService loop;

FastdfsScheduler(int threads) {
this.loop = Executors.newScheduledThreadPool(threads, new ThreadFactory() {
final String threadPrefix = "fastdfs-scheduler-";
final AtomicInteger threadNumber = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
return new Thread(null, r, threadPrefix + threadNumber.getAndIncrement());
}
});
}

ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return loop.schedule(command, delay, unit);
}

<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return loop.schedule(callable, delay, unit);
}

ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return loop.scheduleAtFixedRate(command, initialDelay, period, unit);
}

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return loop.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}

@Override
public void close() throws IOException {
try {
loop.shutdown();
} catch (Exception e) {
// do nothing.
}
}
}
4 changes: 2 additions & 2 deletions src/main/java/fastdfs/client/TrackerClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ final class TrackerClient implements Closeable {
private final TrackerSelector selector;
private final TrackerMonitor monitor;

TrackerClient(FastdfsExecutor executor, TrackerSelector selector, List<TrackerServer> servers, int fall, int rise, long checkTimeout, long checkInterval) {
TrackerClient(FastdfsExecutor executor, FastdfsScheduler scheduler, TrackerSelector selector, List<TrackerServer> servers, int fall, int rise, long checkTimeout, long checkInterval) {
this.executor = executor;
this.selector = servers.size() == 1 ? TrackerSelector.FIRST : selector;
this.monitor = new TrackerMonitor(executor, servers, fall, rise, checkTimeout, checkInterval);
this.monitor = new TrackerMonitor(executor, scheduler, servers, fall, rise, checkTimeout, checkInterval);
}

private CompletableFuture<InetSocketAddress> trackerSelect() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/fastdfs/client/TrackerMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ class TrackerMonitor implements Closeable {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Logger logger = LoggerFactory.getLogger(getClass());

TrackerMonitor(FastdfsExecutor executor, List<TrackerServer> servers, int fall, int rise, long checkTimeout, long checkInterval) {
TrackerMonitor(FastdfsExecutor executor, FastdfsScheduler scheduler, List<TrackerServer> servers, int fall, int rise, long checkTimeout, long checkInterval) {
this.fall = fall;
this.rise = rise;

this.aliveServers = new LinkedList<>(servers);
this.aliveTasks = new HashSet<>(servers).stream().map(server -> executor.scheduleAtFixedRate(() -> {
this.aliveTasks = new HashSet<>(servers).stream().map(server -> scheduler.scheduleAtFixedRate(() -> {
try {
CompletableFuture<Boolean> promise = executor.execute(server.toInetAddress(), new ActiveTestRequestor(), new ActiveTestReplier());
if (promise.get(checkTimeout, TimeUnit.MILLISECONDS)) {
Expand Down

0 comments on commit 0fce096

Please sign in to comment.