Skip to content

Commit

Permalink
Pooled buffers (qubole#40)
Browse files Browse the repository at this point in the history
Improving caching mechanism (qubole#39)
    
Remove MemoryMappedBuffer usage from RemoteReadRequestChain
    
There were several issues with the JDK throwing SIGINTs when MemoryMappedBuffer was used
  • Loading branch information
pvam authored and Shubham Tagra committed Feb 22, 2017
1 parent 070fe38 commit b472ad4
Show file tree
Hide file tree
Showing 12 changed files with 238 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public List<BlockLocation> getCacheStatus(String remotePath, long fileLength, lo
}

if (currentNodeIndex == -1 || nodes == null) {
log.error("Initialization not done");
return null;
}

Expand Down Expand Up @@ -192,6 +193,9 @@ else if (clusterType == PRESTO_CLUSTER_MANAGER.ordinal()) {
}
}
}
if (nodes == null || nodes.size() == 0 || currentNodeIndex == -1) {
log.error(String.format("Could not initialize cluster nodes=%s nodeName=%s currentNodeIndex=%d", nodes, nodeName, currentNodeIndex));
}
}

nodes = clusterManager.getNodes();
Expand Down
2 changes: 1 addition & 1 deletion rubix-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
*/
package com.qubole.rubix.core;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

import static com.google.common.base.Preconditions.checkState;

Expand All @@ -25,14 +29,34 @@
*/
public class CachedReadRequestChain extends ReadRequestChain
{
private final RandomAccessFile fileToRead;
private FileChannel fileChannel = null;
private RandomAccessFile raf;
private int read = 0; // data read

private ByteBuffer directBuffer;

private static final Log log = LogFactory.getLog(CachedReadRequestChain.class);

public CachedReadRequestChain(RandomAccessFile fileToRead)
public CachedReadRequestChain(String fileToRead, ByteBuffer buffer)
throws IOException
{
this.raf = new RandomAccessFile(fileToRead, "r");
FileInputStream fis = new FileInputStream(raf.getFD());
fileChannel = fis.getChannel();
directBuffer = buffer;
}

/**
 * Test-only convenience constructor: delegates to the two-argument constructor,
 * passing a freshly allocated 1024-byte heap {@link ByteBuffer} as the buffer argument.
 *
 * @param fileToRead passed through unchanged as the first argument of the two-argument constructor
 * @throws IOException declared because the delegated-to constructor is declared to throw it
 */
@VisibleForTesting
public CachedReadRequestChain(String fileToRead)
throws IOException
{
this(fileToRead, ByteBuffer.allocate(1024));
}

@VisibleForTesting
public CachedReadRequestChain()
{
this.fileToRead = fileToRead;
//Dummy constructor for testing #testConsequtiveRequest method.
}

public Integer call()
Expand All @@ -48,22 +72,31 @@ public Integer call()

checkState(isLocked, "Trying to execute Chain without locking");
for (ReadRequest readRequest : readRequests) {
fileToRead.seek(readRequest.getActualReadStart());
int nread = 0;
int leftToRead = readRequest.getActualReadLength();
log.debug(String.format("Processing readrequest %d-%d, length %d", readRequest.actualReadStart, readRequest.actualReadEnd, leftToRead));
while (nread < readRequest.getActualReadLength()) {
int nbytes = fileToRead.read(readRequest.getDestBuffer(), readRequest.getDestBufferOffset() + nread, readRequest.getActualReadLength() - nread);
log.debug(String.format("CachedFileRead copied data from %d of length %d at buffer offset %d",
readRequest.getActualReadStart(),
readRequest.getActualReadLength() - nread,
readRequest.getDestBufferOffset() + nread));
if (nbytes < 0) {
int readInThisCycle = Math.min(leftToRead, directBuffer.capacity());
directBuffer.clear();
int nbytes = fileChannel.read(directBuffer, readRequest.getActualReadStart() + nread);
if (nbytes <= 0) {
break;
}
nread += nbytes;
directBuffer.flip();
int transferBytes = Math.min(readInThisCycle, nbytes);
directBuffer.get(readRequest.getDestBuffer(), readRequest.getDestBufferOffset() + nread, transferBytes);
leftToRead -= transferBytes;
nread += transferBytes;
}
log.debug(String.format("CachedFileRead copied data [%d - %d] at buffer offset %d",
readRequest.getActualReadStart(),
readRequest.getActualReadStart() + nread,
readRequest.getDestBufferOffset()));
read += nread;
}
log.info(String.format("Read %d bytes from cached file", read));
fileChannel.close();
raf.close();
return read;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.DirectBufferPool;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -57,7 +58,7 @@ public class CachingInputStream
private long nextReadPosition;
private long nextReadBlock;
private int blockSize;
private RandomAccessFile localFileForReading = null;
private RandomAccessFile localFileForWriting = null;
private CachingFileSystemStats statsMbean;

private static ListeningExecutorService readService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
Expand All @@ -75,6 +76,12 @@ public class CachingInputStream
ClusterType clusterType;
FileSystem remoteFileSystem;

private static DirectBufferPool bufferPool = new DirectBufferPool();
private ByteBuffer directWriteBuffer = null;
private ByteBuffer directReadBuffer = null;
private byte[] affixBuffer;
private int diskReadBufferSize;

public CachingInputStream(FSDataInputStream parentInputStream, FileSystem parentFs, Path backendPath, Configuration conf, CachingFileSystemStats statsMbean, ClusterType clusterType, BookKeeperFactory bookKeeperFactory, FileSystem remoteFileSystem)
throws IOException
{
Expand Down Expand Up @@ -118,23 +125,19 @@ private void initialize(FSDataInputStream parentInputStream, Configuration conf,
this.inputStream = checkNotNull(parentInputStream, "ParentInputStream is null");
this.blockSize = CacheConfig.getBlockSize(conf);
this.localPath = CacheConfig.getLocalPath(remotePath, conf);
try {
this.localFileForReading = new RandomAccessFile(localPath, "r");
}
catch (FileNotFoundException e) {
log.info("Creating local file " + localPath);
File file = new File(localPath);
this.diskReadBufferSize = CacheConfig.getDiskReadBufferSizeDefault(conf);
File file = new File(localPath);
if (!file.exists()) {
try {
file.createNewFile();
file.setReadable(true, false);
file.setWritable(true, false);
this.localFileForReading = new RandomAccessFile(file, "rw");
}
catch (IOException e1) {
log.error("Error in creating local file " + localPath, e1);
catch (IOException e) {
log.error("Error in creating local file " + localPath, e);
// reset bookkeeper client so that we take direct route
this.bookKeeperClient = null;
}
file.setReadable(true, false);
file.setWritable(true, false);
}
}

Expand Down Expand Up @@ -176,6 +179,7 @@ public int read(byte[] buffer, int offset, int length)
throws IOException
{
log.debug(String.format("Got Read, currentPos: %d currentBlock: %d bufferOffset: %d length: %d", nextReadPosition, nextReadBlock, offset, length));

if (nextReadPosition >= fileSize) {
log.debug("Already at eof, returning");
return -1;
Expand Down Expand Up @@ -276,7 +280,6 @@ private List<ReadRequestChain> setupReadRequestChains(byte[] buffer,
log.debug("Reached EOF, returning");
break;
}

if (backendReadEnd >= fileSize) {
backendReadEnd = fileSize;
}
Expand Down Expand Up @@ -307,8 +310,21 @@ private List<ReadRequestChain> setupReadRequestChains(byte[] buffer,

else if (isCached.get(idx).getLocation() == Location.CACHED) {
log.debug(String.format("Sending cached block %d to cachedReadRequestChain", blockNum));
if (cachedReadRequestChain == null) {
cachedReadRequestChain = new CachedReadRequestChain(localFileForReading);
try {
if (directReadBuffer == null) {
directReadBuffer = bufferPool.getBuffer(diskReadBufferSize);
}
if (cachedReadRequestChain == null) {
cachedReadRequestChain = new CachedReadRequestChain(localPath, directReadBuffer);
}
}
catch (IOException e) {
log.error("Unable to open file channel in R mode", e);
// reset bookkeeper client so that we take direct route
this.bookKeeperClient = null;
isCached = null;
idx--;
blockNum--;
}
cachedReadRequestChain.addReadRequest(readRequest);
}
Expand All @@ -324,8 +340,27 @@ else if (isCached.get(idx).getLocation() == Location.CACHED) {
}
else {
log.debug(String.format("Sending block %d to remoteReadRequestChain", blockNum));
if (remoteReadRequestChain == null) {
remoteReadRequestChain = new RemoteReadRequestChain(inputStream, localPath);
try {
if (localFileForWriting == null) {
this.localFileForWriting = new RandomAccessFile(localPath, "rw");
}
if (directWriteBuffer == null) {
directWriteBuffer = bufferPool.getBuffer(diskReadBufferSize);
}
if (affixBuffer == null) {
affixBuffer = new byte[blockSize];
}
if (remoteReadRequestChain == null) {
remoteReadRequestChain = new RemoteReadRequestChain(inputStream, localFileForWriting, directWriteBuffer, affixBuffer);
}
}
catch (IOException e) {
log.error("Unable to obtain open file channel in RW mode", e);
// reset bookkeeper client so that we take direct route
this.bookKeeperClient = null;
isCached = null;
idx--;
blockNum--;
}
remoteReadRequestChain.addReadRequest(readRequest);
}
Expand Down Expand Up @@ -362,13 +397,27 @@ private void setNextReadBlock()
this.nextReadBlock = this.nextReadPosition / blockSize;
}

/**
 * Gives the write buffer and then the read buffer back to the shared pool.
 * Each field is cleared once its buffer has been returned, so a field that is
 * already {@code null} is simply left {@code null}.
 */
private void returnBuffers()
{
    directWriteBuffer = returnToPool(directWriteBuffer);
    directReadBuffer = returnToPool(directReadBuffer);
}

/**
 * Returns {@code buffer} to the pool when it is non-null.
 *
 * @param buffer buffer to give back; may be {@code null}
 * @return always {@code null}, so the caller can clear its field in the same statement
 */
private static ByteBuffer returnToPool(ByteBuffer buffer)
{
    if (buffer != null) {
        bufferPool.returnBuffer(buffer);
    }
    return null;
}

@Override
public void close()
{
returnBuffers();
try {
inputStream.close();
if (localFileForReading != null) {
localFileForReading.close();
if (localFileForWriting != null) {
localFileForWriting.close();
}
if (bookKeeperClient != null) {
bookKeeperClient.close();
Expand Down
Loading

0 comments on commit b472ad4

Please sign in to comment.