From 0cfbe4c95e0b2b376f5eb3afa2dfb5a5affc1a5c Mon Sep 17 00:00:00 2001 From: Hangxiang Yu Date: Mon, 25 Mar 2024 18:10:38 +0800 Subject: [PATCH] [FLINK-35045][state] ForStFlinkFileSystem supports reading and writing with ByteBuffer --- .../ByteBufferReadableFSDataInputStream.java | 173 ++++++++++++++++++ .../ByteBufferWritableFSDataOutputStream.java | 102 +++++++++++ .../state/forst/fs/ForStFlinkFileSystem.java | 43 +++-- .../forst/fs/ForStFlinkFileSystemTest.java | 112 ++++++++++++ 4 files changed, 417 insertions(+), 13 deletions(-) create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java create mode 100644 flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java create mode 100644 flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java new file mode 100644 index 0000000000000..e2b1f512e843d --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferReadableFSDataInputStream.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import org.apache.flink.core.fs.FSDataInputStream; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A {@link FSDataInputStream} delegates requests to other one and supports reading data with {@link + * ByteBuffer}. + * + *

All methods in this class maybe used by ForSt, please start a discussion firstly if it has to + * be modified. + */ +public class ByteBufferReadableFSDataInputStream extends FSDataInputStream { + + private final FSDataInputStream originalInputStream; + + private volatile long toSeek = -1L; + + private final Object lock; + + public ByteBufferReadableFSDataInputStream(FSDataInputStream originalInputStream) { + this.originalInputStream = originalInputStream; + this.lock = new Object(); + } + + /** + * Reads up to ByteBuffer#remaining bytes of data from the input stream into a + * ByteBuffer. Not Tread-safe yet since the interface of sequential read of ForSt only be + * accessed by one thread at a time. + * + * @param bb the buffer into which the data is read. + * @return the total number of bytes read into the buffer. + * @exception IOException If the first byte cannot be read for any reason other than end of + * file, or if the input stream has been closed, or if some other I/O error occurs. + */ + public int readFully(ByteBuffer bb) throws IOException { + if (bb == null) { + throw new NullPointerException(); + } else if (bb.remaining() == 0) { + return 0; + } + seekIfNeeded(); + return readFullyFromFSDataInputStream(originalInputStream, bb); + } + + /** + * Reads up to ByteBuffer#remaining bytes of data from the specific position of the + * input stream into a ByteBuffer. Tread-safe since the interface of random read of ForSt may be + * concurrently accessed by multiple threads. + * + * @param position the start offset in input stream at which the data is read. + * @param bb the buffer into which the data is read. + * @return the total number of bytes read into the buffer. + * @exception IOException If the first byte cannot be read for any reason other than end of + * file, or if the input stream has been closed, or if some other I/O error occurs. + */ + public int readFully(long position, ByteBuffer bb) throws IOException { + // TODO: Improve the performance + synchronized (lock) { + originalInputStream.seek(position); + return readFullyFromFSDataInputStream(originalInputStream, bb); + } + } + + private int readFullyFromFSDataInputStream(FSDataInputStream originalInputStream, ByteBuffer bb) + throws IOException { + byte[] tmp = new byte[bb.remaining()]; + int n = 0; + while (n < tmp.length) { + int read = originalInputStream.read(tmp, n, tmp.length - n); + if (read == -1) { + break; + } + n += read; + } + if (n > 0) { + bb.put(tmp, 0, n); + } + return n; + } + + private void seekIfNeeded() throws IOException { + if (toSeek >= 0) { + originalInputStream.seek(toSeek); + toSeek = -1L; + } + } + + @Override + public void seek(long desired) { + toSeek = desired; + } + + @Override + public long getPos() throws IOException { + if (toSeek >= 0) { + return toSeek; + } + return originalInputStream.getPos(); + } + + @Override + public int read() throws IOException { + seekIfNeeded(); + return originalInputStream.read(); + } + + @Override + public int read(byte[] b) throws IOException { + seekIfNeeded(); + return originalInputStream.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + seekIfNeeded(); + return originalInputStream.read(b, off, len); + } + + @Override + public long skip(long n) throws IOException { + long position = getPos(); + seek(position + n); + return getPos() - position; + } + + @Override + public int available() throws IOException { + seekIfNeeded(); + return originalInputStream.available(); + } + + @Override + public void close() throws IOException { + originalInputStream.close(); + } + + @Override + public synchronized void mark(int readlimit) { + originalInputStream.mark(readlimit); + } + + @Override + public synchronized void reset() throws IOException { + toSeek = -1L; + originalInputStream.reset(); + } + + @Override + public boolean markSupported() { + return originalInputStream.markSupported(); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java new file mode 100644 index 0000000000000..ba13447ea862b --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ByteBufferWritableFSDataOutputStream.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import org.apache.flink.core.fs.FSDataOutputStream; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * A {@link FSDataOutputStream} delegates requests to other one and supports writing data with + * {@link ByteBuffer}. + * + *

All methods in this class maybe used by ForSt, please start a discussion firstly if it has to + * be modified. + */ +public class ByteBufferWritableFSDataOutputStream extends FSDataOutputStream { + + private final FSDataOutputStream originalOutputStream; + + public ByteBufferWritableFSDataOutputStream(FSDataOutputStream originalOutputStream) { + this.originalOutputStream = originalOutputStream; + } + + /** + * Writes ByteBuffer#remaining bytes from the ByteBuffer to this output stream. Not + * Thread-safe yet since the interface of write of ForSt only be accessed by one thread at a + * time. + * + *

If bb is null, a NullPointerException is thrown. + * + * @exception IOException if an I/O error occurs. In particular, an IOException is + * thrown if the output stream is closed. + */ + public void write(ByteBuffer bb) throws IOException { + if (bb == null) { + throw new NullPointerException(); + } + if (bb.hasArray()) { + write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()); + } else { + byte[] tmp = new byte[bb.remaining()]; + bb.get(tmp); + write(tmp, 0, tmp.length); + } + } + + @Override + public long getPos() throws IOException { + return originalOutputStream.getPos(); + } + + @Override + public void write(int b) throws IOException { + originalOutputStream.write(b); + } + + public void write(byte[] b) throws IOException { + originalOutputStream.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + originalOutputStream.write(b, off, len); + } + + @Override + public void flush() throws IOException { + originalOutputStream.flush(); + } + + @Override + public void sync() throws IOException { + originalOutputStream.sync(); + + // Data only be visible after closing for some object storages e.g. oss, s3 + // TODO: 1. Support to close when sync for some object storages. + // (maybe introduce isSupportSync for FileSystem) + // TODO: 2. Support to handle specific files, e.g. MANIFEST, LOG. + } + + @Override + public void close() throws IOException { + originalOutputStream.close(); + } +} diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java index 3cabad4c519a3..62239a5d8dd27 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystem.java @@ -19,8 +19,6 @@ package org.apache.flink.state.forst.fs; import org.apache.flink.core.fs.BlockLocation; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileStatus; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemKind; @@ -43,28 +41,51 @@ public ForStFlinkFileSystem(FileSystem delegateFS) { this.delegateFS = delegateFS; } + /** + * Returns a reference to the {@link FileSystem} instance for accessing the file system + * identified by the given {@link URI}. + * + * @param uri the {@link URI} identifying the file system. + * @return a reference to the {@link FileSystem} instance for accessing the file system + * identified by the given {@link URI}. + * @throws IOException thrown if a reference to the file system instance could not be obtained. + */ public static FileSystem get(URI uri) throws IOException { return new ForStFlinkFileSystem(FileSystem.get(uri)); } + /** + * Create ByteBufferWritableFSDataOutputStream from specific path which supports to write data + * to ByteBuffer with {@link org.apache.flink.core.fs.FileSystem.WriteMode#OVERWRITE} mode. + * + * @param path The file path to write to. + * @return The stream to the new file at the target path. + * @throws IOException Thrown, if the stream could not be opened because of an I/O, or because a + * file already exists at that path and the write mode indicates to not overwrite the file. + */ + public ByteBufferWritableFSDataOutputStream create(Path path) throws IOException { + return create(path, WriteMode.OVERWRITE); + } + @Override - public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOException { - return delegateFS.create(path, overwriteMode); + public ByteBufferWritableFSDataOutputStream create(Path path, WriteMode overwriteMode) + throws IOException { + return new ByteBufferWritableFSDataOutputStream(delegateFS.create(path, overwriteMode)); } @Override - public FSDataInputStream open(Path path, int bufferSize) throws IOException { - return delegateFS.open(path, bufferSize); + public ByteBufferReadableFSDataInputStream open(Path path, int bufferSize) throws IOException { + return new ByteBufferReadableFSDataInputStream(delegateFS.open(path, bufferSize)); } @Override - public FSDataInputStream open(Path path) throws IOException { - return delegateFS.open(path); + public ByteBufferReadableFSDataInputStream open(Path path) throws IOException { + return new ByteBufferReadableFSDataInputStream(delegateFS.open(path)); } @Override public boolean rename(Path src, Path dst) throws IOException { - // The rename is not atomic for RocksDB. Some FileSystems e.g. HDFS, OSS does not allow a + // The rename is not atomic for ForSt. Some FileSystems e.g. HDFS, OSS does not allow a // renaming if the target already exists. So, we delete the target before attempting the // rename. if (delegateFS.exists(dst)) { @@ -117,10 +138,6 @@ public boolean mkdirs(Path path) throws IOException { return delegateFS.mkdirs(path); } - public FSDataOutputStream create(Path path) throws IOException { - return create(path, WriteMode.OVERWRITE); - } - @Override public boolean isDistributedFS() { return delegateFS.isDistributedFS(); diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java new file mode 100644 index 0000000000000..b0f04b99268da --- /dev/null +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/fs/ForStFlinkFileSystemTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.forst.fs; + +import org.apache.flink.util.concurrent.FutureUtils; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +/** Tests for {@link ForStFlinkFileSystem}. */ +public class ForStFlinkFileSystemTest { + + @TempDir Path tempDir; + + @Test + void testReadAndWriteWithByteBuffer() throws Exception { + ForStFlinkFileSystem fileSystem = + (ForStFlinkFileSystem) ForStFlinkFileSystem.get(new URI(tempDir.toString())); + org.apache.flink.core.fs.Path testFilePath = + new org.apache.flink.core.fs.Path(tempDir.toString() + "/temp-file"); + final int attempt = 200; + + // Test write with ByteBuffer + ByteBufferWritableFSDataOutputStream outputStream = fileSystem.create(testFilePath); + ByteBuffer writeBuffer = ByteBuffer.allocate(20); + for (int i = 0; i < attempt; i++) { + writeBuffer.clear(); + writeBuffer.position(2); + writeBuffer.putLong(i); + writeBuffer.putLong(i * 2); + writeBuffer.flip(); + writeBuffer.position(2); + outputStream.write(writeBuffer); + } + outputStream.flush(); + outputStream.close(); + + // Test sequential read with ByteBuffer + ByteBufferReadableFSDataInputStream inputStream = fileSystem.open(testFilePath); + inputStream.seek(0); + ByteBuffer readBuffer = ByteBuffer.allocate(20); + for (int i = 0; i < attempt; i++) { + readBuffer.clear(); + readBuffer.position(1); + readBuffer.limit(17); + int read = inputStream.readFully(readBuffer); + Assertions.assertThat(read).isEqualTo(16); + Assertions.assertThat(readBuffer.getLong(1)).isEqualTo(i); + Assertions.assertThat(readBuffer.getLong(9)).isEqualTo(i * 2); + } + inputStream.close(); + + // Test random read with ByteBuffer concurrently + ByteBufferReadableFSDataInputStream randomInputStream = fileSystem.open(testFilePath); + List> futureList = new ArrayList<>(); + for (int index = 0; index < attempt; index++) { + futureList.add( + CompletableFuture.runAsync( + () -> { + try { + ByteBuffer randomReadBuffer = ByteBuffer.allocate(20); + for (int i = 0; i < attempt; i += 2) { + randomReadBuffer.clear(); + randomReadBuffer.position(1); + randomReadBuffer.limit(17); + int read = + randomInputStream.readFully( + i * 16L, randomReadBuffer); + Assertions.assertThat(read).isEqualTo(16); + Assertions.assertThat(randomReadBuffer.getLong(1)) + .isEqualTo(i); + Assertions.assertThat(randomReadBuffer.getLong(9)) + .isEqualTo(i * 2L); + } + } catch (Exception ex) { + throw new CompletionException(ex); + } + })); + } + FutureUtils.waitForAll(futureList).get(); + inputStream.close(); + + Assertions.assertThat(fileSystem.exists(testFilePath)).isTrue(); + fileSystem.delete(testFilePath, true); + Assertions.assertThat(fileSystem.exists(testFilePath)).isFalse(); + } +}