Skip to content

Commit

Permalink
Add a new todo item.
Browse files Browse the repository at this point in the history
Fixed the wrong sequence of thread synchronization. It caused messages left uncosumed before.
  • Loading branch information
liyanpeng5 committed Jan 5, 2018
1 parent 60cf0b1 commit 094f9df
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 2 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ Benchmark应该覆盖推送QPS,接收处理QPS以及单线程和多线程生
5. 实现消息发送端的可靠投递,在发送超时的时候,持久到数据库或者缓存,定时补偿发送
6. 实现Producer可以持久消息到数据库,有问题的时候,异步定时重发
7. InputConsumer和OutputProducer中的propertiesFile有些多余,我们需要把它迁移到KafkaHandlers中,或者支持多个地方配置。
8. InterruptedException和优雅关机的实现,不应该接受一个interrupt信号就退出

## 获得技术支持

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ public void startup() {
throw new IllegalStateException("The client has been started.");
}

status = Status.RUNNING;

log.info("Streams num: " + streams.size());
tasks = new ArrayList<AbstractMessageTask>();
for (KafkaStream<String, String> stream : streams) {
Expand All @@ -295,8 +297,6 @@ public void startup() {
tasks.add(abstractMessageTask);
streamThreadPool.execute(abstractMessageTask);
}

status = Status.RUNNING;
}

public void shutdownGracefully() {
Expand Down

0 comments on commit 094f9df

Please sign in to comment.