Merge pull request apache#7922: [BEAM-6819] Report estimated size as source metadata when splitting
chamikaramj authored Mar 14, 2019
2 parents d7ea7aa + e4ec725 commit 0fa4e20
Showing 6 changed files with 97 additions and 8 deletions.
@@ -32,6 +32,7 @@
import com.google.api.services.dataflow.model.DerivedSource;
import com.google.api.services.dataflow.model.DynamicSourceSplit;
import com.google.api.services.dataflow.model.ReportedParallelism;
import com.google.api.services.dataflow.model.SourceMetadata;
import com.google.api.services.dataflow.model.SourceOperationResponse;
import com.google.api.services.dataflow.model.SourceSplitOptions;
import com.google.api.services.dataflow.model.SourceSplitRequest;
@@ -55,6 +56,7 @@
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.WindowedValue;
@@ -133,9 +135,7 @@ public static DynamicSourceSplit toSourceSplit(BoundedSourceSplit<?> sourceSplit

/**
* Version of {@link CustomSources#serializeToCloudSource(Source, PipelineOptions)} intended for
* use on splits of {@link BoundedSource}: the backend only needs the metadata for top-level
* sources, so here we bypass computing it (esp. the costly {@link
* BoundedSource#getEstimatedSizeBytes(PipelineOptions)}).
* use on splits of {@link BoundedSource}.
*/
private static com.google.api.services.dataflow.model.Source serializeSplitToCloudSource(
BoundedSource<?> source) throws Exception {
@@ -144,6 +144,14 @@ private static com.google.api.services.dataflow.model.Source serializeSplitToClo
cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
addString(
cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source)));
SourceMetadata metadata = new SourceMetadata();
// Size estimation is best effort so we continue even if it fails here.
try {
metadata.setEstimatedSizeBytes(source.getEstimatedSizeBytes(PipelineOptionsFactory.create()));
} catch (Exception e) {
LOG.warn("Size estimation of the source failed: " + source, e);
}
cloudSource.setMetadata(metadata);
return cloudSource;
}
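For readers skimming the hunk above: the estimation is deliberately best effort. A minimal standalone sketch of the same pattern, using a hypothetical helper name (estimateSizeOrNull) and only the public APIs already referenced in the diff (BoundedSource#getEstimatedSizeBytes, PipelineOptionsFactory#create):

import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Sketch: best-effort size estimation for a split, mirroring the change above. */
class SplitSizeEstimation {
  private static final Logger LOG = LoggerFactory.getLogger(SplitSizeEstimation.class);

  /** Returns the source's estimated size in bytes, or null if estimation fails. */
  static Long estimateSizeOrNull(BoundedSource<?> source) {
    // Size estimation is best effort: a failure here must not fail the split request,
    // so the exception is logged and no estimate is reported.
    try {
      return source.getEstimatedSizeBytes(PipelineOptionsFactory.create());
    } catch (Exception e) {
      LOG.warn("Size estimation of the source failed: " + source, e);
      return null;
    }
  }
}

If estimation throws, the SourceMetadata is still attached to the cloud source but its estimatedSizeBytes stays unset, so a failing estimator cannot fail the split itself.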

@@ -152,6 +152,7 @@ public void testSplitAndReadBundlesBack() throws Exception {
"Failed on bundle " + i,
xs,
contains(valueInGlobalWindow(0L + 2 * i), valueInGlobalWindow(1L + 2 * i)));
assertTrue(bundle.getSource().getMetadata().getEstimatedSizeBytes() > 0);
}
}

@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io;

import static org.apache.beam.sdk.io.FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
@@ -215,6 +216,15 @@ public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPat
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
}

public static AvroSource<GenericRecord> from(Metadata metadata) {
return new AvroSource<>(
metadata,
DEFAULT_MIN_BUNDLE_SIZE,
0,
metadata.sizeBytes(),
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
}

/** Like {@link #from(ValueProvider)}. */
public static AvroSource<GenericRecord> from(String fileNameOrPattern) {
return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
@@ -244,6 +254,14 @@ public AvroSource<GenericRecord> withSchema(Schema schema) {
/** Reads files containing records of the given class. */
public <X> AvroSource<X> withSchema(Class<X> clazz) {
checkArgument(clazz != null, "clazz can not be null");
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(),
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
readGeneratedClasses(clazz));
}
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
@@ -259,6 +277,14 @@ public <X> AvroSource<X> withParseFn(
SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {
checkArgument(parseFn != null, "parseFn can not be null");
checkArgument(coder != null, "coder can not be null");
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(),
getMinBundleSize(),
getStartOffset(),
getEndOffset(),
parseGenericRecords(parseFn, coder));
}
return new AvroSource<>(
getFileOrPatternSpecProvider(),
getEmptyMatchTreatment(),
@@ -271,6 +297,10 @@ public <X> AvroSource<X> withParseFn(
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
if (getMode() == SINGLE_FILE_OR_SUBRANGE) {
return new AvroSource<>(
getSingleFileMetadata(), minBundleSize, getStartOffset(), getEndOffset(), mode);
}
return new AvroSource<>(
getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
}
@@ -756,6 +756,25 @@ public void testReadSchemaString() throws Exception {
assertEquals(4, schema.getFields().size());
}

@Test
public void testCreateFromMetadata() throws Exception {
List<Bird> expected = createRandomRecords(DEFAULT_RECORD_COUNT);
String codec = DataFileConstants.NULL_CODEC;
String filename =
generateTestFile(
codec, expected, SyncBehavior.SYNC_DEFAULT, 0, AvroCoder.of(Bird.class), codec);
Metadata fileMeta = FileSystems.matchSingleFileSpec(filename);

AvroSource<GenericRecord> source = AvroSource.from(fileMeta);
AvroSource<Bird> sourceWithSchema = source.withSchema(Bird.class);
AvroSource<Bird> sourceWithSchemaWithMinBundleSize = sourceWithSchema.withMinBundleSize(1234);

assertEquals(FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, source.getMode());
assertEquals(FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, sourceWithSchema.getMode());
assertEquals(
FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE, sourceWithSchemaWithMinBundleSize.getMode());
}

/**
* Class that will encode to a fixed size: 16 bytes.
*
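Usage-wise, the new AvroSource.from(Metadata) factory tested above makes it possible to build a source from a file that has already been matched, so the file is not stat'ed a second time. A minimal sketch, assuming a hypothetical wrapper method and the existing FileSystems.matchSingleFileSpec API; as the comment in the diff notes, a schema or parse function still has to be supplied via withSchema/withParseFn before reading:

import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;

/** Sketch: building an AvroSource from pre-matched file metadata. */
class AvroSourceFromMetadataExample {
  static AvroSource<GenericRecord> fromMatchedFile(String filename) throws IOException {
    // matchSingleFileSpec stats the file once; the returned Metadata carries sizeBytes,
    // so the source created from it starts in SINGLE_FILE_OR_SUBRANGE mode and already
    // knows its size without another filesystem call.
    MatchResult.Metadata metadata = FileSystems.matchSingleFileSpec(filename);
    return AvroSource.from(metadata);
  }
}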
@@ -870,7 +870,8 @@ public void processElement(ProcessContext c) throws Exception {
ImmutableList.of(
FileSystems.matchNewResource(
c.element(), false /* is directory */)),
schema);
schema,
null);
checkArgument(sources.size() == 1, "Expected exactly one source.");
BoundedSource<T> avroSource = sources.get(0);
BoundedSource.BoundedReader<T> reader =
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.FileSystems.match;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
@@ -36,6 +37,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
@@ -90,10 +92,17 @@ abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
protected static class ExtractResult {
public final TableSchema schema;
public final List<ResourceId> extractedFiles;
public List<MatchResult.Metadata> metadata = null;

public ExtractResult(TableSchema schema, List<ResourceId> extractedFiles) {
this(schema, extractedFiles, null);
}

public ExtractResult(
TableSchema schema, List<ResourceId> extractedFiles, List<MatchResult.Metadata> metadata) {
this.schema = schema;
this.extractedFiles = extractedFiles;
this.metadata = metadata;
}
}

@@ -138,8 +147,19 @@ public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions
if (cachedSplitResult == null) {
ExtractResult res = extractFiles(options);
LOG.info("Extract job produced {} files", res.extractedFiles.size());

if (res.extractedFiles.size() > 0) {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
final String extractDestinationDir =
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
// Match all files in the destination directory to stat them in bulk.
List<MatchResult> matches = match(ImmutableList.of(extractDestinationDir + "*"));
if (matches.size() > 0) {
res.metadata = matches.get(0).metadata();
}
}
cleanupTempResource(options.as(BigQueryOptions.class));
cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema));
cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema, res.metadata));
}
return cachedSplitResult;
}
@@ -206,7 +226,8 @@ public TableSchema apply(@Nullable String input) {
}
}

List<BoundedSource<T>> createSources(List<ResourceId> files, TableSchema schema)
List<BoundedSource<T>> createSources(
List<ResourceId> files, TableSchema schema, List<MatchResult.Metadata> metadata)
throws IOException, InterruptedException {

final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
@@ -221,9 +242,18 @@ public T apply(GenericRecord input) {
return parseFn.apply(new SchemaAndRecord(input, schema.get()));
}
};

List<BoundedSource<T>> avroSources = Lists.newArrayList();
for (ResourceId file : files) {
avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
// If metadata is available, create AvroSources with said metadata in SINGLE_FILE_OR_SUBRANGE
// mode.
if (metadata != null) {
for (MatchResult.Metadata file : metadata) {
avroSources.add(AvroSource.from(file).withParseFn(fnWrapper, getOutputCoder()));
}
} else {
for (ResourceId file : files) {
avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
}
}
return ImmutableList.copyOf(avroSources);
}
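The split-time change above boils down to one bulk match per extract directory instead of one stat per extracted file. A minimal sketch of that idea, with a hypothetical class and method name and the same extractDestinationDir + "*" glob used in the diff (the real code additionally applies withParseFn to convert the GenericRecords):

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;

/** Sketch: one glob match replaces a per-file stat when creating the extract-file sources. */
class BulkMatchExample {
  static List<AvroSource<GenericRecord>> sourcesForExtractDir(String extractDestinationDir)
      throws IOException {
    // A single match call stats every extracted file in bulk ...
    List<MatchResult> matches =
        FileSystems.match(Collections.singletonList(extractDestinationDir + "*"));
    List<AvroSource<GenericRecord>> sources = new ArrayList<>();
    if (!matches.isEmpty()) {
      for (MatchResult.Metadata file : matches.get(0).metadata()) {
        // ... and each source is created from the Metadata, so its size estimate comes
        // from the match result instead of a separate filesystem call per file.
        sources.add(AvroSource.from(file));
      }
    }
    return sources;
  }
}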
