Skip to content

Commit

Permalink
[update] add running check and close pipelines in BaseAssembler, update
Browse files Browse the repository at this point in the history
the constructor also
  • Loading branch information
brianway committed Dec 15, 2016
1 parent 2ef9daa commit f5e2e6d
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@
/**
* Created by brian on 16/12/14.
*/
public class ZhihuElasticsearchUploader extends ElasticsearchUploader implements OutPipeline<Document> {
public class ZhihuElasticsearchUploader extends ElasticsearchUploader implements OutPipeline<Document>, AutoCloseable {

private String index;
private String type;

// Amount of time close() passes to bulkProcessor.awaitClose(); interpreted in timeUnit. Defaults to 1.
private long awaitTime = 1;
// Unit for awaitTime; with the defaults the wait is one minute. Both are changed via setTimeout().
private TimeUnit timeUnit = TimeUnit.MINUTES;

// Counter exposed through getCount(); main() prints it as the "out sent" figure.
// NOTE(review): the place where it is incremented is not visible in this view — confirm.
private AtomicLong count = new AtomicLong(0);

public ZhihuElasticsearchUploader(String index, String type) {
Expand All @@ -38,29 +42,31 @@ public AtomicLong getCount() {
return count;
}

/**
 * Closes this uploader by delegating to {@code bulkProcessor.awaitClose}, using the
 * timeout currently configured through {@link #setTimeout(long, TimeUnit)}
 * (one minute unless changed).
 *
 * @throws InterruptedException if {@code awaitClose} is interrupted
 */
@Override
public void close() throws InterruptedException {
    final long timeout = awaitTime;
    final TimeUnit unit = timeUnit;
    bulkProcessor.awaitClose(timeout, unit);
}

/**
 * Configures the timeout that {@link #close()} hands to {@code bulkProcessor.awaitClose}.
 *
 * @param awaitTime amount of time to wait, expressed in {@code timeUnit}
 * @param timeUnit  unit of {@code awaitTime}; must not be {@code null}
 * @throws NullPointerException if {@code timeUnit} is {@code null}
 */
public void setTimeout(long awaitTime, TimeUnit timeUnit) {
    // Validate before touching any field so a rejected call leaves the previous
    // timeout intact, instead of failing much later inside close().
    java.util.Objects.requireNonNull(timeUnit, "timeUnit");
    this.awaitTime = awaitTime;
    this.timeUnit = timeUnit;
}

public static void main(String[] args) {
String index = "zhihu";
String type = "user";
//ZhihuElasticsearchUploader uploader = new ZhihuElasticsearchUploader(index,type);

//String folder = "/Users/brian/todo/data/webmagic/www.zhihu.com";
String folder = "/Users/brian/Desktop/zhihu/20161124/www.zhihu.com";

ZhihuElasticsearchUploader outPipeline = new ZhihuElasticsearchUploader(index, type);

BaseAssembler.<File, Document>create()
.setRawInput(new FileRawInput(folder))
BaseAssembler.<File, Document>create(new FileRawInput(folder), new ZhihuUserDataProcessor())
.addOutPipeline(outPipeline)
.setDataProcessor(new ZhihuUserDataProcessor())
.thread(10)
.run();

System.out.println("out sent :" + outPipeline.getCount());
//outPipeline.bulkProcessor.flush();
try {
outPipeline.bulkProcessor.awaitClose(1, TimeUnit.MINUTES);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(outPipeline.bulkProcessor);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -30,15 +31,32 @@ public class BaseAssembler<IN, OUT> {

// Running total of output items; processInItem() adds the size of every produced batch.
protected AtomicLong outItemCount = new AtomicLong(0);

// Current lifecycle state; holds one of the STAT_* constants below and starts at STAT_INIT.
protected AtomicInteger stat = new AtomicInteger(STAT_INIT);

// Initial state, before run() has switched the assembler to STAT_RUNNING.
protected final static int STAT_INIT = 0;

// State set by checkRunningStat() at the start of run(); while in it, another run() is rejected.
protected final static int STAT_RUNNING = 1;

// State set by run() once its polling loop has exited.
protected final static int STAT_STOPPED = 2;

// Number of input items whose processing task has completed (incremented in a finally block in run()).
private final AtomicLong inItemCount = new AtomicLong(0);

/**
* Factory method.
*
* @param <IN> type parameter of the input queue
* @param <OUT> type parameter of the output queue
* @return an instance of the assembler
*/
public static <IN, OUT> BaseAssembler<IN, OUT> create() {
return new BaseAssembler<>();
public static <IN, OUT> BaseAssembler<IN, OUT> create(
RawInput<IN> rawInput,
DataProcessor<IN, OUT> dataProcessor) {
return new BaseAssembler<>(rawInput, dataProcessor);
}

/**
 * Creates an assembler wired to the given input source and processor.
 *
 * @param rawInput      source that {@code run()} polls for input items
 * @param dataProcessor processor stored for turning input items into output items
 */
public BaseAssembler(RawInput<IN> rawInput, DataProcessor<IN, OUT> dataProcessor) {
    // The two assignments are independent of each other.
    this.dataProcessor = dataProcessor;
    this.rawInput = rawInput;
}

protected void initComponent() {
Expand All @@ -47,7 +65,6 @@ protected void initComponent() {
}

if (threadPool == null || threadPool.isShutdown()) {
//threadPool = Executors.newFixedThreadPool(threadNum);
threadPool = new CountableThreadPool(threadNum);
}

Expand All @@ -59,8 +76,9 @@ protected void initComponent() {
public void run() {
long startTime = System.currentTimeMillis();

checkRunningStat();
initComponent();
while (!Thread.currentThread().isInterrupted()) {
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
final IN inItem = rawInput.poll();
if (inItem == null) {
if (threadPool.getThreadAlive() == 0) {
Expand All @@ -75,14 +93,17 @@ public void run() {
} catch (Exception e) {
logger.error("error: " + inItem, e);
} finally {

inItemCount.incrementAndGet();
}
}
});
}
}
stat.set(STAT_STOPPED);
logger.info("Process end");
threadPool.shutdown();
// release some resources
close();

long endTime = System.currentTimeMillis();
//logger.info("Total time: {}", endTime - startTime);
System.out.println("Total time: " + (endTime - startTime));
Expand All @@ -94,35 +115,63 @@ protected void processInItem(IN inItem) {
if (outItems == null) {
return;
}

outItemCount.addAndGet(outItems.size());

for (OutPipeline<OUT> outPipeline : outPipelines) {
outPipeline.process(outItems);
}
}
// protected DataFlow<OUT> route(IN inItem) {
// int h;
// int hash = (inItem == null) ? 0 : (h = inItem.hashCode()) ^ (h >>> 16);
// int index = (outQueues.size() - 1) & hash;
// logger.debug("index: {}", index);
// return outQueues.get(index);
// }

public BaseAssembler<IN, OUT> setRawInput(RawInput<IN> rawInput) {
this.rawInput = rawInput;
return this;

/**
 * Atomically switches the assembler into the running state.
 *
 * @throws IllegalStateException if the assembler is already running
 */
private void checkRunningStat() {
    int observed;
    do {
        observed = stat.get();
        if (observed == STAT_RUNNING) {
            throw new IllegalStateException("Assembler is already running!");
        }
        // Loop again when another thread changed the state between the read and the CAS.
    } while (!stat.compareAndSet(observed, STAT_RUNNING));
}

/**
 * Guard used by the configuration methods: refuses changes while the assembler is running.
 *
 * @throws IllegalStateException if the current state is {@code STAT_RUNNING}
 */
protected void checkIfRunning() {
    final boolean running = STAT_RUNNING == stat.get();
    if (running) {
        throw new IllegalStateException("Assembler is already running!");
    }
}

/**
 * Releases the resources held by this assembler: closes the data processor and every
 * output pipeline that implements {@link AutoCloseable}, then shuts the thread pool down.
 *
 * <p>Safe to call before {@code run()}: the thread pool is only created lazily by
 * {@code initComponent()}, so it may still be {@code null} here.
 */
public void close() {
    destroyEach(dataProcessor);
    for (OutPipeline<OUT> outPipeline : outPipelines) {
        destroyEach(outPipeline);
    }
    // The pool does not exist until initComponent() has run; skip it instead of
    // failing with a NullPointerException.
    if (threadPool != null) {
        threadPool.shutdown();
    }
}

/**
 * Closes {@code object} if it implements {@link AutoCloseable}; anything else is ignored.
 *
 * <p>Closing is best-effort: failures are logged and never propagated, so one broken
 * component cannot prevent the remaining ones from being closed. If closing is
 * interrupted, the thread's interrupt flag is restored so callers can still observe it.
 *
 * @param object the component to close; may be {@code null} or not closeable
 */
private void destroyEach(Object object) {
    if (!(object instanceof AutoCloseable)) {
        return;
    }
    try {
        ((AutoCloseable) object).close();
    } catch (InterruptedException e) {
        // Do not swallow the interruption: re-assert the flag for the caller.
        Thread.currentThread().interrupt();
        logger.warn("destroyEach: interrupted while closing " + object.getClass().getName(), e);
    } catch (Exception e) {
        // Pass the exception as the throwable argument so its stack trace is logged.
        logger.warn("destroyEach: failed to close " + object.getClass().getName(), e);
    }
}

/**
 * Sets the thread count used when the worker pool is created in {@code initComponent()}.
 *
 * @param threadNum number of threads for the pool
 * @return this assembler, for call chaining
 */
public BaseAssembler<IN, OUT> thread(int threadNum) {
    final BaseAssembler<IN, OUT> self = this;
    self.threadNum = threadNum;
    return self;
}

public BaseAssembler<IN, OUT> setDataProcessor(DataProcessor<IN, OUT> dataProcessor) {
this.dataProcessor = dataProcessor;
/**
 * Replaces the list of output pipelines. The given list is stored as-is, not copied.
 *
 * @param outPipelines the pipelines that will receive processed items
 * @return this assembler, for call chaining
 * @throws IllegalStateException if the assembler is currently running
 */
public BaseAssembler<IN, OUT> setOutPipelines(List<OutPipeline<OUT>> outPipelines) {
    // Reconfiguration is refused while a run is in progress.
    this.checkIfRunning();
    this.outPipelines = outPipelines;
    return this;
}

/**
 * Appends one output pipeline to the current pipeline list.
 *
 * @param outPipeline the pipeline to add
 * @return this assembler, for call chaining
 * @throws IllegalStateException if the assembler is currently running
 */
public BaseAssembler<IN, OUT> addOutPipeline(OutPipeline<OUT> outPipeline) {
    // Reconfiguration is refused while a run is in progress.
    this.checkIfRunning();
    outPipelines.add(outPipeline);
    return this;
}
Expand All @@ -137,10 +186,9 @@ public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
BaseAssembler.<File, String>create()
.setRawInput(new FileRawInput(folder))
BaseAssembler.<File, String>create(
new FileRawInput(folder), new DemoDataProcessor())
.addOutPipeline(outPipeline)
.setDataProcessor(new DemoDataProcessor())
.thread(10)
.run();
}
Expand Down

0 comments on commit f5e2e6d

Please sign in to comment.