Skip to content

Commit

Permalink
[BEAM-6008] Propagate errors during pipeline execution in Java's Port…
Browse files Browse the repository at this point in the history
…ableRunner

The error reporting for Java test cases currently yields:

```
java.lang.AssertionError: Pipeline did not succeed.
Expected: is <DONE>
     but: was <FAILED>
```

Not particularly helpful. This improves the situation by retrieving the error
messages from the job service.
  • Loading branch information
mxm committed Jan 13, 2020
1 parent 182b27e commit 1db2e39
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,9 +344,6 @@ public void getMessageStream(
event -> {
syncResponseObserver.onNext(
JobMessagesResponse.newBuilder().setStateResponse(event).build());
if (JobInvocation.isTerminated(event.getState())) {
responseObserver.onCompleted();
}
};
Consumer<JobMessage> messageListener =
message ->
Expand All @@ -355,6 +352,10 @@ public void getMessageStream(

invocation.addStateListener(stateListener);
invocation.addMessageListener(messageListener);

if (JobInvocation.isTerminated(invocation.getStateEvent().getState())) {
responseObserver.onCompleted();
}
} catch (StatusRuntimeException | StatusException e) {
responseObserver.onError(e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ public class JobInvocation {
private final PortablePipelineRunner pipelineRunner;
private final JobInfo jobInfo;
private final ListeningExecutorService executorService;
private List<Consumer<JobStateEvent>> stateObservers;
private List<Consumer<JobMessage>> messageObservers;
private final List<JobStateEvent> stateHistory;
private final List<JobMessage> messageHistory;
private final List<Consumer<JobStateEvent>> stateObservers;
private final List<Consumer<JobMessage>> messageObservers;

private JobApi.MetricResults metrics;
private PortablePipelineResult resultHandle;
@Nullable private ListenableFuture<PortablePipelineResult> invocationFuture;
private List<JobStateEvent> stateHistory;

public JobInvocation(
JobInfo jobInfo,
Expand All @@ -73,6 +75,7 @@ public JobInvocation(
this.messageObservers = new ArrayList<>();
this.invocationFuture = null;
this.stateHistory = new ArrayList<>();
this.messageHistory = new ArrayList<>();
this.metrics = JobApi.MetricResults.newBuilder().build();
this.setState(JobState.Enum.STOPPED);
}
Expand Down Expand Up @@ -217,6 +220,9 @@ public synchronized void addStateListener(Consumer<JobStateEvent> stateStreamObs

/** Listen for job messages with a {@link Consumer}. */
public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver) {
for (JobMessage msg : messageHistory) {
messageStreamObserver.accept(msg);
}
messageObservers.add(messageStreamObserver);
}

Expand All @@ -243,6 +249,7 @@ private synchronized void setState(JobState.Enum state) {
}

private synchronized void sendMessage(JobMessage message) {
messageHistory.add(message);
for (Consumer<JobMessage> observer : messageObservers) {
observer.accept(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.portability;

import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -26,6 +27,9 @@
import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.CancelJobResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.GetJobStateRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesRequest;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessagesResponse;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobStateEvent;
import org.apache.beam.model.jobmanagement.v1.JobServiceGrpc.JobServiceBlockingStub;
import org.apache.beam.sdk.PipelineResult;
Expand All @@ -37,28 +41,28 @@

class JobServicePipelineResult implements PipelineResult, AutoCloseable {

private static final long POLL_INTERVAL_MS = 10 * 1000;
private static final long POLL_INTERVAL_MS = 3_000;

private static final Logger LOG = LoggerFactory.getLogger(JobServicePipelineResult.class);

private final ByteString jobId;
private final CloseableResource<JobServiceBlockingStub> jobService;
@Nullable private State terminationState;
@Nullable private State terminalState;
@Nullable private final Runnable cleanup;
private org.apache.beam.model.jobmanagement.v1.JobApi.MetricResults jobMetrics;

JobServicePipelineResult(
ByteString jobId, CloseableResource<JobServiceBlockingStub> jobService, Runnable cleanup) {
this.jobId = jobId;
this.jobService = jobService;
this.terminationState = null;
this.terminalState = null;
this.cleanup = cleanup;
}

@Override
public State getState() {
if (terminationState != null) {
return terminationState;
if (terminalState != null) {
return terminalState;
}
JobServiceBlockingStub stub = jobService.get();
JobStateEvent response =
Expand Down Expand Up @@ -98,26 +102,16 @@ public State waitUntilFinish(Duration duration) {

@Override
public State waitUntilFinish() {
if (terminationState != null) {
return terminationState;
if (terminalState != null) {
return terminalState;
}
JobServiceBlockingStub stub = jobService.get();
GetJobStateRequest request = GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
JobStateEvent response = stub.getState(request);
State lastState = getJavaState(response.getState());
while (!lastState.isTerminal()) {
try {
Thread.sleep(POLL_INTERVAL_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
response = stub.getState(request);
lastState = getJavaState(response.getState());
try {
waitForTerminalState();
propagateErrors();
return terminalState;
} finally {
close();
}
close();
terminationState = lastState;
return lastState;
}

@Override
Expand All @@ -139,6 +133,41 @@ public void close() {
}
}

private void waitForTerminalState() {
JobServiceBlockingStub stub = jobService.get();
GetJobStateRequest request = GetJobStateRequest.newBuilder().setJobIdBytes(jobId).build();
JobStateEvent response = stub.getState(request);
State lastState = getJavaState(response.getState());
while (!lastState.isTerminal()) {
try {
Thread.sleep(POLL_INTERVAL_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
response = stub.getState(request);
lastState = getJavaState(response.getState());
}
terminalState = lastState;
}

private void propagateErrors() {
if (terminalState != State.DONE) {
JobMessagesRequest messageStreamRequest =
JobMessagesRequest.newBuilder().setJobIdBytes(jobId).build();
Iterator<JobMessagesResponse> messageStreamIterator =
jobService.get().getMessageStream(messageStreamRequest);
while (messageStreamIterator.hasNext()) {
JobMessage messageResponse = messageStreamIterator.next().getMessageResponse();
if (messageResponse.getImportance() == JobMessage.MessageImportance.JOB_MESSAGE_ERROR) {
throw new RuntimeException(
"The Runner experienced the following error during execution:\n"
+ messageResponse.getMessageText());
}
}
}
}

private static State getJavaState(JobApi.JobState.Enum protoState) {
switch (protoState) {
case UNSPECIFIED:
Expand Down

0 comments on commit 1db2e39

Please sign in to comment.