GCP: Add range reads to GCSInputStream (apache#8301)
bryanck authored Aug 15, 2023
1 parent 4c1188a commit 462a203
Showing 5 changed files with 109 additions and 22 deletions.
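
This commit makes GCSInputStream implement Iceberg's RangeReadable interface, adding positioned reads (readFully) and tail reads (readTail) that each open a dedicated, bounded channel instead of repositioning the shared streaming channel. As a rough sketch of what that enables for a caller, consider reading a file footer; the FooterReader class and footerLen parameter below are illustrative, not part of this commit:

import org.apache.iceberg.io.IOUtil;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.RangeReadable;
import org.apache.iceberg.io.SeekableInputStream;

class FooterReader {
  // Read the last footerLen bytes of a file (e.g. a Parquet footer), using the
  // range-read path when the stream supports it and seeking otherwise.
  static byte[] readFooter(InputFile file, int footerLen) throws java.io.IOException {
    byte[] footer = new byte[footerLen];
    try (SeekableInputStream in = file.newStream()) {
      if (in instanceof RangeReadable) {
        // One bounded GCS request; the main streaming channel is never repositioned.
        ((RangeReadable) in).readTail(footer, 0, footerLen);
      } else {
        in.seek(file.getLength() - footerLen);
        IOUtil.readFully(in, footer, 0, footerLen);
      }
    }
    return footer;
  }
}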
gcp/src/main/java/org/apache/iceberg/gcp/GCPProperties.java
@@ -67,6 +67,7 @@ public class GCPProperties implements Serializable {
 
   public GCPProperties() {}
 
+  @SuppressWarnings("JavaUtilDate") // GCP API uses java.util.Date
   public GCPProperties(Map<String, String> properties) {
     projectId = properties.get(GCS_PROJECT_ID);
     clientLibToken = properties.get(GCS_CLIENT_LIB_TOKEN);
14 changes: 7 additions & 7 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java
@@ -26,7 +26,7 @@
 import org.apache.iceberg.metrics.MetricsContext;
 
 class GCSInputFile extends BaseGCSFile implements InputFile {
-  private Long length;
+  private Long blobSize;
 
   static GCSInputFile fromLocation(
       String location, Storage storage, GCPProperties gcpProperties, MetricsContext metrics) {
@@ -50,24 +50,24 @@ static GCSInputFile fromLocation(
   GCSInputFile(
       Storage storage,
       BlobId blobId,
-      Long length,
+      Long blobSize,
       GCPProperties gcpProperties,
       MetricsContext metrics) {
     super(storage, blobId, gcpProperties, metrics);
-    this.length = length;
+    this.blobSize = blobSize;
   }
 
   @Override
   public long getLength() {
-    if (length == null) {
-      this.length = getBlob().getSize();
+    if (blobSize == null) {
+      this.blobSize = getBlob().getSize();
     }
 
-    return length;
+    return blobSize;
   }
 
   @Override
   public SeekableInputStream newStream() {
-    return new GCSInputStream(storage(), blobId(), gcpProperties(), metrics());
+    return new GCSInputStream(storage(), blobId(), blobSize, gcpProperties(), metrics());
   }
 }
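
In GCSInputFile, renaming length to blobSize and passing it into the stream lets a stream created from an InputFile that has already fetched blob metadata serve readTail without a second metadata request; a null size simply defers the lookup. A minimal sketch of that lazy-size pattern, with invented names (LazySizedBlob, metadataFetch), assuming the supplier wraps something like storage.get(blobId).getSize():

import java.util.function.LongSupplier;

class LazySizedBlob {
  private Long blobSize; // null means "not fetched yet"
  private final LongSupplier metadataFetch;

  LazySizedBlob(Long knownSize, LongSupplier metadataFetch) {
    this.blobSize = knownSize; // a caller that already knows the size skips the fetch
    this.metadataFetch = metadataFetch;
  }

  long size() {
    if (blobSize == null) {
      blobSize = metadataFetch.getAsLong(); // at most one metadata round-trip
    }
    return blobSize;
  }
}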
61 changes: 51 additions & 10 deletions gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputStream.java
@@ -23,13 +23,15 @@
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobSourceOption;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.FileIOMetricsContext;
+import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.metrics.Counter;
 import org.apache.iceberg.metrics.MetricsContext;
@@ -43,12 +45,13 @@
  * The GCSInputStream leverages native streaming channels from the GCS API for streaming uploads.
  * See <a href="https://cloud.google.com/storage/docs/streaming">Streaming Transfers</a>
  */
-class GCSInputStream extends SeekableInputStream {
+class GCSInputStream extends SeekableInputStream implements RangeReadable {
   private static final Logger LOG = LoggerFactory.getLogger(GCSInputStream.class);
 
   private final StackTraceElement[] createStack;
   private final Storage storage;
   private final BlobId blobId;
+  private Long blobSize;
   private final GCPProperties gcpProperties;
 
   private ReadChannel channel;
@@ -61,9 +64,14 @@ class GCSInputStream extends SeekableInputStream {
   private final Counter readOperations;
 
   GCSInputStream(
-      Storage storage, BlobId blobId, GCPProperties gcpProperties, MetricsContext metrics) {
+      Storage storage,
+      BlobId blobId,
+      Long blobSize,
+      GCPProperties gcpProperties,
+      MetricsContext metrics) {
     this.storage = storage;
     this.blobId = blobId;
+    this.blobSize = blobSize;
     this.gcpProperties = gcpProperties;
 
     this.readBytes = metrics.counter(FileIOMetricsContext.READ_BYTES, Unit.BYTES);
@@ -75,6 +83,10 @@ class GCSInputStream extends SeekableInputStream {
   }
 
   private void openStream() {
+    channel = openChannel();
+  }
+
+  private ReadChannel openChannel() {
     List<BlobSourceOption> sourceOptions = Lists.newArrayList();
 
     gcpProperties
@@ -84,9 +96,11 @@ private void openStream() {
         .userProject()
         .ifPresent(userProject -> sourceOptions.add(BlobSourceOption.userProject(userProject)));
 
-    channel = storage.reader(blobId, sourceOptions.toArray(new BlobSourceOption[0]));
+    ReadChannel result = storage.reader(blobId, sourceOptions.toArray(new BlobSourceOption[0]));
+
+    gcpProperties.channelReadChunkSize().ifPresent(result::setChunkSize);
 
-    gcpProperties.channelReadChunkSize().ifPresent(channel::setChunkSize);
+    return result;
   }
 
   @Override
@@ -123,19 +137,46 @@ public int read() throws IOException {
   @Override
   public int read(byte[] b, int off, int len) throws IOException {
     Preconditions.checkState(!closed, "Cannot read: already closed");
 
     byteBuffer = byteBuffer != null && byteBuffer.array() == b ? byteBuffer : ByteBuffer.wrap(b);
-    byteBuffer.position(off);
-    byteBuffer.limit(Math.min(off + len, byteBuffer.capacity()));
 
-    int bytesRead = channel.read(byteBuffer);
+    int bytesRead = read(channel, byteBuffer, off, len);
     pos += bytesRead;
     readBytes.increment(bytesRead);
     readOperations.increment();
 
     return bytesRead;
   }
 
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
+    try (ReadChannel readChannel = openChannel()) {
+      readChannel.seek(position);
+      readChannel.limit(position + length);
+      int bytesRead = read(readChannel, ByteBuffer.wrap(buffer), offset, length);
+      if (bytesRead < length) {
+        throw new EOFException(
+            "Reached the end of stream with " + (length - bytesRead) + " bytes left to read");
+      }
+    }
+  }
+
+  @Override
+  public int readTail(byte[] buffer, int offset, int length) throws IOException {
+    if (blobSize == null) {
+      blobSize = storage.get(blobId).getSize();
+    }
+    long startPosition = Math.max(0, blobSize - length);
+    try (ReadChannel readChannel = openChannel()) {
+      readChannel.seek(startPosition);
+      return read(readChannel, ByteBuffer.wrap(buffer), offset, length);
+    }
+  }
+
+  private int read(ReadChannel readChannel, ByteBuffer buffer, int off, int len)
+      throws IOException {
+    buffer.position(off);
+    buffer.limit(Math.min(off + len, buffer.capacity()));
+    return readChannel.read(buffer);
+  }
+
   @Override
   public void close() throws IOException {
     super.close();
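
Both new range methods bound a fresh ReadChannel with seek and limit, so the service returns only the requested bytes and the primary channel's position is untouched. The following is a self-contained sketch of the same bounded-read idea against the google-cloud-storage API; the bucket and object names are placeholders, and unlike the commit's single read call it loops to drain short reads:

import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.ByteBuffer;

class BoundedRangeReadSketch {
  static byte[] readRange(Storage storage, BlobId blobId, long position, int length)
      throws IOException {
    ByteBuffer buffer = ByteBuffer.allocate(length);
    try (ReadChannel channel = storage.reader(blobId)) {
      channel.seek(position);           // first byte of the range
      channel.limit(position + length); // exclusive end; the server stops sending here
      while (buffer.hasRemaining() && channel.read(buffer) >= 0) {
        // ReadChannel may return short reads before the limit is reached
      }
    }
    return buffer.array();
  }

  public static void main(String[] args) throws IOException {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    byte[] firstKb = readRange(storage, BlobId.of("my-bucket", "path/to/object"), 0, 1024);
    System.out.println(firstKb.length + " bytes requested");
  }
}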
gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSInputStreamTest.java
@@ -31,8 +31,10 @@
 import java.util.Random;
 import org.apache.iceberg.gcp.GCPProperties;
 import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.RangeReadable;
 import org.apache.iceberg.io.SeekableInputStream;
 import org.apache.iceberg.metrics.MetricsContext;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class GCSInputStreamTest {
@@ -51,7 +53,7 @@ public void testRead() throws Exception {
     writeGCSData(uri, data);
 
     try (SeekableInputStream in =
-        new GCSInputStream(storage, uri, gcpProperties, MetricsContext.nullMetrics())) {
+        new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
       int readSize = 1024;
       byte[] actual = new byte[readSize];
 
@@ -88,7 +90,7 @@ public void testReadSingle() throws Exception {
     writeGCSData(uri, data);
 
     try (SeekableInputStream in =
-        new GCSInputStream(storage, uri, gcpProperties, MetricsContext.nullMetrics())) {
+        new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
       assertThat(in.read()).isEqualTo(i0);
       assertThat(in.read()).isEqualTo(i1);
     }
@@ -116,11 +118,54 @@ private void readAndCheck(
     assertThat(actual).isEqualTo(Arrays.copyOfRange(original, (int) rangeStart, (int) rangeEnd));
   }
 
+  @Test
+  public void testRangeRead() throws Exception {
+    BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/read.dat");
+    int dataSize = 1024 * 1024 * 10;
+    byte[] expected = randomData(dataSize);
+    byte[] actual = new byte[dataSize];
+
+    long position;
+    int offset;
+    int length;
+
+    writeGCSData(uri, expected);
+
+    try (RangeReadable in =
+        new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
+      // first 1k
+      position = 0;
+      offset = 0;
+      length = 1024;
+      readAndCheckRanges(in, expected, position, actual, offset, length);
+
+      // last 1k
+      position = dataSize - 1024;
+      offset = dataSize - 1024;
+      readAndCheckRanges(in, expected, position, actual, offset, length);
+
+      // middle 2k
+      position = dataSize / 2 - 1024;
+      offset = dataSize / 2 - 1024;
+      length = 1024 * 2;
+      readAndCheckRanges(in, expected, position, actual, offset, length);
+    }
+  }
+
+  private void readAndCheckRanges(
+      RangeReadable in, byte[] original, long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    in.readFully(position, buffer, offset, length);
+
+    Assertions.assertThat(Arrays.copyOfRange(buffer, offset, offset + length))
+        .isEqualTo(Arrays.copyOfRange(original, offset, offset + length));
+  }
+
   @Test
   public void testClose() throws Exception {
     BlobId blobId = BlobId.fromGsUtilUri("gs://bucket/path/to/closed.dat");
     SeekableInputStream closed =
-        new GCSInputStream(storage, blobId, gcpProperties, MetricsContext.nullMetrics());
+        new GCSInputStream(storage, blobId, null, gcpProperties, MetricsContext.nullMetrics());
     closed.close();
     assertThatThrownBy(() -> closed.seek(0)).isInstanceOf(IllegalStateException.class);
   }
@@ -133,7 +178,7 @@ public void testSeek() throws Exception {
     writeGCSData(blobId, data);
 
     try (SeekableInputStream in =
-        new GCSInputStream(storage, blobId, gcpProperties, MetricsContext.nullMetrics())) {
+        new GCSInputStream(storage, blobId, null, gcpProperties, MetricsContext.nullMetrics())) {
       in.seek(data.length / 2);
       byte[] actual = new byte[data.length / 2];
 
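
The new test covers readFully through readAndCheckRanges; readTail could be exercised the same way. A hypothetical companion test, reusing this class's randomData and writeGCSData helpers and not part of the commit, might look like:

@Test
public void testTailRead() throws Exception {
  BlobId uri = BlobId.fromGsUtilUri("gs://bucket/path/to/tail.dat");
  byte[] expected = randomData(1024 * 1024);
  writeGCSData(uri, expected);

  byte[] actual = new byte[1024];
  try (RangeReadable in =
      new GCSInputStream(storage, uri, null, gcpProperties, MetricsContext.nullMetrics())) {
    in.readTail(actual, 0, actual.length); // last 1 KiB of the object
  }

  Assertions.assertThat(actual)
      .isEqualTo(Arrays.copyOfRange(expected, expected.length - 1024, expected.length));
}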
gcp/src/test/java/org/apache/iceberg/gcp/gcs/GCSLocationTest.java
@@ -49,7 +49,7 @@ public void testMissingScheme() {
   }
 
   @Test
-  public void tesInvalidScheme() {
+  public void testInvalidScheme() {
     Assertions.assertThatThrownBy(() -> new GCSLocation("s3://bucket/path/to/prefix"))
         .isInstanceOf(ValidationException.class)
         .hasMessage("Invalid GCS URI, invalid scheme: s3");
