Backport Beam PR-828
Set GCS upload buffer size to 1 MB in DataflowRunner streaming mode
peihe committed Aug 17, 2016
1 parent 39e57d6 commit b9b2c74
Showing 2 changed files with 44 additions and 0 deletions.
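
The new default only takes effect when gcsUploadBufferSizeBytes is left unset; a pipeline that sets the option explicitly keeps its own value in both batch and streaming mode (see the tests added below). A minimal usage sketch of an explicit override, assuming the standard DataflowPipelineOptions accessors shown in this diff (project and staging values are placeholders, not part of this commit):

    // Hypothetical usage sketch, not part of this commit.
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setStreaming(true);
    options.setProject("my-project");                        // placeholder
    options.setStagingLocation("gs://my-bucket/staging");    // placeholder
    options.setGcsUploadBufferSizeBytes(8 * 1024 * 1024);    // explicit value wins over the 1 MB default
    Pipeline p = Pipeline.create(options);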
@@ -133,6 +133,7 @@
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import com.google.cloud.hadoop.util.AbstractGoogleAsyncWriteChannel;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
@@ -229,6 +230,9 @@ public class DataflowPipelineRunner extends PipelineRunner<DataflowPipelineJob>
  // The limit of CreateJob request size.
  private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;

  @VisibleForTesting
  static final int GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT = 1 * 1024 * 1024;

  private final Set<PCollection<?>> pcollectionsRequiringIndexedFormat;

  /**
@@ -334,6 +338,9 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) {
+ "' invalid. Please make sure the value is non-negative.");
}

if (dataflowOptions.isStreaming() && dataflowOptions.getGcsUploadBufferSizeBytes() == null) {
dataflowOptions.setGcsUploadBufferSizeBytes(GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT);
}
return new DataflowPipelineRunner(dataflowOptions);
}

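For reference, a sketch of the default being applied: when streaming is enabled and the option is unset, fromOptions() (invoked during Pipeline.create) fills in GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT, which is what the tests added in the second file below assert. PipelineOptionsFactory usage is assumed here; the tests themselves build options through a test helper (buildPipelineOptions):

    // Hypothetical sketch; a real run also needs project, staging location, and credentials configured.
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setRunner(DataflowPipelineRunner.class);
    options.setStreaming(true);
    // gcsUploadBufferSizeBytes is deliberately left unset here.
    Pipeline.create(options);
    // options.getGcsUploadBufferSizeBytes() now returns 1 * 1024 * 1024.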
@@ -735,6 +735,43 @@ public void testValidJobName() throws IOException {
    }
  }

  @Test
  public void testGcsUploadBufferSizeIsUnsetForBatchWhenDefault() throws IOException {
    DataflowPipelineOptions batchOptions = buildPipelineOptions();
    batchOptions.setRunner(DataflowPipelineRunner.class);
    Pipeline.create(batchOptions);
    assertNull(batchOptions.getGcsUploadBufferSizeBytes());
  }

  @Test
  public void testGcsUploadBufferSizeIsSetForStreamingWhenDefault() throws IOException {
    DataflowPipelineOptions streamingOptions = buildPipelineOptions();
    streamingOptions.setStreaming(true);
    streamingOptions.setRunner(DataflowPipelineRunner.class);
    Pipeline.create(streamingOptions);
    assertEquals(
        DataflowPipelineRunner.GCS_UPLOAD_BUFFER_SIZE_BYTES_DEFAULT,
        streamingOptions.getGcsUploadBufferSizeBytes().intValue());
  }

  @Test
  public void testGcsUploadBufferSizeUnchangedWhenNotDefault() throws IOException {
    int gcsUploadBufferSizeBytes = 12345678;
    DataflowPipelineOptions batchOptions = buildPipelineOptions();
    batchOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
    batchOptions.setRunner(DataflowPipelineRunner.class);
    Pipeline.create(batchOptions);
    assertEquals(gcsUploadBufferSizeBytes, batchOptions.getGcsUploadBufferSizeBytes().intValue());

    DataflowPipelineOptions streamingOptions = buildPipelineOptions();
    streamingOptions.setStreaming(true);
    streamingOptions.setGcsUploadBufferSizeBytes(gcsUploadBufferSizeBytes);
    streamingOptions.setRunner(DataflowPipelineRunner.class);
    Pipeline.create(streamingOptions);
    assertEquals(
        gcsUploadBufferSizeBytes, streamingOptions.getGcsUploadBufferSizeBytes().intValue());
  }

  /**
   * A fake PTransform for testing.
   */
