Skip to content

Commit

Permalink
Move streaming custom transform expansions into the runner
Browse files Browse the repository at this point in the history
Previously, these transforms had ad hoc code in their `apply` method
that customized their expansion for the streaming (or sometimes batch)
runner. All of these are now encapsulated in the runner.

-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=96703501
  • Loading branch information
kennknowles authored and lukecwik committed Jun 26, 2015
1 parent 4153044 commit 6cba946
Show file tree
Hide file tree
Showing 9 changed files with 752 additions and 428 deletions.
12 changes: 3 additions & 9 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.StreamingOptions;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
Expand Down Expand Up @@ -632,7 +631,7 @@ public void processElement(ProcessContext c) throws IOException {
}
}
} catch (IOException e) {
throw new RuntimeException("Unexected exception while reading from Pubsub: ", e);
throw new RuntimeException("Unexpected exception while reading from Pubsub: ", e);
} finally {
if (getTopic() != null) {
try {
Expand Down Expand Up @@ -800,13 +799,8 @@ public PDone apply(PCollection<T> input) {
throw new IllegalStateException(
"need to set the topic of a PubsubIO.Write transform");
}

if (input.getPipeline().getOptions().as(StreamingOptions.class).isStreaming()) {
return PDone.in(input.getPipeline());
} else {
input.apply(ParDo.of(new PubsubWriter()));
return PDone.in(input.getPipeline());
}
input.apply(ParDo.of(new PubsubWriter()));
return PDone.in(input.getPipeline());
}

@Override
Expand Down
Loading

0 comments on commit 6cba946

Please sign in to comment.