Skip to content

Commit

Permalink
[FLINK-35045][state] Introduce input stream pool to improve the perfo…
Browse files Browse the repository at this point in the history
…rmance of readFully
  • Loading branch information
masteryhx committed Apr 18, 2024
1 parent 0cfbe4c commit 7d004ca
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

/**
* A {@link FSDataInputStream} delegates requests to other one and supports reading data with {@link
Expand All @@ -36,11 +39,17 @@ public class ByteBufferReadableFSDataInputStream extends FSDataInputStream {

private volatile long toSeek = -1L;

private final Object lock;
private final Queue<FSDataInputStream> readInputStreamPool;

public ByteBufferReadableFSDataInputStream(FSDataInputStream originalInputStream) {
private final Callable<FSDataInputStream> inputStreamBuilder;

public ByteBufferReadableFSDataInputStream(
FSDataInputStream originalInputStream,
Callable<FSDataInputStream> inputStreamBuilder,
int inputStreamCapacity) {
this.originalInputStream = originalInputStream;
this.lock = new Object();
this.inputStreamBuilder = inputStreamBuilder;
this.readInputStreamPool = new LinkedBlockingQueue<>(inputStreamCapacity);
}

/**
Expand All @@ -66,20 +75,26 @@ public int readFully(ByteBuffer bb) throws IOException {
/**
* Reads up to <code>ByteBuffer#remaining</code> bytes of data from the specific position of the
* input stream into a ByteBuffer. Tread-safe since the interface of random read of ForSt may be
* concurrently accessed by multiple threads.
* concurrently accessed by multiple threads. TODO: Support to split this method to other class.
*
* @param position the start offset in input stream at which the data is read.
* @param bb the buffer into which the data is read.
* @return the total number of bytes read into the buffer.
* @exception IOException If the first byte cannot be read for any reason other than end of
* file, or if the input stream has been closed, or if some other I/O error occurs.
*/
public int readFully(long position, ByteBuffer bb) throws IOException {
// TODO: Improve the performance
synchronized (lock) {
originalInputStream.seek(position);
return readFullyFromFSDataInputStream(originalInputStream, bb);
public int readFully(long position, ByteBuffer bb) throws Exception {
// TODO: Support partitioned read
FSDataInputStream fsDataInputStream = readInputStreamPool.poll();
if (fsDataInputStream == null) {
fsDataInputStream = inputStreamBuilder.call();
}
fsDataInputStream.seek(position);
int result = readFullyFromFSDataInputStream(fsDataInputStream, bb);
if (!readInputStreamPool.offer(fsDataInputStream)) {
fsDataInputStream.close();
}
return result;
}

private int readFullyFromFSDataInputStream(FSDataInputStream originalInputStream, ByteBuffer bb)
Expand Down Expand Up @@ -153,6 +168,9 @@ public int available() throws IOException {
@Override
public void close() throws IOException {
originalInputStream.close();
for (FSDataInputStream fsDataInputStream : readInputStreamPool) {
fsDataInputStream.close();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwrit

@Override
public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throws IOException {
return new ByteBufferReadableFSDataInputStream(delegateFS.open(path, bufferSize));
return new ByteBufferReadableFSDataInputStream(
delegateFS.open(path, bufferSize), () -> delegateFS.open(path, bufferSize), 32);
}

@Override
public ByteBufferReadableFSDataInputStream open(Path path) throws IOException {
return new ByteBufferReadableFSDataInputStream(delegateFS.open(path));
return new ByteBufferReadableFSDataInputStream(
delegateFS.open(path), () -> delegateFS.open(path), 32);
}

@Override
Expand Down

0 comments on commit 7d004ca

Please sign in to comment.