
Commit

Merge pull request apache#13616: [BEAM-11460] Support reading Parquet files with unknown schema

iemejia authored Dec 25, 2020
2 parents b0e3d36 + 0d344ba commit 8149158
Showing 3 changed files with 330 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -58,6 +58,7 @@

## New Features / Improvements

* ParquetIO now provides the methods _parseGenericRecords_ and _parseFilesGenericRecords_ to read files with an unknown schema. See [PR-13554](https://github.com/apache/beam/pull/13554) and [BEAM-11460](https://issues.apache.org/jira/browse/BEAM-11460).
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).

## Breaking Changes
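For orientation, here is a minimal, self-contained sketch of the API this changelog entry describes. The "name" field, the file pattern, and the example class are illustrative assumptions, not part of the commit:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;

public class ParquetParseExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Parse records without declaring an Avro schema at construction time;
    // each record's schema is still available at runtime via record.getSchema().
    PCollection<String> names =
        pipeline.apply(
            ParquetIO.parseGenericRecords(
                    new SerializableFunction<GenericRecord, String>() {
                      @Override
                      public String apply(GenericRecord record) {
                        return String.valueOf(record.get("name"));
                      }
                    })
                .from("/path/to/input-*.parquet"));

    pipeline.run().waitUntilFinish();
  }
}
```

An anonymous class (rather than a lambda) is used for the parse function so that the output type, and therefore the output coder, can be inferred from its type parameters.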
sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java

@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.parquet;

import static java.lang.String.format;
import static org.apache.parquet.Preconditions.checkArgument;
import static org.apache.parquet.Preconditions.checkNotNull;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;

@@ -38,21 +39,30 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.ReadFn;
import org.apache.beam.sdk.io.parquet.ParquetIO.ReadFiles.SplitReadFn;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
@@ -152,6 +162,44 @@
* *
* }</pre>
*
* <h3>Reading records of an unknown schema</h3>
*
* <p>To read records from files whose schema is unknown at pipeline construction time or differs
* between files, use {@link #parseGenericRecords(SerializableFunction)}. In this case, you will
* need to specify a parsing function for converting each {@link GenericRecord} into a value of
* your custom type.
*
* <p>For example:
*
* <pre>{@code
* Pipeline p = ...;
*
* PCollection<Foo> records =
* p.apply(
* ParquetIO.parseGenericRecords(
* new SerializableFunction<GenericRecord, Foo>() {
* public Foo apply(GenericRecord record) {
* // If needed, access the schema of the record using record.getSchema()
* return ...;
* }
* })
.from(...));
*
* // For reading from files
* PCollection<FileIO.ReadableFile> files = p.apply(...);
*
* PCollection<Foo> records =
* files
* .apply(
* ParquetIO.parseFilesGenericRecords(
* new SerializableFunction<GenericRecord, Foo>() {
* public Foo apply(GenericRecord record) {
* // If needed, access the schema of the record using record.getSchema()
* return ...;
* }
* }));
* }</pre>
*
* <h3>Writing Parquet files</h3>
*
* <p>{@link ParquetIO.Sink} allows you to write a {@link PCollection} of {@link GenericRecord} into
@@ -202,7 +250,30 @@ public static Read read(Schema schema) {
*/
public static ReadFiles readFiles(Schema schema) {
return new AutoValue_ParquetIO_ReadFiles.Builder()
.setSplittable(false)
.setSchema(schema)
.build();
}

/**
* Reads {@link GenericRecord}s from a Parquet file (or multiple Parquet files matching the
* pattern) and converts them to a user-defined type using the provided {@code parseFn}.
*/
public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_ParquetIO_Parse.Builder<T>()
.setParseFn(parseFn)
.setSplittable(false)
.build();
}

/**
* Reads {@link GenericRecord}s from Parquet files and converts them to a user-defined type using
* the provided {@code parseFn}.
*/
public static <T> ParseFiles<T> parseFilesGenericRecords(
SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_ParquetIO_ParseFiles.Builder<T>()
.setParseFn(parseFn)
.setSplittable(false)
.build();
}
@@ -300,6 +371,121 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
}

/** Implementation of {@link #parseGenericRecords(SerializableFunction)}. */
@AutoValue
public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
abstract @Nullable ValueProvider<String> getFilepattern();

abstract SerializableFunction<GenericRecord, T> getParseFn();

abstract boolean isSplittable();

abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> inputFiles);

abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);

abstract Builder<T> setSplittable(boolean splittable);

abstract Parse<T> build();
}

public Parse<T> from(ValueProvider<String> inputFiles) {
return toBuilder().setFilepattern(inputFiles).build();
}

public Parse<T> from(String inputFiles) {
return from(ValueProvider.StaticValueProvider.of(inputFiles));
}

public Parse<T> withSplit() {
return toBuilder().setSplittable(true).build();
}

@Override
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "Filepattern cannot be null.");
return input
.apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
.apply(FileIO.matchAll())
.apply(FileIO.readMatches())
.apply(
parseFilesGenericRecords(getParseFn())
.toBuilder()
.setSplittable(isSplittable())
.build());
}
}

/** Implementation of {@link #parseFilesGenericRecords(SerializableFunction)}. */
@AutoValue
public abstract static class ParseFiles<T>
extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {

abstract SerializableFunction<GenericRecord, T> getParseFn();

abstract boolean isSplittable();

abstract Builder<T> toBuilder();

@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);

abstract Builder<T> setSplittable(boolean split);

abstract ParseFiles<T> build();
}

public ParseFiles<T> withSplit() {
return toBuilder().setSplittable(true).build();
}

@Override
public PCollection<T> expand(PCollection<ReadableFile> input) {
checkArgument(!isGenericRecordOutput(), "Parse can't be used for reading as GenericRecord.");

PCollection<T> parsedRecords =
isSplittable()
? input.apply(ParDo.of(new SplitReadFn<>(null, null, getParseFn())))
: input.apply(ParDo.of(new ReadFn<>(null, getParseFn())));

return parsedRecords.setCoder(inferCoder(input.getPipeline().getCoderRegistry()));
}

/** Returns true if the expected output is {@code PCollection<GenericRecord>}. */
private boolean isGenericRecordOutput() {
String outputType = TypeDescriptors.outputOf(getParseFn()).getType().getTypeName();
return outputType.equals(GenericRecord.class.getTypeName());
}

/**
* Identifies the {@code Coder} to be used for the output PCollection.
*
* <p>Throws an {@link IllegalArgumentException} if the expected output is {@link GenericRecord},
* since {@link #readFiles(Schema)} must be used in that case.
*
* @param coderRegistry the {@link org.apache.beam.sdk.Pipeline}'s CoderRegistry used to identify
*     the Coder for the expected output type of {@link #getParseFn()}
*/
private Coder<T> inferCoder(CoderRegistry coderRegistry) {
if (isGenericRecordOutput()) {
throw new IllegalArgumentException("Parse can't be used for reading as GenericRecord.");
}

// Not GenericRecord: infer the coder from the output type of parseFn.
try {
return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException(
"Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
e);
}
}
}

/** Implementation of {@link #readFiles(Schema)}. */
@AutoValue
public abstract static class ReadFiles
@@ -357,26 +543,35 @@ public PCollection<GenericRecord> expand(PCollection<FileIO.ReadableFile> input)
if (isSplittable()) {
Schema coderSchema = getProjectionSchema() == null ? getSchema() : getEncoderSchema();
return input
.apply(ParDo.of(new SplitReadFn(getAvroDataModel(), getProjectionSchema())))
.apply(
ParDo.of(
new SplitReadFn<>(
getAvroDataModel(),
getProjectionSchema(),
GenericRecordPassthroughFn.create())))
.setCoder(AvroCoder.of(coderSchema));
}
return input
.apply(ParDo.of(new ReadFn(getAvroDataModel())))
.apply(ParDo.of(new ReadFn<>(getAvroDataModel(), GenericRecordPassthroughFn.create())))
.setCoder(AvroCoder.of(getSchema()));
}

@DoFn.BoundedPerElement
static class SplitReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
static class SplitReadFn<T> extends DoFn<FileIO.ReadableFile, T> {
private Class<? extends GenericData> modelClass;
private static final Logger LOG = LoggerFactory.getLogger(SplitReadFn.class);
private String requestSchemaString;
// By default, initially split each file into 64 MB blocks (SPLIT_LIMIT is in bytes).
private static final long SPLIT_LIMIT = 64000000;

SplitReadFn(GenericData model, Schema requestSchema) {
private final SerializableFunction<GenericRecord, T> parseFn;

SplitReadFn(
GenericData model, Schema requestSchema, SerializableFunction<GenericRecord, T> parseFn) {

this.modelClass = model != null ? model.getClass() : null;
this.requestSchemaString = requestSchema != null ? requestSchema.toString() : null;
this.parseFn = checkNotNull(parseFn, "GenericRecord parse function can't be null");
}

ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exception {
@@ -388,7 +583,7 @@ ParquetFileReader getParquetFileReader(FileIO.ReadableFile file) throws Exception {
public void processElement(
@Element FileIO.ReadableFile file,
RestrictionTracker<OffsetRange, Long> tracker,
OutputReceiver<GenericRecord> outputReceiver)
OutputReceiver<T> outputReceiver)
throws Exception {
LOG.debug(
"start "
@@ -468,7 +663,7 @@ record = recordReader.read();
file.toString());
continue;
}
outputReceiver.output(record);
outputReceiver.output(parseFn.apply(record));
} catch (RuntimeException e) {

throw new ParquetDecodingException(
@@ -618,12 +813,15 @@ public Progress getProgress() {
}
}

static class ReadFn extends DoFn<FileIO.ReadableFile, GenericRecord> {
static class ReadFn<T> extends DoFn<FileIO.ReadableFile, T> {

private Class<? extends GenericData> modelClass;

ReadFn(GenericData model) {
private final SerializableFunction<GenericRecord, T> parseFn;

ReadFn(GenericData model, SerializableFunction<GenericRecord, T> parseFn) {
this.modelClass = model != null ? model.getClass() : null;
this.parseFn = checkNotNull(parseFn, "GenericRecord parse function is null");
}

@ProcessElement
@@ -647,7 +845,7 @@ public void processElement(ProcessContext processContext) throws Exception {
try (ParquetReader<GenericRecord> reader = builder.build()) {
GenericRecord read;
while ((read = reader.read()) != null) {
processContext.output(read);
processContext.output(parseFn.apply(read));
}
}
}
@@ -838,6 +1036,28 @@ public void close() throws IOException {
}
}

/**
* Passthrough function that provides seamless backward compatibility for ParquetIO's existing
* {@link GenericRecord}-based reads.
*/
@VisibleForTesting
static final class GenericRecordPassthroughFn
implements SerializableFunction<GenericRecord, GenericRecord> {

private static final GenericRecordPassthroughFn singleton = new GenericRecordPassthroughFn();

static GenericRecordPassthroughFn create() {
return singleton;
}

@Override
public GenericRecord apply(GenericRecord input) {
return input;
}

/** Enforces the singleton pattern by disallowing construction with the {@code new} operator. */
private GenericRecordPassthroughFn() {}
}

/** Disallow construction of utility class. */
private ParquetIO() {}
}
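Finally, a sketch of the splittable path this commit enables, combining FileIO matching with withSplit(). The file pattern and the "id" field are illustrative assumptions:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;

public class ParquetSplitParseExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // Match and read the files explicitly, then parse with the splittable
    // DoFn: withSplit() routes execution through SplitReadFn, which
    // subdivides each file into ~64 MB ranges (SPLIT_LIMIT) for parallelism.
    PCollection<String> ids =
        pipeline
            .apply(FileIO.match().filepattern("/path/to/input-*.parquet"))
            .apply(FileIO.readMatches())
            .apply(
                ParquetIO.parseFilesGenericRecords(
                        new SerializableFunction<GenericRecord, String>() {
                          @Override
                          public String apply(GenericRecord record) {
                            return String.valueOf(record.get("id"));
                          }
                        })
                    .withSplit());

    pipeline.run().waitUntilFinish();
  }
}
```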