Skip to content

Commit

Permalink
[FLINK-32370][streaming] Fix warn log in result fetcher when job is f…
Browse files Browse the repository at this point in the history
…inished

Close apache#22819
  • Loading branch information
FangYongs authored and libenchao committed Jun 20, 2023
1 parent b462d0e commit 8119411
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.JobStatusInfo;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
Expand Down Expand Up @@ -122,6 +123,7 @@
import org.junit.jupiter.api.io.TempDir;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -1136,6 +1138,32 @@ void testSendCoordinationRequest() throws Exception {
}
}

@Test
void testSendCoordinationRequestException() throws Exception {
final TestClientCoordinationHandler handler =
new TestClientCoordinationHandler(new FlinkJobNotFoundException(jobId));
try (TestRestServerEndpoint restServerEndpoint = createRestServerEndpoint(handler)) {
try (RestClusterClient<?> restClusterClient =
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
String payload = "testing payload";
TestCoordinationRequest<String> request = new TestCoordinationRequest<>(payload);

assertThatThrownBy(
() ->
restClusterClient
.sendCoordinationRequest(
jobId, new OperatorID(), request)
.get())
.matches(
e ->
ExceptionUtils.findThrowableWithMessage(
e,
FlinkJobNotFoundException.class.getName())
.isPresent());
}
}
}

/**
* The SUSPENDED job status should never be returned by the client thus client retries until it
* either receives a different job status or the cluster is not reachable.
Expand Down Expand Up @@ -1166,9 +1194,15 @@ private class TestClientCoordinationHandler
ClientCoordinationRequestBody,
ClientCoordinationResponseBody,
ClientCoordinationMessageParameters> {
@Nullable private final FlinkJobNotFoundException exception;

private TestClientCoordinationHandler() {
this(null);
}

private TestClientCoordinationHandler(@Nullable FlinkJobNotFoundException exception) {
super(ClientCoordinationHeaders.getInstance());
this.exception = exception;
}

@Override
Expand All @@ -1178,6 +1212,9 @@ protected CompletableFuture<ClientCoordinationResponseBody> handleRequest(
@Nonnull DispatcherGateway gateway)
throws RestHandlerException {
try {
if (exception != null) {
throw exception;
}
TestCoordinationRequest req =
(TestCoordinationRequest)
request.getRequestBody()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,13 @@ public T next() throws IOException {
try {
response = sendRequest(buffer.getVersion(), requestOffset);
} catch (Exception e) {
if (ExceptionUtils.findThrowable(
e, UnavailableDispatcherOperationException.class)
if (ExceptionUtils.findThrowableWithMessage(
e, UnavailableDispatcherOperationException.class.getName())
.isPresent()) {
LOG.debug(
"The job execution has not started yet; cannot fetch results.", e);
} else if (ExceptionUtils.findThrowable(e, FlinkJobNotFoundException.class)
} else if (ExceptionUtils.findThrowableWithMessage(
e, FlinkJobNotFoundException.class.getName())
.isPresent()) {
LOG.debug(
"The job cannot be found. It is very likely that the job is not in a RUNNING state.",
Expand Down

0 comments on commit 8119411

Please sign in to comment.