From 1db2e3976fb2a5c15c77e55928e19c003db3b387 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Mon, 13 Jan 2020 16:54:23 +0100 Subject: [PATCH] [BEAM-6008] Propagate errors during pipeline execution in Java's PortableRunner The error reporting for Java test cases currently yields: ``` java.lang.AssertionError: Pipeline did not succeed. Expected: is <DONE> but: was <FAILED> ``` Not particularly helpful. This improves the situation by retrieving the error messages from the job service. --- .../jobsubmission/InMemoryJobService.java | 7 +- .../jobsubmission/JobInvocation.java | 13 +++- .../portability/JobServicePipelineResult.java | 75 +++++++++++++------ 3 files changed, 66 insertions(+), 29 deletions(-) diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java index 2f4df482d0dc..2f457a5859da 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/InMemoryJobService.java @@ -344,9 +344,6 @@ public void getMessageStream( event -> { syncResponseObserver.onNext( JobMessagesResponse.newBuilder().setStateResponse(event).build()); - if (JobInvocation.isTerminated(event.getState())) { - responseObserver.onCompleted(); - } }; Consumer messageListener = message -> @@ -355,6 +352,10 @@ public void getMessageStream( invocation.addStateListener(stateListener); invocation.addMessageListener(messageListener); + + if (JobInvocation.isTerminated(invocation.getStateEvent().getState())) { + responseObserver.onCompleted(); + } } catch (StatusRuntimeException | StatusException e) { responseObserver.onError(e); } catch (Exception e) { diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java index 2da059255796..36234335c8c3 100644 --- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java +++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java @@ -53,12 +53,14 @@ public class JobInvocation { private final PortablePipelineRunner pipelineRunner; private final JobInfo jobInfo; private final ListeningExecutorService executorService; - private List> stateObservers; - private List> messageObservers; + private final List stateHistory; + private final List messageHistory; + private final List> stateObservers; + private final List> messageObservers; + private JobApi.MetricResults metrics; private PortablePipelineResult resultHandle; @Nullable private ListenableFuture invocationFuture; - private List stateHistory; public JobInvocation( JobInfo jobInfo, @@ -73,6 +75,7 @@ public JobInvocation( this.messageObservers = new ArrayList<>(); this.invocationFuture = null; this.stateHistory = new ArrayList<>(); + this.messageHistory = new ArrayList<>(); this.metrics = JobApi.MetricResults.newBuilder().build(); this.setState(JobState.Enum.STOPPED); } @@ -217,6 +220,9 @@ public synchronized void addStateListener(Consumer stateStreamObs /** Listen for job messages with a {@link Consumer}. 
*/ public synchronized void addMessageListener(Consumer messageStreamObserver) { + for (JobMessage msg : messageHistory) { + messageStreamObserver.accept(msg); + } messageObservers.add(messageStreamObserver); } @@ -243,6 +249,7 @@ private synchronized void setState(JobState.Enum state) { } private synchronized void sendMessage(JobMessage message) { + messageHistory.add(message); for (Consumer observer : messageObservers) { observer.accept(message); } diff --git a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java index bcfa32120199..d4f2aa73d7ce 100644 --- a/runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java +++ b/runners/portability/java/src/main/java/org/apache/beam/runners/portability/JobServicePipelineResult.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.portability; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -26,6 +27,9 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest; import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse; import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest; +import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse; import org.apache.beam.model.jobmanagement.v1.JobApi.JobStateEvent; import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub; import org.apache.beam.sdk.PipelineResult; @@ -37,13 +41,13 @@ class JobServicePipelineResult implements PipelineResult, AutoCloseable { - private static final long POLL_INTERVAL_MS = 10 * 1000; + private static final long 
POLL_INTERVAL_MS = 3_000; private static final Logger LOG = LoggerFactory.getLogger(JobServicePipelineResult.class); private final ByteString jobId; private final CloseableResource jobService; - @Nullable private State terminationState; + @Nullable private State terminalState; @Nullable private final Runnable cleanup; private org.apache.beam.model.jobmanagement.v1.JobApi.MetricResults jobMetrics; @@ -51,14 +55,14 @@ class JobServicePipelineResult implements PipelineResult, AutoCloseable { ByteString jobId, CloseableResource jobService, Runnable cleanup) { this.jobId = jobId; this.jobService = jobService; - this.terminationState = null; + this.terminalState = null; this.cleanup = cleanup; } @Override public State getState() { - if (terminationState != null) { - return terminationState; + if (terminalState != null) { + return terminalState; } JobServiceBlockingStub stub = jobService.get(); JobStateEvent response = @@ -98,26 +102,16 @@ public State waitUntilFinish(Duration duration) { @Override public State waitUntilFinish() { - if (terminationState != null) { - return terminationState; + if (terminalState != null) { + return terminalState; } - JobServiceBlockingStub stub = jobService.get(); - GetJobStateRequest request = GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build(); - JobStateEvent response = stub.getState(request); - State lastState = getJavaState(response.getState()); - while (!lastState.isTerminal()) { - try { - Thread.sleep(POLL_INTERVAL_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } - response = stub.getState(request); - lastState = getJavaState(response.getState()); + try { + waitForTerminalState(); + propagateErrors(); + return terminalState; + } finally { + close(); } - close(); - terminationState = lastState; - return lastState; } @Override @@ -139,6 +133,41 @@ public void close() { } } + private void waitForTerminalState() { + JobServiceBlockingStub stub = 
jobService.get(); + GetJobStateRequest request = GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build(); + JobStateEvent response = stub.getState(request); + State lastState = getJavaState(response.getState()); + while (!lastState.isTerminal()) { + try { + Thread.sleep(POLL_INTERVAL_MS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + response = stub.getState(request); + lastState = getJavaState(response.getState()); + } + terminalState = lastState; + } + + private void propagateErrors() { + if (terminalState != State.DONE) { + JobMessagesRequest messageStreamRequest = + JobMessagesRequest.newBuilder().setJobIdBytes(jobId).build(); + Iterator messageStreamIterator = + jobService.get().getMessageStream(messageStreamRequest); + while (messageStreamIterator.hasNext()) { + JobMessage messageResponse = messageStreamIterator.next().getMessageResponse(); + if (messageResponse.getImportance() == JobMessage.MessageImportance.JOB_MESSAGE_ERROR) { + throw new RuntimeException( + "The Runner experienced the following error during execution:\n" + + messageResponse.getMessageText()); + } + } + } + } + private static State getJavaState(JobApi.JobState.Enum protoState) { switch (protoState) { case UNSPECIFIED: