From 1e53eac391c7f69467bbf8ad453c424f643a0fc7 Mon Sep 17 00:00:00 2001 From: Heejong Lee Date: Thu, 5 Dec 2019 16:45:17 -0800 Subject: [PATCH] [BEAM-8904] properly update output pcollections from expanded transforms --- .../expansion/ExpansionService.java | 31 ++++++++++++++----- .../expansion/ExpansionServiceTest.java | 2 ++ 2 files changed, 25 insertions(+), 8 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java index ae1a3d724a20..ba0ed180d6c7 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/expansion/ExpansionService.java @@ -277,7 +277,7 @@ default Map> extractOutputs(OutputT output) { } else if (output instanceof PCollectionTuple) { return ((PCollectionTuple) output) .getAll().entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey().toString(), Map.Entry::getValue)); + .collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue)); } else if (output instanceof PCollectionList) { PCollectionList listOutput = (PCollectionList) output; return IntStream.range(0, listOutput.size()) @@ -334,18 +334,31 @@ private Map loadRegisteredTransforms() { throw new UnsupportedOperationException( "Unknown urn: " + request.getTransform().getSpec().getUrn()); } - registeredTransforms - .get(request.getTransform().getSpec().getUrn()) - .apply( - pipeline, - request.getTransform().getUniqueName(), - request.getTransform().getSpec(), - inputs); + Map> outputs = + registeredTransforms + .get(request.getTransform().getSpec().getUrn()) + .apply( + pipeline, + request.getTransform().getUniqueName(), + request.getTransform().getSpec(), + inputs); // Needed to find which transform was new... SdkComponents sdkComponents = rehydratedComponents.getSdkComponents().withNewIdPrefix(request.getNamespace()); sdkComponents.registerEnvironment(Environments.JAVA_SDK_HARNESS_ENVIRONMENT); + Map outputMap = + outputs.entrySet().stream() + .collect( + Collectors.toMap( + Map.Entry::getKey, + output -> { + try { + return sdkComponents.registerPCollection(output.getValue()); + } catch (IOException exn) { + throw new RuntimeException(exn); + } + })); pipeline.replaceAll(ImmutableList.of(JavaReadViaImpulse.boundedOverride())); RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline, sdkComponents); String expandedTransformId = @@ -359,6 +372,8 @@ private Map loadRegisteredTransforms() { .getTransformsOrThrow(expandedTransformId) .toBuilder() .setUniqueName(expandedTransformId) + .clearOutputs() + .putAllOutputs(outputMap) .build(); LOG.debug("Expanded to {}", expandedTransform); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java index 3e5cf51d25fc..6024c109dd6b 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/expansion/ExpansionServiceTest.java @@ -99,6 +99,8 @@ public void testConstruct() { assertEquals(TEST_NAMESPACE + TEST_NAME, expandedTransform.getUniqueName()); // Verify it has the right input. assertEquals(inputPcollId, Iterables.getOnlyElement(expandedTransform.getInputsMap().values())); + // Verify it has the right output. + assertEquals("output", Iterables.getOnlyElement(expandedTransform.getOutputsMap().keySet())); // Loose check that it's composite, and its children are represented. assertNotEquals(expandedTransform.getSubtransformsCount(), 0); for (String subtransform : expandedTransform.getSubtransformsList()) {