Skip to content

Commit

Permalink
Fix bug in PipelineOptions DisplayData serialization
Browse files Browse the repository at this point in the history
PipelineOptions has been improved to generate display data
to be consumed by a runner and used for display. However, there
was a bug in the ProxyInvocationHandler implementation of
PipelineOptions display data which was causing NullPointerExceptions
when generated display data from PipelineOptions previously
deserialized from JSON.

This change also makes our error handling for display data exceptions
consistent across the Dataflow runner: exceptions thrown during
display data population will propogate out and cause the pipeline to
fail. This is consistent with other user code which may throw
exceptions at pipeline construction time.

(cherry picked from commit 1e669c4)
  • Loading branch information
swegner committed May 21, 2016
1 parent 47cfab1 commit ce2ba6f
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 146 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ private void populateDisplayData(DisplayData.Builder builder) {
}

Object value = getValueFromJson(jsonOption.getKey(), spec.getGetterMethod());
value = value == null ? "" : value;
DisplayData.Type type = DisplayData.inferType(value);
if (type != null) {
builder.add(DisplayData.item(jsonOption.getKey(), type, value)
Expand Down Expand Up @@ -588,7 +589,8 @@ public void serialize(PipelineOptions value, JsonGenerator jgen, SerializerProvi
jgen.writeObject(serializableOptions);

List<Map<String, Object>> serializedDisplayData = Lists.newArrayList();
for (DisplayData.Item<?> item : DisplayData.from(value).items()) {
DisplayData displayData = DisplayData.from(value);
for (DisplayData.Item<?> item : displayData.items()) {
@SuppressWarnings("unchecked")
Map<String, Object> serializedItem = MAPPER.convertValue(item, Map.class);
serializedDisplayData.add(serializedItem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -728,18 +726,7 @@ private void addOutput(String name, PValue value, Coder<?> valueCoder) {
}

private void addDisplayData(String stepName, HasDisplayData hasDisplayData) {
DisplayData displayData;
try {
displayData = DisplayData.from(hasDisplayData);
} catch (Exception e) {
String msg = String.format("Exception thrown while collecting display data for step: %s. "
+ "Display data will be not be available for this step.", stepName);
DisplayDataException displayDataException = new DisplayDataException(msg, e);
LOG.warn(msg, displayDataException);

displayData = displayDataException.asDisplayData();
}

DisplayData displayData = DisplayData.from(hasDisplayData);
List<Map<String, Object>> list = MAPPER.convertValue(displayData, List.class);
addList(getProperties(), PropertyNames.DISPLAY_DATA, list);
}
Expand Down Expand Up @@ -1097,38 +1084,4 @@ private static void translateOutputs(
context.addOutput(tag.getId(), output);
}
}

/**
* Wraps exceptions thrown while collecting {@link DisplayData} for the Dataflow pipeline runner.
*/
static class DisplayDataException extends Exception implements HasDisplayData {
public DisplayDataException(String message, Throwable cause) {
super(checkNotNull(message), checkNotNull(cause));
}

/**
* Retrieve a display data representation of the exception, which can be submitted to
* the service in place of the actual display data.
*/
public DisplayData asDisplayData() {
return DisplayData.from(this);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
Throwable cause = getCause();
builder
.add(DisplayData.item("exceptionMessage", getMessage()))
.add(DisplayData.item("exceptionType", cause.getClass()))
.add(DisplayData.item("exceptionCause", cause.getMessage()))
.add(DisplayData.item("stackTrace", stackTraceToString()));
}

private String stackTraceToString() {
StringWriter stringWriter = new StringWriter();
PrintWriter printWriter = new PrintWriter(stringWriter);
printStackTrace(printWriter);
return stringWriter.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,6 @@ public static DisplayData none() {
* Collect the {@link DisplayData} from a component. This will traverse all subcomponents
* specified via {@link Builder#include} in the given component. Data in this component will be in
* a namespace derived from the component.
*
* <p>Pipeline runners should call this method in order to collect display data. While it should
* be safe to call {@code DisplayData.from} on any component which implements it, runners should
* be resilient to exceptions thrown while collecting display data.
*/
public static DisplayData from(HasDisplayData component) {
checkNotNull(component, "component argument cannot be null");
Expand Down Expand Up @@ -655,7 +651,15 @@ public Builder include(HasDisplayData subComponent, String namespace) {
if (newComponent) {
String prevNs = this.latestNs;
this.latestNs = namespace;
subComponent.populateDisplayData(this);

try {
subComponent.populateDisplayData(this);
} catch (Throwable e) {
String msg = String.format("Error while populating display data for component: %s",
namespace);
throw new RuntimeException(msg, e);
}

this.latestNs = prevNs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,12 +862,16 @@ public void testDisplayDataIncludesExplicitlySetDefaults() {
}

@Test
public void testDisplayDataNullValuesConvertedToEmptyString() {
public void testDisplayDataNullValuesConvertedToEmptyString() throws Exception {
FooOptions options = PipelineOptionsFactory.as(FooOptions.class);
options.setFoo(null);

DisplayData data = DisplayData.from(options);
assertThat(data, hasDisplayItem("foo", ""));

FooOptions deserializedOptions = serializeDeserialize(FooOptions.class, options);
DisplayData deserializedData = DisplayData.from(deserializedOptions);
assertThat(deserializedData, hasDisplayItem("foo", ""));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@

import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
Expand Down Expand Up @@ -52,7 +50,6 @@
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator.TranslationContext;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
Expand All @@ -77,7 +74,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;

import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -103,9 +99,7 @@
*/
@RunWith(JUnit4.class)
public class DataflowPipelineTranslatorTest implements Serializable {

@Rule public transient ExpectedException thrown = ExpectedException.none();
@Rule public transient ExpectedLogs logs = ExpectedLogs.none(DataflowPipelineTranslator.class);

// A Custom Mockito matcher for an initial Job that checks that all
// expected fields are set.
Expand Down Expand Up @@ -914,62 +908,4 @@ public void populateDisplayData(DisplayData.Builder builder) {
assertEquals(expectedFn1DisplayData, ImmutableSet.copyOf(fn1displayData));
assertEquals(expectedFn2DisplayData, ImmutableSet.copyOf(fn2displayData));
}

@Test
public void testCapturesDisplayDataExceptions() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();
options.setRunner(DataflowPipelineRunner.class);
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);
Pipeline pipeline = Pipeline.create(options);

final RuntimeException displayDataException = new RuntimeException("foobar");
pipeline
.apply(Create.of(1, 2, 3))
.apply(ParDo.of(new DoFn<Integer, Integer>() {
@Override
public void processElement(ProcessContext c) throws Exception {
c.output(c.element());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
throw displayDataException;
}
}));

Job job = translator.translate(
pipeline,
(DataflowPipelineRunner) pipeline.getRunner(),
Collections.<DataflowPackage>emptyList()).getJob();

String expectedMessage = "Display data will be not be available for this step";
logs.verifyWarn(expectedMessage);

List<Step> steps = job.getSteps();
assertEquals("Job should have 2 steps", 2, steps.size());

@SuppressWarnings("unchecked")
Iterable<Map<String, String>> displayData = (Collection<Map<String, String>>) steps.get(1)
.getProperties().get("display_data");

String namespace = DataflowPipelineTranslator.DisplayDataException.class.getName();
Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
hasEntry("namespace", namespace),
hasEntry("key", "exceptionType"),
hasEntry("value", RuntimeException.class.getName()))));

Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
hasEntry("namespace", namespace),
hasEntry("key", "exceptionMessage"),
hasEntry(is("value"), Matchers.containsString(expectedMessage)))));

Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
hasEntry("namespace", namespace),
hasEntry("key", "exceptionCause"),
hasEntry("value", "foobar"))));

Assert.assertThat(displayData, Matchers.<Map<String, String>>hasItem(allOf(
hasEntry("namespace", namespace),
hasEntry("key", "stackTrace"))));
}
}
Loading

0 comments on commit ce2ba6f

Please sign in to comment.