Skip to content

Commit

Permalink
[BEAM-8904] properly update output pcollections from expanded transforms
Browse files Browse the repository at this point in the history
  • Loading branch information
ihji committed Dec 9, 2019
1 parent 5143d40 commit 1e53eac
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ default Map<String, PCollection<?>> extractOutputs(OutputT output) {
} else if (output instanceof PCollectionTuple) {
return ((PCollectionTuple) output)
.getAll().entrySet().stream()
.collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue));
.collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue));
} else if (output instanceof PCollectionList<?>) {
PCollectionList<?> listOutput = (PCollectionList<?>) output;
return IntStream.range(0, listOutput.size())
Expand Down Expand Up @@ -334,18 +334,31 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
throw new UnsupportedOperationException(
"Unknown urn: " + request.getTransform().getSpec().getUrn());
}
registeredTransforms
.get(request.getTransform().getSpec().getUrn())
.apply(
pipeline,
request.getTransform().getUniqueName(),
request.getTransform().getSpec(),
inputs);
Map<String, PCollection<?>> outputs =
registeredTransforms
.get(request.getTransform().getSpec().getUrn())
.apply(
pipeline,
request.getTransform().getUniqueName(),
request.getTransform().getSpec(),
inputs);

// Needed to find which transform was new...
SdkComponents sdkComponents =
rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace());
sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT);
Map<String, String> outputMap =
outputs.entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey,
output -> {
try {
return sdkComponents.registerPCollection(output.getValue());
} catch (IOException exn) {
throw new RuntimeException(exn);
}
}));
pipeline.replaceAll(ImmutableList.of(JavaReadViaImpulse.boundedOverride()));
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents);
String expandedTransformId =
Expand All @@ -359,6 +372,8 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
.getTransformsOrThrow(expandedTransformId)
.toBuilder()
.setUniqueName(expandedTransformId)
.clearOutputs()
.putAllOutputs(outputMap)
.build();
LOG.debug("Expanded to {}", expandedTransform);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ public void testConstruct() {
assertEquals(TEST_NAMESPACE + TEST_NAME, expandedTransform.getUniqueName());
// Verify it has the right input.
assertEquals(inputPcollId, Iterables.getOnlyElement(expandedTransform.getInputsMap().values()));
// Verify it has the right output.
assertEquals("output", Iterables.getOnlyElement(expandedTransform.getOutputsMap().keySet()));
// Loose check that it's composite, and its children are represented.
assertNotEquals(expandedTransform.getSubtransformsCount(), 0);
for (String subtransform : expandedTransform.getSubtransformsList()) {
Expand Down

0 comments on commit 1e53eac

Please sign in to comment.