ORC: Add FileIO support for readers and writers (apache#6293)
pavibhai authored Dec 18, 2022
1 parent 5a9eb3c commit b4d9770
Showing 6 changed files with 460 additions and 32 deletions.
57 changes: 56 additions & 1 deletion core/src/main/java/org/apache/iceberg/hadoop/HadoopStreams.java
@@ -25,6 +25,7 @@
import java.util.Arrays;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.iceberg.io.DelegatingInputStream;
import org.apache.iceberg.io.DelegatingOutputStream;
import org.apache.iceberg.io.PositionOutputStream;
@@ -38,7 +39,7 @@
*
* <p>This class is based on Parquet's HadoopStreams.
*/
-class HadoopStreams {
+public class HadoopStreams {

private HadoopStreams() {}

@@ -65,6 +66,16 @@ static PositionOutputStream wrap(FSDataOutputStream stream) {
return new HadoopPositionOutputStream(stream);
}

/**
* Wraps a {@link SeekableInputStream} in a {@link FSInputStream} implementation for readers.
*
* @param stream a SeekableInputStream
* @return an FSInputStream
*/
public static FSInputStream wrap(SeekableInputStream stream) {
return new WrappedSeekableInputStream(stream);
}

/**
* SeekableInputStream implementation for FSDataInputStream that implements ByteBufferReadable in
* Hadoop 2.
@@ -190,4 +201,48 @@ protected void finalize() throws Throwable {
}
}
}

private static class WrappedSeekableInputStream extends FSInputStream
implements DelegatingInputStream {
private final SeekableInputStream inputStream;

private WrappedSeekableInputStream(SeekableInputStream inputStream) {
this.inputStream = inputStream;
}

@Override
public void seek(long pos) throws IOException {
inputStream.seek(pos);
}

@Override
public long getPos() throws IOException {
return inputStream.getPos();
}

@Override
public boolean seekToNewSource(long targetPos) throws IOException {
throw new UnsupportedOperationException("seekToNewSource not supported");
}

@Override
public int read() throws IOException {
return inputStream.read();
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return inputStream.read(b, off, len);
}

@Override
public void close() throws IOException {
inputStream.close();
}

@Override
public InputStream getDelegate() {
return inputStream;
}
}
}
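Making HadoopStreams public and adding the input-side wrap overload means any FileIO-backed stream can be handed to Hadoop APIs. A minimal sketch of that direction, assuming the Files.localInput factory from iceberg-core and a hypothetical local path:

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.iceberg.Files;
import org.apache.iceberg.hadoop.HadoopStreams;
import org.apache.iceberg.io.InputFile;

public class WrapExample {
  public static void main(String[] args) throws Exception {
    // Any FileIO-backed InputFile works here; localInput is just a convenient factory.
    InputFile file = Files.localInput("/tmp/data.orc"); // hypothetical path
    // FSInputStream implements Seekable and PositionedReadable, which is what
    // the FSDataInputStream constructor requires of its delegate.
    try (FSDataInputStream in = new FSDataInputStream(HadoopStreams.wrap(file.newStream()))) {
      in.seek(0);
      System.out.println("first byte: " + in.read());
    }
  }
}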
164 changes: 164 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/FileIOFSUtil.java
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.orc;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import org.apache.iceberg.hadoop.HadoopStreams;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class FileIOFSUtil {
private FileIOFSUtil() {}

private static class NullFileSystem extends FileSystem {

@Override
public URI getUri() {
throw new UnsupportedOperationException();
}

@Override
public FSDataInputStream open(Path f) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public FSDataOutputStream create(
Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress)
throws IOException {
throw new UnsupportedOperationException();
}

@Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
throws IOException {
throw new UnsupportedOperationException();
}

@Override
public boolean rename(Path src, Path dst) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public boolean delete(Path f, boolean recursive) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
throw new UnsupportedOperationException();
}

@Override
public void setWorkingDirectory(Path new_dir) {
throw new UnsupportedOperationException();
}

@Override
public Path getWorkingDirectory() {
throw new UnsupportedOperationException();
}

@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public FileStatus getFileStatus(Path f) throws IOException {
throw new UnsupportedOperationException();
}
}

static class InputFileSystem extends NullFileSystem {
private final InputFile inputFile;
private final Path inputPath;

InputFileSystem(InputFile inputFile) {
this.inputFile = inputFile;
this.inputPath = new Path(inputFile.location());
}

@Override
public FSDataInputStream open(Path f) throws IOException {
Preconditions.checkArgument(
f.equals(inputPath), String.format("Input %s does not equal expected %s", f, inputPath));
return new FSDataInputStream(HadoopStreams.wrap(inputFile.newStream()));
}

@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
return open(f);
}
}

static class OutputFileSystem extends NullFileSystem {
private final OutputFile outputFile;
private final Path outPath;

OutputFileSystem(OutputFile outputFile) {
this.outputFile = outputFile;
this.outPath = new Path(outputFile.location());
}

@Override
public FSDataOutputStream create(Path f, boolean overwrite) throws IOException {
Preconditions.checkArgument(
f.equals(outPath), String.format("Output %s does not equal expected %s", f, outPath));
OutputStream outputStream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
return new FSDataOutputStream(outputStream, null);
}

@Override
public FSDataOutputStream create(
Path f,
FsPermission permission,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress)
throws IOException {
return create(f, overwrite);
}
}
}
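The single-path file systems above exist only to satisfy ORC's FileSystem parameter: each answers for exactly one location and fails fast on anything else. A sketch of that contract; since FileIOFSUtil is package-private, this would have to live in org.apache.iceberg.orc, and the paths are hypothetical:

package org.apache.iceberg.orc;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Files;
import org.apache.iceberg.io.InputFile;

public class SingleFileFsExample {
  public static void main(String[] args) throws Exception {
    InputFile file = Files.localInput("/tmp/data.orc"); // hypothetical path
    FileSystem fs = new FileIOFSUtil.InputFileSystem(file);

    // Opening the expected path delegates to file.newStream().
    try (FSDataInputStream in = fs.open(new Path(file.location()))) {
      in.read();
    }

    // Any other path trips the Preconditions check; every unrelated FileSystem
    // operation (rename, delete, listStatus, ...) throws UnsupportedOperationException.
    try {
      fs.open(new Path("/tmp/other.orc"));
    } catch (IllegalArgumentException expected) {
      System.out.println(expected.getMessage());
    }
  }
}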
40 changes: 32 additions & 8 deletions orc/src/main/java/org/apache/iceberg/orc/ORC.java
@@ -39,6 +39,7 @@
import static org.apache.iceberg.TableProperties.ORC_WRITE_BATCH_SIZE_DEFAULT;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
@@ -85,6 +86,7 @@
import org.apache.orc.OrcFile.ReaderOptions;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

@SuppressWarnings("checkstyle:AbbreviationAsWordInName")
@@ -764,19 +766,41 @@ public <D> CloseableIterable<D> build() {
     }
   }
 
-  static Reader newFileReader(String location, ReaderOptions readerOptions) {
+  static Reader newFileReader(InputFile file, Configuration config) {
+    ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true);
+    if (file instanceof HadoopInputFile) {
+      readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
+    } else {
+      // For any other InputFile, wrap it in an InputFileSystem that only supports
+      // the creation of an input stream. To prevent a file status call to determine
+      // the length, we supply the length as input.
+      readerOptions.filesystem(new FileIOFSUtil.InputFileSystem(file)).maxLength(file.getLength());
+    }
     try {
-      return OrcFile.createReader(new Path(location), readerOptions);
+      return OrcFile.createReader(new Path(file.location()), readerOptions);
     } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Failed to open file: %s", location);
+      throw new RuntimeIOException(ioe, "Failed to open file: %s", file.location());
     }
   }
 
-  static Reader newFileReader(InputFile file, Configuration config) {
-    ReaderOptions readerOptions = OrcFile.readerOptions(config).useUTCTimestamp(true);
-    if (file instanceof HadoopInputFile) {
-      readerOptions.filesystem(((HadoopInputFile) file).getFileSystem());
-    }
-    return newFileReader(file.location(), readerOptions);
+  static Writer newFileWriter(
+      OutputFile file, OrcFile.WriterOptions options, Map<String, byte[]> metadata) {
+    if (file instanceof HadoopOutputFile) {
+      options.fileSystem(((HadoopOutputFile) file).getFileSystem());
+    } else {
+      options.fileSystem(new FileIOFSUtil.OutputFileSystem(file));
+    }
+    final Path locPath = new Path(file.location());
+    final Writer writer;
+
+    try {
+      writer = OrcFile.createWriter(locPath, options);
+    } catch (IOException ioe) {
+      throw new RuntimeIOException(ioe, "Can't create file %s", locPath);
+    }
+
+    metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
+
+    return writer;
   }
 }
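With this change the reader no longer requires a HadoopInputFile: any FileIO-backed InputFile (for example one from S3FileIO) works, and maxLength avoids the file status call that would otherwise be needed to find the length. A sketch of the public read path, assuming iceberg-data's GenericOrcReader is on the classpath and a hypothetical local file and schema:

import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcReader;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.types.Types;

public class OrcReadExample {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    InputFile file = Files.localInput("/tmp/data.orc"); // hypothetical path

    // The builder routes through newFileReader above, so a plain FileIO InputFile is enough.
    try (CloseableIterable<Record> rows =
        ORC.read(file)
            .project(schema)
            .createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
            .build()) {
      rows.forEach(row -> System.out.println(row.getField("id")));
    }
  }
}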
21 changes: 1 addition & 20 deletions orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java
@@ -20,13 +20,11 @@

import java.io.IOException;
import java.io.UncheckedIOException;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
@@ -88,8 +86,7 @@ class OrcFileAppender<D> implements FileAppender<D> {
options.fileSystem(((HadoopOutputFile) file).getFileSystem());
}
options.setSchema(orcSchema);
-    this.writer = newOrcWriter(file, options, metadata);
-
+    this.writer = ORC.newFileWriter(file, options, metadata);
this.valueWriter = newOrcRowWriter(schema, orcSchema, createWriterFunc);
}

@@ -170,22 +167,6 @@ public void close() throws IOException {
}
}

-  private static Writer newOrcWriter(
-      OutputFile file, OrcFile.WriterOptions options, Map<String, byte[]> metadata) {
-    final Path locPath = new Path(file.location());
-    final Writer writer;
-
-    try {
-      writer = OrcFile.createWriter(locPath, options);
-    } catch (IOException ioe) {
-      throw new RuntimeIOException(ioe, "Can't create file %s", locPath);
-    }
-
-    metadata.forEach((key, value) -> writer.addUserMetadata(key, ByteBuffer.wrap(value)));
-
-    return writer;
-  }
-
@SuppressWarnings("unchecked")
private static <D> OrcRowWriter<D> newOrcRowWriter(
Schema schema,
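The writer side mirrors the reader: OrcFileAppender now delegates to ORC.newFileWriter, so appenders also work against any FileIO OutputFile. A sketch of the public write path, assuming iceberg-data's GenericOrcWriter and a hypothetical local path:

import java.io.File;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.types.Types;

public class OrcWriteExample {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));
    OutputFile file = Files.localOutput(new File("/tmp/out.orc")); // hypothetical path

    // The appender builds its Writer through ORC.newFileWriter, so a plain
    // FileIO OutputFile (no Hadoop FileSystem) is sufficient.
    try (FileAppender<Record> appender =
        ORC.write(file)
            .schema(schema)
            .createWriterFunc(GenericOrcWriter::buildWriter)
            .build()) {
      GenericRecord record = GenericRecord.create(schema);
      record.setField("id", 1L);
      appender.add(record);
    }
  }
}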
