Skip to content

Commit

Permalink
包结构与配置文件调整,自动重试功能,下载完FileChannel没关闭问题修复。
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie committed Dec 8, 2017
1 parent d75fa3f commit 7525dd6
Show file tree
Hide file tree
Showing 27 changed files with 477 additions and 402 deletions.
Binary file added effect/bdy.gif
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
55 changes: 0 additions & 55 deletions src/main/java/lee/study/HttpDownServer.java

This file was deleted.

89 changes: 89 additions & 0 deletions src/main/java/lee/study/down/HttpDownServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package lee.study.down;

import com.alibaba.fastjson.JSON;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import lee.study.down.dispatch.HttpDownErrorCheckTask;
import lee.study.down.dispatch.HttpDownProgressPushTask;
import lee.study.down.intercept.BdyBatchDownIntercept;
import lee.study.down.intercept.BdyIntercept;
import lee.study.down.intercept.HttpDownIntercept;
import lee.study.down.intercept.HttpDownSniffIntercept;
import lee.study.down.model.HttpDownInfo;
import lee.study.down.model.TaskInfo;
import lee.study.proxyee.intercept.CertDownIntercept;
import lee.study.proxyee.intercept.HttpProxyInterceptInitializer;
import lee.study.proxyee.intercept.HttpProxyInterceptPipeline;
import lee.study.proxyee.server.HttpProxyServer;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

@SpringBootApplication
public class HttpDownServer implements InitializingBean {

public static final NioEventLoopGroup LOOP_GROUP = new NioEventLoopGroup(1);
public static final Bootstrap DOWN_BOOT = new Bootstrap().group(LOOP_GROUP)
.channel(NioSocketChannel.class);

public static final Map<String, HttpDownInfo> DOWN_CONTENT = new ConcurrentHashMap<>();
public static final Map<String, WebSocketSession> WS_CONTENT = new ConcurrentHashMap<>();

public static int VIEW_SERVER_PORT;

@Value("${view.server.port}")
private int viewServerPort;

@Override
public void afterPropertiesSet() throws Exception {
VIEW_SERVER_PORT = viewServerPort;
}

public static void sendMsg(String type, TaskInfo taskInfo) {
try {
for (Entry<String, WebSocketSession> entry : HttpDownServer.WS_CONTENT.entrySet()) {
WebSocketSession session = entry.getValue();
if (session.isOpen()) {
Map<String, Object> msg = new HashMap<>();
msg.put("type", type);
msg.put("taskInfo", taskInfo);
TextMessage message = new TextMessage(JSON.toJSONString(msg));
synchronized (session){
session.sendMessage(message);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}

public static void start(int port) {
SpringApplication.run(HttpDownServer.class);
new HttpDownProgressPushTask().start();
new HttpDownErrorCheckTask().start();
//监听http下载请求
new HttpProxyServer().proxyInterceptInitializer(new HttpProxyInterceptInitializer() {
@Override
public void init(HttpProxyInterceptPipeline pipeline) {
pipeline.addLast(new CertDownIntercept());
pipeline.addLast(new BdyIntercept());
pipeline.addLast(new HttpDownSniffIntercept());
pipeline.addLast(new BdyBatchDownIntercept());
pipeline.addLast(new HttpDownIntercept());
}
}).start(port);
}

public static void main(String[] args) throws Exception {
start(9999);
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package lee.study.controller;
package lee.study.down.controller;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import lee.study.HttpDownServer;
import lee.study.down.HttpDown;
import lee.study.down.HttpDownCallback;
import lee.study.form.DownForm;
import lee.study.model.ChunkInfo;
import lee.study.model.HttpDownInfo;
import lee.study.model.TaskInfo;
import lee.study.util.HttpDownUtil;
import lee.study.ws.HttpDownProgressHandle;
import lee.study.down.HttpDownServer;
import lee.study.down.dispatch.HttpDownCallback;
import lee.study.down.form.DownForm;
import lee.study.down.model.ChunkInfo;
import lee.study.down.model.HttpDownInfo;
import lee.study.down.model.TaskInfo;
import lee.study.down.util.HttpDownUtil;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
Expand Down Expand Up @@ -58,24 +55,25 @@ public String startTask(@RequestBody DownForm downForm) {
ChunkInfo chunkInfo = new ChunkInfo();
chunkInfo.setIndex(i);
long chunkSize = taskInfo.getTotalSize() / downForm.getConnections();
chunkInfo.setStartPosition(i * chunkSize);
chunkInfo.setOriStartPosition(i * chunkSize);
chunkInfo.setNowStartPosition(chunkInfo.getOriStartPosition());
if (i == downForm.getConnections() - 1) { //最后一个连接去下载多出来的字节
chunkSize += taskInfo.getTotalSize() % downForm.getConnections();
}
chunkInfo.setEndPosition(chunkInfo.getStartPosition() + chunkSize - 1);
chunkInfo.setEndPosition(chunkInfo.getOriStartPosition() + chunkSize - 1);
chunkInfo.setTotalSize(chunkSize);
chunkInfoList.add(chunkInfo);
}
taskInfo.setChunkInfoList(chunkInfoList);
HttpDownUtil.serialize(httpDownModel,
/*HttpDownUtil.serialize(httpDownModel,
httpDownModel.getTaskInfo().getFilePath() + File.separator
+ httpDownModel.getTaskInfo().getFileName() + ".cfg");
HttpDown.fastDown(httpDownModel, new HttpDownCallback() {
+ httpDownModel.getTaskInfo().getFileName() + ".cfg");*/
HttpDownUtil.taskDown(httpDownModel, new HttpDownCallback() {

@Override
public void start(TaskInfo taskInfo) {
//标记为下载中并记录开始时间
HttpDownProgressHandle.sendMsg("start", taskInfo);
HttpDownServer.sendMsg("start", taskInfo);
}

@Override
Expand All @@ -90,17 +88,21 @@ public void progress(TaskInfo taskInfo, ChunkInfo chunkInfo) {

@Override
public void error(TaskInfo taskInfo, ChunkInfo chunkInfo, Throwable cause) {

try {
HttpDownUtil.retryDown(taskInfo, chunkInfo);
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void chunkDone(TaskInfo taskInfo, ChunkInfo chunkInfo) {
HttpDownProgressHandle.sendMsg("chunkDone", taskInfo);
HttpDownServer.sendMsg("chunkDone", taskInfo);
}

@Override
public void done(TaskInfo taskInfo) {
HttpDownProgressHandle.sendMsg("done", taskInfo);
HttpDownServer.sendMsg("done", taskInfo);
}
});
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package lee.study.down;
package lee.study.down.dispatch;

import lee.study.model.ChunkInfo;
import lee.study.model.TaskInfo;
import lee.study.down.model.ChunkInfo;
import lee.study.down.model.TaskInfo;

public interface HttpDownCallback {

Expand Down
50 changes: 50 additions & 0 deletions src/main/java/lee/study/down/dispatch/HttpDownErrorCheckTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package lee.study.down.dispatch;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import lee.study.down.HttpDownServer;
import lee.study.down.model.ChunkInfo;
import lee.study.down.model.HttpDownInfo;
import lee.study.down.model.TaskInfo;
import lee.study.down.util.HttpDownUtil;

/**
* 1分钟内没有下载判断为失败,进行重试
*/
public class HttpDownErrorCheckTask extends Thread {

@Override
public void run() {
try {
Map<String, Long> flagMap = new HashMap<>();
while (true) {
if (HttpDownServer.DOWN_CONTENT != null && HttpDownServer.DOWN_CONTENT.size() > 0) {
for (Entry<String, HttpDownInfo> entry : HttpDownServer.DOWN_CONTENT.entrySet()) {
TaskInfo taskInfo = entry.getValue().getTaskInfo();
if (taskInfo.getStatus() == 1) {
for (ChunkInfo chunkInfo : taskInfo.getChunkInfoList()) {
if (chunkInfo.getStatus() == 1) {
String key = taskInfo.getId() + "_" + chunkInfo.getIndex();
Long downSize = flagMap.get(key);
//下载失败
if (downSize != null && downSize == chunkInfo.getDownSize()) {
System.out.println("60秒内无响应重试:"+chunkInfo.getIndex()+"\t"+chunkInfo.getChannel().id()+"\t"+chunkInfo.getDownSize());
chunkInfo.setStatus(3);
HttpDownUtil.retryDown(taskInfo, chunkInfo);
} else {
flagMap.put(key, chunkInfo.getDownSize());
}
}
}
}
}
TimeUnit.MILLISECONDS.sleep(60000);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package lee.study.down.dispatch;

import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import lee.study.down.HttpDownServer;
import lee.study.down.model.ChunkInfo;
import lee.study.down.model.HttpDownInfo;
import lee.study.down.model.TaskInfo;

public class HttpDownProgressPushTask extends Thread{

@Override
public void run() {
try {
while (true) {
if (HttpDownServer.DOWN_CONTENT != null && HttpDownServer.DOWN_CONTENT.size() > 0) {
for (Entry<String, HttpDownInfo> entry : HttpDownServer.DOWN_CONTENT.entrySet()) {
TaskInfo taskInfo = entry.getValue().getTaskInfo();
if (taskInfo.getStatus() == 1) {
taskInfo.setLastTime(System.currentTimeMillis());
for (ChunkInfo chunkInfo : taskInfo.getChunkInfoList()) {
if (chunkInfo.getStatus() == 1) {
chunkInfo.setLastTime(System.currentTimeMillis());
}
}
}
HttpDownServer.sendMsg("progress", taskInfo);
}
TimeUnit.MILLISECONDS.sleep(200);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package lee.study.form;
package lee.study.down.form;

import lombok.Data;

Expand Down
Loading

0 comments on commit 7525dd6

Please sign in to comment.