Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#564 from jkff/backport-2594
Browse files Browse the repository at this point in the history
Cache result of BigQuerySourceBase.split
  • Loading branch information
dhalperi authored Apr 19, 2017
2 parents 67bfc90 + 7cecf6e commit 2a1627b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 14 deletions.
39 changes: 26 additions & 13 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,8 @@ private abstract static class BigQuerySourceBase extends BoundedSource<TableRow>
protected final BigQueryServices bqServices;
protected final ValueProvider<String> executingProject;

private List<BoundedSource<TableRow>> cachedSplitResult;

private BigQuerySourceBase(
String jobIdToken,
String extractDestinationDir,
Expand All @@ -1225,19 +1227,30 @@ private BigQuerySourceBase(
@Override
public List<BoundedSource<TableRow>> splitIntoBundles(
    long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
  // splitIntoBundles() can be called multiple times: e.g. the Dataflow runner may call it
  // again with a different desiredBundleSizeBytes when an earlier call produced too many
  // sources. desiredBundleSizeBytes is ignored here, but a repeated call must not initiate
  // another BigQuery extract job, so the result of the first call is cached and reused.
  if (cachedSplitResult == null) {
    BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
    TableReference tableToExtract = getTableToExtract(bqOptions);
    JobService jobService = bqServices.getJobService(bqOptions);
    String extractJobId = getExtractJobId(jobIdToken);
    // Runs the extract job and collects the GCS paths of the extracted files.
    List<String> tempFiles = executeExtract(extractJobId, tableToExtract, jobService);

    // The schema is needed to parse the extracted Avro/JSON files back into TableRows.
    TableSchema tableSchema =
        bqServices
            .getDatasetService(bqOptions)
            .getTable(
                tableToExtract.getProjectId(),
                tableToExtract.getDatasetId(),
                tableToExtract.getTableId())
            .getSchema();

    // Clean up before caching: the temp table/resources are no longer needed once the
    // extracted files exist, and this branch will not run again for this source instance.
    cleanupTempResource(bqOptions);
    cachedSplitResult = createSources(tempFiles, tableSchema);
  }
  return cachedSplitResult;
}

protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

import com.google.api.client.util.Data;
Expand Down Expand Up @@ -1130,10 +1131,14 @@ public void testBigQueryTableSourceInitSplit() throws Exception {

List<? extends BoundedSource<TableRow>> sources = bqSource.splitIntoBundles(100, options);
assertEquals(1, sources.size());
// Simulate a repeated call to splitIntoBundles(), like a Dataflow worker will sometimes do.
sources = bqSource.splitIntoBundles(200, options);
assertEquals(1, sources.size());
BoundedSource<TableRow> actual = sources.get(0);
assertThat(actual, CoreMatchers.instanceOf(TransformingSource.class));

Mockito.verify(mockJobService)
// A repeated call to splitIntoBundles() should not have caused a duplicate extract job.
Mockito.verify(mockJobService, times(1))
.startExtractJob(Mockito.<JobReference>any(), Mockito.<JobConfigurationExtract>any());
}

Expand Down

0 comments on commit 2a1627b

Please sign in to comment.