Skip to content

Commit

Permalink
增加共享和单例概念,提供fast-failed模式
Browse files Browse the repository at this point in the history
  • Loading branch information
monstercodings committed Sep 8, 2020
1 parent 5241c83 commit 48dd8e9
Show file tree
Hide file tree
Showing 34 changed files with 658 additions and 568 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>top.codings</groupId>
<artifactId>websiphon-light</artifactId>
<version>0.0.100</version>
<version>0.1.0</version>

<parent>
<artifactId>root-pom</artifactId>
Expand Down Expand Up @@ -70,7 +70,7 @@
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<artifactId>slf4j-log4j12</artifactId>
<version>2.0.0-alpha1</version>
<scope>test</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ public abstract class CombineCrawler implements ICrawler {
protected CrawlerConfig config;

@Override
public CompletableFuture<ICrawler> startup() {
public CompletableFuture<? extends ICrawler> startup() {
if (next != null) return next.startup();
throw new FrameworkException("非代理爬虫必须实现自身方法");
}

@Override
public CompletableFuture<ICrawler> shutdown() {
public CompletableFuture<? extends ICrawler> shutdown() {
if (next != null) return next.shutdown();
throw new FrameworkException("非代理爬虫必须实现自身方法");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ public interface ICrawler {
/**
* 启动爬虫
*/
CompletableFuture<ICrawler> startup();
CompletableFuture<? extends ICrawler> startup();

/**
* 关闭爬虫
*/
CompletableFuture<ICrawler> shutdown();
CompletableFuture<? extends ICrawler> shutdown();

/**
* 将任务推送给爬虫
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import top.codings.websiphon.light.error.FrameworkException;
import top.codings.websiphon.light.function.handler.IResponseHandler;
import top.codings.websiphon.light.function.handler.QueueResponseHandler;
import top.codings.websiphon.light.function.ComponentCloseAware;
import top.codings.websiphon.light.function.ComponentInitAware;
import top.codings.websiphon.light.requester.IRequest;
import top.codings.websiphon.light.requester.IRequester;
import top.codings.websiphon.light.requester.support.CombineRequester;
Expand All @@ -16,6 +18,7 @@
import java.net.Proxy;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Slf4j
public class BaseCrawler extends CombineCrawler {
Expand Down Expand Up @@ -123,51 +126,77 @@ public boolean isRunning() {
}

@Override
public CompletableFuture<ICrawler> startup() {
if (begin) {
throw new FrameworkException("爬虫正在执行启动/关闭操作,请勿重复执行");
}
synchronized (this) {
if (begin) {
throw new FrameworkException("爬虫正在执行启动/关闭操作,请勿重复执行");
public CompletableFuture<? extends ICrawler> startup() {
return CompletableFuture.supplyAsync(() -> {
if (!stop) {
throw new FrameworkException("爬虫已启动");
}
begin = true;
}
CompletableFuture<ICrawler> completableFuture = CompletableFuture.completedFuture(this);
if (responseHandler instanceof QueueResponseHandler) {
// 启动响应处理器
completableFuture = completableFuture.thenCombineAsync(((QueueResponseHandler) responseHandler).startup(this), (crawler, iResponseHandler) -> crawler);
}
return completableFuture
.thenCombineAsync(getRequester().init(), (crawler, o) -> crawler)
.whenCompleteAsync((o, o2) -> stop = begin = false);
synchronized (this) {
if (begin) {
throw new FrameworkException("爬虫正在执行启动/关闭操作,请勿重复执行");
}
begin = true;
}
if (responseHandler instanceof ComponentInitAware) {
// 启动响应处理器
try {
((ComponentInitAware) responseHandler).init(this);
} catch (Exception e) {
throw new FrameworkException("初始化响应管理器失败", e);
}
}
IRequester requester = getRequester();
if (requester instanceof ComponentInitAware) {
try {
((ComponentInitAware) requester).init(this);
} catch (Exception e) {
throw new FrameworkException("初始化请求器失败", e);
}
}
return this;
}).whenCompleteAsync((baseCrawler, throwable) -> {
stop = begin = false;
if (throwable != null) {
try {
shutdown().get();
} catch (Exception e) {
}
}
});
}

@Override
public CompletableFuture<ICrawler> shutdown() {
if (begin) {
throw new FrameworkException("爬虫正在执行启动/关闭操作,请勿重复执行");
}
synchronized (this) {
if (begin) {
throw new FrameworkException("爬虫正在执行启动/关闭操作,请勿重复执行");
return CompletableFuture.supplyAsync(() -> {
if (stop) {
throw new FrameworkException("爬虫已关闭");
}
begin = true;
}
Optional.ofNullable(config.getShutdownHook()).ifPresent(action -> action.accept(this.wrapper()));
CompletableFuture<ICrawler> completableFuture = CompletableFuture.completedFuture(this.wrapper());
boolean force = true;
IRequester requester = getRequester();
if (requester != null) {
completableFuture = completableFuture.thenCombineAsync(requester.shutdown(force), (crawler, o) -> crawler);
}
if (null != responseHandler) {
if (responseHandler instanceof QueueResponseHandler) {
completableFuture = completableFuture.thenCombineAsync(((QueueResponseHandler) responseHandler).shutdown(force), (crawler, iResponseHandler) -> crawler);
synchronized (this) {
if (begin) {
throw new FrameworkException("爬虫正在执行启动/关闭操作,请勿重复执行");
}
begin = true;
}
}
stop = true;
begin = false;
return completableFuture;
IRequester requester = getRequester();
if (requester != null && requester instanceof ComponentCloseAware) {
try {
((ComponentCloseAware) requester).close();
} catch (Exception e) {
throw new FrameworkException("关闭请求器失败", e);
}
}
if (null != responseHandler && responseHandler instanceof ComponentCloseAware) {
try {
((ComponentCloseAware) responseHandler).close();
} catch (Exception e) {
throw new FrameworkException("关闭响应管理器失败", e);
}
}
Optional.ofNullable(config.getShutdownHook()).ifPresent(action -> action.accept(this.wrapper()));
stop = true;
begin = false;
return this.wrapper();
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ protected void doProxy() {
taskTimeoutMillis,
timeoutHandler
);
requester.setCrawler(this);
requester.setLimitMemory(limitMemory);
setRequester(requester);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ public RegistryCrawler(RegistryConfig registryConfig) {
}

@Override
public CompletableFuture<ICrawler> startup() {
CompletableFuture<ICrawler> completableFuture = super.startup();
public CompletableFuture<? extends ICrawler> startup() {
CompletableFuture<? extends ICrawler> completableFuture = super.startup();
if (registryConfig.isEnabled()) {
registry.setConfig(config, registryConfig);
registry.setCrawler(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package top.codings.websiphon.light.function;

public interface ComponentCloseAware {
void close() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package top.codings.websiphon.light.function.processor;
package top.codings.websiphon.light.function;

import top.codings.websiphon.light.crawler.ICrawler;
import top.codings.websiphon.light.error.StopHandlErrorException;
import top.codings.websiphon.light.requester.IRequest;

public interface ProcessErrorAware {
void doOnError(IRequest request, Throwable throwable, ICrawler crawler) throws StopHandlErrorException;
public interface ComponentErrorAware {
void doOnError(Throwable throwable, IRequest request, ICrawler crawler) throws StopHandlErrorException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package top.codings.websiphon.light.function;

public interface ComponentInitAware<T> {
void init(T t) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,32 @@
package top.codings.websiphon.light.function.handler;

import top.codings.websiphon.light.config.CrawlerConfig;
import top.codings.websiphon.light.crawler.ICrawler;
import top.codings.websiphon.light.error.FrameworkException;
import top.codings.websiphon.light.function.ComponentInitAware;
import top.codings.websiphon.light.loader.anno.Shared;

public abstract class AbstractResponseHandler implements IResponseHandler {
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class AbstractResponseHandler implements IResponseHandler, ComponentInitAware<ICrawler> {
private transient volatile boolean init;
protected CrawlerConfig config;

@Override
public void setConfig(CrawlerConfig config) {
this.config = config;
}

@Override
public void init(ICrawler crawler) throws Exception {
synchronized (this) {
if (init && getClass().getDeclaredAnnotation(Shared.class) == null) {
throw new FrameworkException(String.format(
"[%s]非共享组件,如需使用单例供多个爬虫使用则需使用@Shared注解修饰该组件",
getClass().getName()
));
}
init = true;
}
}
}
Loading

0 comments on commit 48dd8e9

Please sign in to comment.