Skip to content

Commit

Permalink
还原为LargeMappedByteBuffer
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie committed Jan 15, 2018
1 parent 8d4f357 commit 3d19228
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 33 deletions.
72 changes: 72 additions & 0 deletions common/src/main/java/lee/study/down/io/LargeMappedByteBuffer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package lee.study.down.io;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.LinkedList;
import java.util.List;

public class LargeMappedByteBuffer implements Closeable {

private List<MappedByteBuffer> bufferList;
private long rawPosition;
private long position;
private long size;

public LargeMappedByteBuffer(FileChannel fileChannel, MapMode mapMode, long position, long size)
throws IOException {
this.rawPosition = position;
this.position = position;
this.size = size;
int count = (int) Math.ceil(size / (double) Integer.MAX_VALUE);
this.bufferList = new LinkedList<>();
long calcPos = position;
for (int i = 0; i < count; i++) {
long calcSize = i + 1 == count ? size % Integer.MAX_VALUE : Integer.MAX_VALUE;
bufferList.add(fileChannel.map(mapMode, calcPos, calcSize));
calcPos += calcSize;
}
}

public final void put(ByteBuffer byteBuffer) throws IOException {
try {
int index = getIndex();
long length = byteBuffer.limit() - byteBuffer.position();
this.position += length;
MappedByteBuffer mappedBuffer = bufferList.get(index);
if (mappedBuffer.remaining() < length) {
byte[] temp = new byte[mappedBuffer.remaining()];
byteBuffer.get(temp);
bufferList.get(index).put(temp);
bufferList.get(index + 1).put(byteBuffer);
} else {
bufferList.get(index).put(byteBuffer);
}
} catch (Exception e) {
throw new IOException("LargeMappedByteBuffer put rawPosition-"+rawPosition+" size-"+size, e);
}
}

private int getIndex() {
return (int) ((this.position - this.rawPosition) / Integer.MAX_VALUE);
}

@Override
public void close() throws IOException {
try {
Class<?> clazz = Class.forName("sun.nio.ch.FileChannelImpl");
Method m = clazz.getDeclaredMethod("unmap", MappedByteBuffer.class);
m.setAccessible(true);
for (MappedByteBuffer mappedBuffer : bufferList) {
m.invoke(clazz, mappedBuffer);
}
} catch (Exception e) {
throw new IOException("LargeMappedByteBuffer close", e);
}
}

}
26 changes: 7 additions & 19 deletions core/src/main/java/lee/study/down/HttpDownBootstrap.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package lee.study.down;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultLastHttpContent;
Expand All @@ -21,11 +18,13 @@
import lee.study.down.constant.HttpDownStatus;
import lee.study.down.dispatch.HttpDownCallback;
import lee.study.down.handle.HttpDownInitializer;
import lee.study.down.io.LargeMappedByteBuffer;
import lee.study.down.model.ChunkInfo;
import lee.study.down.model.HttpDownInfo;
import lee.study.down.model.HttpRequestInfo;
import lee.study.down.model.TaskInfo;
import lee.study.down.util.FileUtil;
import lee.study.down.util.HttpDownUtil;
import lee.study.proxyee.util.ProtoUtil.RequestProto;
import lombok.AllArgsConstructor;
import lombok.Data;
Expand All @@ -39,10 +38,7 @@ public class HttpDownBootstrap {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpDownBootstrap.class);
public static final String ATTR_CHANNEL = "channel";
public static final String ATTR_FILE_CHANNEL = "fileChannel";
//tcp bufferSize最大为128K
public static final int BUFFER_SIZE = 1024 * 128;
private static final RecvByteBufAllocator RECV_BYTE_BUF_ALLOCATOR = new AdaptiveRecvByteBufAllocator(
64, BUFFER_SIZE, BUFFER_SIZE);
public static final String ATTR_MAP_BUFFER = "mapBuffer";

private HttpDownInfo httpDownInfo;
private SslContext clientSslContext;
Expand Down Expand Up @@ -77,8 +73,6 @@ public void startChunkDown(ChunkInfo chunkInfo, int updateStatus) {
LOGGER.debug("开始下载:" + chunkInfo.getIndex() + "\t" + chunkInfo.getDownSize());
Bootstrap bootstrap = new Bootstrap()
.channel(NioSocketChannel.class)
.option(ChannelOption.RCVBUF_ALLOCATOR, RECV_BYTE_BUF_ALLOCATOR)
.option(ChannelOption.SO_RCVBUF, BUFFER_SIZE)
.group(clientLoopGroup)
.handler(new HttpDownInitializer(requestProto.getSsl(), this, chunkInfo));
if (httpDownInfo.getProxyConfig() != null) {
Expand Down Expand Up @@ -194,18 +188,12 @@ public void continueDown()

public void close(ChunkInfo chunkInfo) {
try {
FileChannel fileChannel = (FileChannel) getAttr(chunkInfo, ATTR_FILE_CHANNEL);
if (fileChannel != null) {
//关闭旧的下载文件连接
fileChannel.close();
}
Channel channel = (Channel) getAttr(chunkInfo, ATTR_CHANNEL);
if (channel != null) {
//关闭旧的下载连接
channel.close();
}
FileChannel fileChannel = (FileChannel) getAttr(chunkInfo, ATTR_FILE_CHANNEL);
LargeMappedByteBuffer mapBuffer = (LargeMappedByteBuffer) getAttr(chunkInfo, ATTR_MAP_BUFFER);
HttpDownUtil.safeClose(channel, fileChannel, mapBuffer);
} catch (Exception e) {
LOGGER.error("close error", e);
LOGGER.error("closeChunk error", e);
}
}

Expand Down
39 changes: 29 additions & 10 deletions core/src/main/java/lee/study/down/handle/HttpDownInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.ReferenceCountUtil;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import lee.study.down.HttpDownBootstrap;
import lee.study.down.constant.HttpDownStatus;
import lee.study.down.dispatch.HttpDownCallback;
import lee.study.down.io.LargeMappedByteBuffer;
import lee.study.down.model.ChunkInfo;
import lee.study.down.model.TaskInfo;
import lee.study.down.util.FileUtil;
Expand Down Expand Up @@ -44,18 +46,18 @@ public HttpDownInitializer(boolean isSsl, HttpDownBootstrap bootstrap,

@Override
protected void initChannel(Channel ch) throws Exception {
bootstrap.setAttr(chunkInfo, HttpDownBootstrap.ATTR_CHANNEL, ch);
if (bootstrap.getHttpDownInfo().getProxyConfig() != null) {
ch.pipeline().addLast(ProxyHandleFactory.build(bootstrap.getHttpDownInfo().getProxyConfig()));
}
if (isSsl) {
ch.pipeline().addLast(bootstrap.getClientSslContext().newHandler(ch.alloc()));
}
ch.pipeline()
.addLast("httpCodec", new HttpClientCodec(4096, 8192, HttpDownBootstrap.BUFFER_SIZE));
.addLast("httpCodec", new HttpClientCodec());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {

private FileChannel fileChannel;
private LargeMappedByteBuffer mappedBuffer;
private TaskInfo taskInfo = bootstrap.getHttpDownInfo().getTaskInfo();
private HttpDownCallback callback = bootstrap.getCallback();

Expand All @@ -67,17 +69,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ByteBuf byteBuf = httpContent.content();
int readableBytes = byteBuf.readableBytes();
synchronized (chunkInfo) {
if (chunkInfo.getStatus() == HttpDownStatus.RUNNING) {
MappedByteBuffer mappedByteBuf = fileChannel.map(MapMode.READ_WRITE,
chunkInfo.getOriStartPosition() + chunkInfo.getDownSize(), readableBytes);
mappedByteBuf.put(byteBuf.nioBuffer());
FileUtil.unmap(mappedByteBuf);
Channel nowChannel = (Channel) bootstrap
.getAttr(chunkInfo, HttpDownBootstrap.ATTR_CHANNEL);
LargeMappedByteBuffer nowMapBuffer = (LargeMappedByteBuffer) bootstrap
.getAttr(chunkInfo, HttpDownBootstrap.ATTR_MAP_BUFFER);
if (chunkInfo.getStatus() == HttpDownStatus.RUNNING
&& nowChannel == ctx.channel()) {
nowMapBuffer.put(byteBuf.nioBuffer());
//文件已下载大小
chunkInfo.setDownSize(chunkInfo.getDownSize() + readableBytes);
taskInfo.setDownSize(taskInfo.getDownSize() + readableBytes);
callback.onProgress(bootstrap.getHttpDownInfo(), chunkInfo);
} else {
bootstrap.close(chunkInfo);
safeClose(ctx.channel());
return;
}
}
Expand Down Expand Up @@ -128,8 +132,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
|| chunkInfo.getStatus() == HttpDownStatus.CONNECTING_CONTINUE) {
fileChannel = new RandomAccessFile(taskInfo.buildTaskFilePath(), "rw")
.getChannel();
mappedBuffer = new LargeMappedByteBuffer(fileChannel,
MapMode.READ_WRITE, chunkInfo.getOriStartPosition() + chunkInfo.getDownSize(),
chunkInfo.getTotalSize() - chunkInfo.getDownSize());
chunkInfo.setStatus(HttpDownStatus.RUNNING);
bootstrap.setAttr(chunkInfo, HttpDownBootstrap.ATTR_CHANNEL, ctx.channel());
bootstrap.setAttr(chunkInfo, HttpDownBootstrap.ATTR_FILE_CHANNEL, fileChannel);
bootstrap.setAttr(chunkInfo, HttpDownBootstrap.ATTR_MAP_BUFFER, mappedBuffer);
callback.onChunkStart(bootstrap.getHttpDownInfo(), chunkInfo);
} else {
bootstrap.close(chunkInfo);
Expand All @@ -146,12 +155,14 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOGGER.error("down onError:", cause);
if (ctx.channel() == bootstrap.getAttr(chunkInfo, HttpDownBootstrap.ATTR_CHANNEL)) {
Channel nowChannel = (Channel) bootstrap
.getAttr(chunkInfo, HttpDownBootstrap.ATTR_CHANNEL);
if (nowChannel == ctx.channel()) {
chunkInfo.setStatus(HttpDownStatus.CONNECTING_FAIL);
bootstrap.retryChunkDown(chunkInfo);
callback.onError(bootstrap.getHttpDownInfo(), chunkInfo, cause);
} else {
bootstrap.close(chunkInfo);
safeClose(ctx.channel());
}
}

Expand All @@ -161,6 +172,14 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
bootstrap.close(chunkInfo);
}

private void safeClose(Channel channel) {
try {
HttpDownUtil.safeClose(channel, fileChannel, mappedBuffer);
} catch (IOException e) {
LOGGER.error("connect close fail:", e);
}
}

});
}

Expand Down
9 changes: 5 additions & 4 deletions core/src/main/java/lee/study/down/util/HttpDownUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslContext;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.channels.FileChannel;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lee.study.down.io.LargeMappedByteBuffer;
import lee.study.down.model.HttpRequestInfo;
import lee.study.down.model.TaskInfo;
import lee.study.proxyee.util.ProtoUtil.RequestProto;

public class HttpDownUtil {

// private static final RecvByteBufAllocator RECV_BYTE_BUF_ALLOCATOR = new AdaptiveRecvByteBufAllocator(64,8192,65536);

/**
* 检测是否支持断点下载
*/
Expand Down Expand Up @@ -153,7 +154,7 @@ public static long getDownFileSize(HttpHeaders resHeaders) {
return 0;
}

/*public static void safeClose(Channel channel, FileChannel fileChannel,
public static void safeClose(Channel channel, FileChannel fileChannel,
LargeMappedByteBuffer mappedBuffer) throws IOException {
if (channel != null) {
//关闭旧的下载连接
Expand All @@ -167,5 +168,5 @@ public static long getDownFileSize(HttpHeaders resHeaders) {
//关闭旧的下载文件连接
mappedBuffer.close();
}
}*/
}
}

0 comments on commit 3d19228

Please sign in to comment.