Skip to content

Commit

Permalink
[FLINK-35045][state] ForStFlinkFileSystem supports reading and writin…
Browse files Browse the repository at this point in the history
…g with ByteBuffer
  • Loading branch information
masteryhx committed Apr 18, 2024
1 parent 369bbb8 commit 0cfbe4c
Show file tree
Hide file tree
Showing 4 changed files with 417 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.forst.fs;

import org.apache.flink.core.fs.FSDataInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A {@link FSDataInputStream} delegates requests to other one and supports reading data with {@link
* ByteBuffer}.
*
* <p>All methods in this class maybe used by ForSt, please start a discussion firstly if it has to
* be modified.
*/
public class ByteBufferReadableFSDataInputStream extends FSDataInputStream {

private final FSDataInputStream originalInputStream;

private volatile long toSeek = -1L;

private final Object lock;

public ByteBufferReadableFSDataInputStream(FSDataInputStream originalInputStream) {
this.originalInputStream = originalInputStream;
this.lock = new Object();
}

/**
* Reads up to <code>ByteBuffer#remaining</code> bytes of data from the input stream into a
* ByteBuffer. Not Tread-safe yet since the interface of sequential read of ForSt only be
* accessed by one thread at a time.
*
* @param bb the buffer into which the data is read.
* @return the total number of bytes read into the buffer.
* @exception IOException If the first byte cannot be read for any reason other than end of
* file, or if the input stream has been closed, or if some other I/O error occurs.
*/
public int readFully(ByteBuffer bb) throws IOException {
if (bb == null) {
throw new NullPointerException();
} else if (bb.remaining() == 0) {
return 0;
}
seekIfNeeded();
return readFullyFromFSDataInputStream(originalInputStream, bb);
}

/**
* Reads up to <code>ByteBuffer#remaining</code> bytes of data from the specific position of the
* input stream into a ByteBuffer. Tread-safe since the interface of random read of ForSt may be
* concurrently accessed by multiple threads.
*
* @param position the start offset in input stream at which the data is read.
* @param bb the buffer into which the data is read.
* @return the total number of bytes read into the buffer.
* @exception IOException If the first byte cannot be read for any reason other than end of
* file, or if the input stream has been closed, or if some other I/O error occurs.
*/
public int readFully(long position, ByteBuffer bb) throws IOException {
// TODO: Improve the performance
synchronized (lock) {
originalInputStream.seek(position);
return readFullyFromFSDataInputStream(originalInputStream, bb);
}
}

private int readFullyFromFSDataInputStream(FSDataInputStream originalInputStream, ByteBuffer bb)
throws IOException {
byte[] tmp = new byte[bb.remaining()];
int n = 0;
while (n < tmp.length) {
int read = originalInputStream.read(tmp, n, tmp.length - n);
if (read == -1) {
break;
}
n += read;
}
if (n > 0) {
bb.put(tmp, 0, n);
}
return n;
}

private void seekIfNeeded() throws IOException {
if (toSeek >= 0) {
originalInputStream.seek(toSeek);
toSeek = -1L;
}
}

@Override
public void seek(long desired) {
toSeek = desired;
}

@Override
public long getPos() throws IOException {
if (toSeek >= 0) {
return toSeek;
}
return originalInputStream.getPos();
}

@Override
public int read() throws IOException {
seekIfNeeded();
return originalInputStream.read();
}

@Override
public int read(byte[] b) throws IOException {
seekIfNeeded();
return originalInputStream.read(b);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
seekIfNeeded();
return originalInputStream.read(b, off, len);
}

@Override
public long skip(long n) throws IOException {
long position = getPos();
seek(position + n);
return getPos() - position;
}

@Override
public int available() throws IOException {
seekIfNeeded();
return originalInputStream.available();
}

@Override
public void close() throws IOException {
originalInputStream.close();
}

@Override
public synchronized void mark(int readlimit) {
originalInputStream.mark(readlimit);
}

@Override
public synchronized void reset() throws IOException {
toSeek = -1L;
originalInputStream.reset();
}

@Override
public boolean markSupported() {
return originalInputStream.markSupported();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.forst.fs;

import org.apache.flink.core.fs.FSDataOutputStream;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
* A {@link FSDataOutputStream} delegates requests to other one and supports writing data with
* {@link ByteBuffer}.
*
* <p>All methods in this class maybe used by ForSt, please start a discussion firstly if it has to
* be modified.
*/
public class ByteBufferWritableFSDataOutputStream extends FSDataOutputStream {

private final FSDataOutputStream originalOutputStream;

public ByteBufferWritableFSDataOutputStream(FSDataOutputStream originalOutputStream) {
this.originalOutputStream = originalOutputStream;
}

/**
* Writes <code>ByteBuffer#remaining</code> bytes from the ByteBuffer to this output stream. Not
* Thread-safe yet since the interface of write of ForSt only be accessed by one thread at a
* time.
*
* <p>If <code>bb</code> is <code>null</code>, a <code>NullPointerException</code> is thrown.
*
* @exception IOException if an I/O error occurs. In particular, an <code>IOException</code> is
* thrown if the output stream is closed.
*/
public void write(ByteBuffer bb) throws IOException {
if (bb == null) {
throw new NullPointerException();
}
if (bb.hasArray()) {
write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining());
} else {
byte[] tmp = new byte[bb.remaining()];
bb.get(tmp);
write(tmp, 0, tmp.length);
}
}

@Override
public long getPos() throws IOException {
return originalOutputStream.getPos();
}

@Override
public void write(int b) throws IOException {
originalOutputStream.write(b);
}

public void write(byte[] b) throws IOException {
originalOutputStream.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
originalOutputStream.write(b, off, len);
}

@Override
public void flush() throws IOException {
originalOutputStream.flush();
}

@Override
public void sync() throws IOException {
originalOutputStream.sync();

// Data only be visible after closing for some object storages e.g. oss, s3
// TODO: 1. Support to close when sync for some object storages.
// (maybe introduce isSupportSync for FileSystem)
// TODO: 2. Support to handle specific files, e.g. MANIFEST, LOG.
}

@Override
public void close() throws IOException {
originalOutputStream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
package org.apache.flink.state.forst.fs;

import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
Expand All @@ -43,28 +41,51 @@ public ForStFlinkFileSystem(FileSystem delegateFS) {
this.delegateFS = delegateFS;
}

/**
* Returns a reference to the {@link FileSystem} instance for accessing the file system
* identified by the given {@link URI}.
*
* @param uri the {@link URI} identifying the file system.
* @return a reference to the {@link FileSystem} instance for accessing the file system
* identified by the given {@link URI}.
* @throws IOException thrown if a reference to the file system instance could not be obtained.
*/
public static FileSystem get(URI uri) throws IOException {
return new ForStFlinkFileSystem(FileSystem.get(uri));
}

/**
* Create ByteBufferWritableFSDataOutputStream from specific path which supports to write data
* to ByteBuffer with {@link org.apache.flink.core.fs.FileSystem.WriteMode#OVERWRITE} mode.
*
* @param path The file path to write to.
* @return The stream to the new file at the target path.
* @throws IOException Thrown, if the stream could not be opened because of an I/O, or because a
* file already exists at that path and the write mode indicates to not overwrite the file.
*/
public ByteBufferWritableFSDataOutputStream create(Path path) throws IOException {
return create(path, WriteMode.OVERWRITE);
}

@Override
public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOException {
return delegateFS.create(path, overwriteMode);
public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwriteMode)
throws IOException {
return new ByteBufferWritableFSDataOutputStream(delegateFS.create(path, overwriteMode));
}

@Override
public FSDataInputStream open(Path path, int bufferSize) throws IOException {
return delegateFS.open(path, bufferSize);
public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throws IOException {
return new ByteBufferReadableFSDataInputStream(delegateFS.open(path, bufferSize));
}

@Override
public FSDataInputStream open(Path path) throws IOException {
return delegateFS.open(path);
public ByteBufferReadableFSDataInputStream open(Path path) throws IOException {
return new ByteBufferReadableFSDataInputStream(delegateFS.open(path));
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
// The rename is not atomic for RocksDB. Some FileSystems e.g. HDFS, OSS does not allow a
// The rename is not atomic for ForSt. Some FileSystems e.g. HDFS, OSS does not allow a
// renaming if the target already exists. So, we delete the target before attempting the
// rename.
if (delegateFS.exists(dst)) {
Expand Down Expand Up @@ -117,10 +138,6 @@ public boolean mkdirs(Path path) throws IOException {
return delegateFS.mkdirs(path);
}

public FSDataOutputStream create(Path path) throws IOException {
return create(path, WriteMode.OVERWRITE);
}

@Override
public boolean isDistributedFS() {
return delegateFS.isDistributedFS();
Expand Down
Loading

0 comments on commit 0cfbe4c

Please sign in to comment.