From ce2ba6fe3bd2e0ebf8da352e1dbabd531afdec28 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Thu, 19 May 2016 09:17:37 -0700 Subject: [PATCH] Fix bug in PipelineOptions DisplayData serialization PipelineOptions has been improved to generate display data to be consumed by a runner and used for display. However, there was a bug in the ProxyInvocationHandler implementation of PipelineOptions display data which was causing NullPointerExceptions when generating display data from PipelineOptions previously deserialized from JSON. This change also makes our error handling for display data exceptions consistent across the Dataflow runner: exceptions thrown during display data population will propagate out and cause the pipeline to fail. This is consistent with other user code which may throw exceptions at pipeline construction time. (cherry picked from commit 1e669c44c9d2448b55f5bdba3dcff1831b2cd8b4) --- .../sdk/options/ProxyInvocationHandler.java | 4 +- .../runners/DataflowPipelineTranslator.java | 49 +--------- .../sdk/transforms/display/DisplayData.java | 14 ++- .../options/ProxyInvocationHandlerTest.java | 6 +- .../DataflowPipelineTranslatorTest.java | 64 ------------- .../transforms/display/DisplayDataTest.java | 93 +++++++++++++------ 6 files changed, 84 insertions(+), 146 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java index 170db0b1f7..f353ab3af0 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandler.java @@ -351,6 +351,7 @@ private void populateDisplayData(DisplayData.Builder builder) { } Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod()); + value = value == null ? 
"" : value; DisplayData.Type type = DisplayData.inferType(value); if (type != null) { builder.add(DisplayData.item(jsonOption.getKey(), type, value) @@ -588,7 +589,8 @@ public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvi jgen.writeObject(serializableOptions); List> serializedDisplayData = Lists.newArrayList(); - for (DisplayData.Item item : DisplayData.from(value).items()) { + DisplayData displayData = DisplayData.from(value); + for (DisplayData.Item item : displayData.items()) { @SuppressWarnings("unchecked") Map serializedItem = MAPPER.convertValue(item, Map.class); serializedDisplayData.add(serializedItem); diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java index 28392a48da..aa9f182f19 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslator.java @@ -86,8 +86,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -728,18 +726,7 @@ private void addOutput(String name, PValue value, Coder valueCoder) { } private void addDisplayData(String stepName, HasDisplayData hasDisplayData) { - DisplayData displayData; - try { - displayData = DisplayData.from(hasDisplayData); - } catch (Exception e) { - String msg = String.format("Exception thrown while collecting display data for step: %s. 
" - + "Display data will be not be available for this step.", stepName); - DisplayDataException displayDataException = new DisplayDataException(msg, e); - LOG.warn(msg, displayDataException); - - displayData = displayDataException.asDisplayData(); - } - + DisplayData displayData = DisplayData.from(hasDisplayData); List> list = MAPPER.convertValue(displayData, List.class); addList(getProperties(), PropertyNames.DISPLAY_DATA, list); } @@ -1097,38 +1084,4 @@ private static void translateOutputs( context.addOutput(tag.getId(), output); } } - - /** - * Wraps exceptions thrown while collecting {@link DisplayData} for the Dataflow pipeline runner. - */ - static class DisplayDataException extends Exception implements HasDisplayData { - public DisplayDataException(String message, Throwable cause) { - super(checkNotNull(message), checkNotNull(cause)); - } - - /** - * Retrieve a display data representation of the exception, which can be submitted to - * the service in place of the actual display data. 
- */ - public DisplayData asDisplayData() { - return DisplayData.from(this); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - Throwable cause = getCause(); - builder - .add(DisplayData.item("exceptionMessage", getMessage())) - .add(DisplayData.item("exceptionType", cause.getClass())) - .add(DisplayData.item("exceptionCause", cause.getMessage())) - .add(DisplayData.item("stackTrace", stackTraceToString())); - } - - private String stackTraceToString() { - StringWriter stringWriter = new StringWriter(); - PrintWriter printWriter = new PrintWriter(stringWriter); - printStackTrace(printWriter); - return stringWriter.toString(); - } - } } diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java index 05485e4f7a..07a0ae84fe 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java @@ -69,10 +69,6 @@ public static DisplayData none() { * Collect the {@link DisplayData} from a component. This will traverse all subcomponents * specified via {@link Builder#include} in the given component. Data in this component will be in * a namespace derived from the component. - * - *

Pipeline runners should call this method in order to collect display data. While it should - * be safe to call {@code DisplayData.from} on any component which implements it, runners should - * be resilient to exceptions thrown while collecting display data. */ public static DisplayData from(HasDisplayData component) { checkNotNull(component, "component argument cannot be null"); @@ -655,7 +651,15 @@ public Builder include(HasDisplayData subComponent, String namespace) { if (newComponent) { String prevNs = this.latestNs; this.latestNs = namespace; - subComponent.populateDisplayData(this); + + try { + subComponent.populateDisplayData(this); + } catch (Throwable e) { + String msg = String.format("Error while populating display data for component: %s", + namespace); + throw new RuntimeException(msg, e); + } + this.latestNs = prevNs; } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandlerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandlerTest.java index 017bcf37e5..a77b329616 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandlerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/options/ProxyInvocationHandlerTest.java @@ -862,12 +862,16 @@ public void testDisplayDataIncludesExplicitlySetDefaults() { } @Test - public void testDisplayDataNullValuesConvertedToEmptyString() { + public void testDisplayDataNullValuesConvertedToEmptyString() throws Exception { FooOptions options = PipelineOptionsFactory.as(FooOptions.class); options.setFoo(null); DisplayData data = DisplayData.from(options); assertThat(data, hasDisplayItem("foo", "")); + + FooOptions deserializedOptions = serializeDeserialize(FooOptions.class, options); + DisplayData deserializedData = DisplayData.from(deserializedOptions); + assertThat(deserializedData, hasDisplayItem("foo", "")); } @Test diff --git 
a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index d28bcdf815..9b2218340c 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -22,9 +22,7 @@ import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; -import static org.hamcrest.Matchers.is; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -52,7 +50,6 @@ import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext; -import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.dataflow.sdk.transforms.Count; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; @@ -77,7 +74,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; @@ -103,9 +99,7 @@ */ @RunWith(JUnit4.class) public class DataflowPipelineTranslatorTest implements Serializable { - @Rule public transient ExpectedException thrown = ExpectedException.none(); - @Rule public transient ExpectedLogs logs = ExpectedLogs.none(DataflowPipelineTranslator.class); // A Custom Mockito matcher for an initial Job that checks that all // expected fields are set. 
@@ -914,62 +908,4 @@ public void populateDisplayData(DisplayData.Builder builder) { assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData)); assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData)); } - - @Test - public void testCapturesDisplayDataExceptions() throws IOException { - DataflowPipelineOptions options = buildPipelineOptions(); - options.setRunner(DataflowPipelineRunner.class); - DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); - Pipeline pipeline = Pipeline.create(options); - - final RuntimeException displayDataException = new RuntimeException("foobar"); - pipeline - .apply(Create.of(1, 2, 3)) - .apply(ParDo.of(new DoFn() { - @Override - public void processElement(ProcessContext c) throws Exception { - c.output(c.element()); - } - - @Override - public void populateDisplayData(DisplayData.Builder builder) { - throw displayDataException; - } - })); - - Job job = translator.translate( - pipeline, - (DataflowPipelineRunner) pipeline.getRunner(), - Collections.emptyList()).getJob(); - - String expectedMessage = "Display data will be not be available for this step"; - logs.verifyWarn(expectedMessage); - - List steps = job.getSteps(); - assertEquals("Job should have 2 steps", 2, steps.size()); - - @SuppressWarnings("unchecked") - Iterable> displayData = (Collection>) steps.get(1) - .getProperties().get("display_data"); - - String namespace = DataflowPipelineTranslator.DisplayDataException.class.getName(); - Assert.assertThat(displayData, Matchers.>hasItem(allOf( - hasEntry("namespace", namespace), - hasEntry("key", "exceptionType"), - hasEntry("value", RuntimeException.class.getName())))); - - Assert.assertThat(displayData, Matchers.>hasItem(allOf( - hasEntry("namespace", namespace), - hasEntry("key", "exceptionMessage"), - hasEntry(is("value"), Matchers.containsString(expectedMessage))))); - - Assert.assertThat(displayData, Matchers.>hasItem(allOf( - hasEntry("namespace", namespace), 
- hasEntry("key", "exceptionCause"), - hasEntry("value", "foobar")))); - - Assert.assertThat(displayData, Matchers.>hasItem(allOf( - hasEntry("namespace", namespace), - hasEntry("key", "stackTrace")))); - } } diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java index 4adbb33fb6..1114c91a3a 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java @@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isA; import static org.hamcrest.Matchers.isEmptyOrNullString; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; @@ -38,11 +39,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; @@ -53,9 +49,9 @@ import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Multimap; import com.google.common.testing.EqualsTester; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; - import org.hamcrest.CustomTypeSafeMatcher; import org.hamcrest.FeatureMatcher; import org.hamcrest.Matcher; @@ -66,7 +62,6 @@ import org.joda.time.format.ISODateTimeFormat; import org.junit.Rule; import 
org.junit.Test; -import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -83,6 +78,7 @@ @RunWith(JUnit4.class) public class DisplayDataTest implements Serializable { @Rule public transient ExpectedException thrown = ExpectedException.none(); + private static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat.dateTime(); private static final ObjectMapper MAPPER = new ObjectMapper(); @@ -410,7 +406,7 @@ public void populateDisplayData(Builder builder) { @Test public void testNullNamespaceOverride() { - thrown.expect(NullPointerException.class); + thrown.expectCause(isA(NullPointerException.class)); DisplayData.from(new HasDisplayData() { @Override @@ -513,7 +509,7 @@ public void populateDisplayData(DisplayData.Builder builder) { @Test public void testDuplicateKeyThrowsException() { - thrown.expect(IllegalArgumentException.class); + thrown.expectCause(isA(IllegalArgumentException.class)); DisplayData.from( new HasDisplayData() { @Override @@ -749,7 +745,7 @@ public void populateDisplayData(Builder builder) { } }; - thrown.expect(ClassCastException.class); + thrown.expectCause(isA(ClassCastException.class)); DisplayData.from(component); } @@ -835,7 +831,7 @@ public void testFromNull() { @Test public void testIncludeNull() { - thrown.expect(NullPointerException.class); + thrown.expectCause(isA(NullPointerException.class)); DisplayData.from( new HasDisplayData() { @Override @@ -853,7 +849,7 @@ public void populateDisplayData(Builder builder) { } }; - thrown.expect(NullPointerException.class); + thrown.expectCause(isA(NullPointerException.class)); DisplayData.from(new HasDisplayData() { @Override public void populateDisplayData(Builder builder) { @@ -864,7 +860,7 @@ public void populateDisplayData(Builder builder) { @Test public void testNullKey() { - thrown.expect(NullPointerException.class); + thrown.expectCause(isA(NullPointerException.class)); 
DisplayData.from( new HasDisplayData() { @Override @@ -965,23 +961,66 @@ public void populateDisplayData(Builder builder) { } /** - * Validate that all runners are resilient to exceptions thrown while retrieving display data. + * Verify that {@link DisplayData.Builder} can recover from exceptions thrown in user code. + * This is not used within the Beam SDK since we want all code to produce valid DisplayData. + * This test just ensures it is possible to write custom code that does recover. */ @Test - @Category(RunnableOnService.class) - public void testRunnersResilientToDisplayDataExceptions() { - Pipeline p = TestPipeline.create(); - PCollection pCol = p - .apply(Create.of(1, 2, 3)) - .apply(new IdentityTransform() { - @Override - public void populateDisplayData(Builder builder) { - throw new RuntimeException("bug!"); - } - }); + public void testCanRecoverFromBuildException() { + final HasDisplayData safeComponent = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder.add(DisplayData.item("a", "a")); + } + }; + + final HasDisplayData failingComponent = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + throw new RuntimeException("oh noes!"); + } + }; - DataflowAssert.that(pCol).containsInAnyOrder(1, 2, 3); - p.run(); + DisplayData displayData = DisplayData.from(new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder + .add(DisplayData.item("b", "b")) + .add(DisplayData.item("c", "c")); + + try { + builder.include(failingComponent); + fail("Expected exception not thrown"); + } catch (RuntimeException e) { + // Expected + } + + builder + .include(safeComponent) + .add(DisplayData.item("d", "d")); + } + }); + + assertThat(displayData, hasDisplayItem("a")); + assertThat(displayData, hasDisplayItem("b")); + assertThat(displayData, hasDisplayItem("c")); + assertThat(displayData, hasDisplayItem("d")); + } + + @Test + public void 
testExceptionMessage() { + final RuntimeException cause = new RuntimeException("oh noes!"); + HasDisplayData component = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + throw cause; + } + }; + + thrown.expectMessage(component.getClass().getName()); + thrown.expectCause(is(cause)); + + DisplayData.from(component); } private static class IdentityTransform extends PTransform, PCollection> {