Skip to content

Commit

Permalink
flink on yarn job status bug fixed. (apache#394)
Browse files Browse the repository at this point in the history
* flink on yarn job status bug fixed apache#391
  • Loading branch information
wolfboys authored Oct 28, 2021
1 parent d9da5fc commit 3ef8219
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -390,9 +390,12 @@ public AppInfo httpYarnAppInfo() throws Exception {
String url = String.format(format, HadoopUtils.getRMWebAppURL(false), appId);
return httpGetDoResult(url, AppInfo.class);
} catch (IOException e) {
log.warn(e.getMessage());
String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId);
return httpGetDoResult(url, AppInfo.class);
try {
String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId);
return httpGetDoResult(url, AppInfo.class);
} catch (IOException e1) {
throw e1;
}
}
}
return null;
Expand All @@ -406,9 +409,12 @@ public JobsOverview httpJobsOverview() throws Exception {
String url = String.format(format, HadoopUtils.getRMWebAppURL(false), appId);
return httpGetDoResult(url, JobsOverview.class);
} catch (IOException e) {
log.warn(e.getMessage());
String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId);
return httpGetDoResult(url, JobsOverview.class);
try {
String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId);
return httpGetDoResult(url, JobsOverview.class);
} catch (Exception e1) {
throw e1;
}
}
}
return null;
Expand All @@ -421,9 +427,12 @@ public Overview httpOverview() throws IOException {
String url = String.format(format, HadoopUtils.getRMWebAppURL(false), appId);
return httpGetDoResult(url, Overview.class);
} catch (IOException e) {
log.warn(e.getMessage());
String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId);
return httpGetDoResult(url, Overview.class);
try {
String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId);
return httpGetDoResult(url, Overview.class);
} catch (Exception e1) {
throw e1;
}
}
}

Expand All @@ -434,9 +443,12 @@ public CheckPoints httpCheckpoints() throws IOException {
String url = String.format(format, HadoopUtils.getRMWebAppURL(false), appId, jobId);
return httpGetDoResult(url, CheckPoints.class);
} catch (IOException e) {
log.warn(e.getMessage());
String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId, jobId);
return httpGetDoResult(url, CheckPoints.class);
try {
String url = String.format(format, HadoopUtils.getRMWebAppURL(true), appId, jobId);
return httpGetDoResult(url, CheckPoints.class);
} catch (Exception e1) {
throw e1;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ public enum FlinkAppState implements Serializable {
*/
POS_TERMINATED (21),

/**
* job SUCCEEDED on yarn
*/
SUCCEEDED(22),
/**
* yarn 中检查到被killed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -962,11 +962,8 @@ public void cancel(Application appParam) {
);

StopResponse stopResponse = FlinkSubmitHelper.stop(stopInfo);

assert stopResponse != null;

String savePointDir = stopResponse.savePointDir();
if (savePointDir != null) {
if (stopResponse != null && stopResponse.savePointDir() != null) {
String savePointDir = stopResponse.savePointDir();
log.info("savePoint path:{}", savePointDir);
log.info("savePoint path:{}", savePointDir);
SavePoint savePoint = new SavePoint();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ private void getFromYarnRestApi(Application application, StopFrom stopFrom) thro
cleanSavepoint(application);
application.setEndTime(new Date());
}
if(FlinkAppState.SUCCEEDED.equals(flinkAppState)) {
flinkAppState = FlinkAppState.FINISHED;
}
application.setState(flinkAppState.getValue());
//能运行到这一步,说明到YARN REST api中成功查询到信息
cleanOptioning(optionState, application.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,4 +211,4 @@ private Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)
return c;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ object FlinkSubmitHelper extends Logger {
val method = submitClass.getDeclaredMethod("stop", requestClass)
method.setAccessible(true)
val obj = method.invoke(null, FlinkShimsProxy.getObject(classLoader, stopRequest))
require(obj != null)
FlinkShimsProxy.getObject(this.getClass.getClassLoader, obj).asInstanceOf[StopResponse]
if (obj == null) null; else {
FlinkShimsProxy.getObject(this.getClass.getClassLoader, obj).asInstanceOf[StopResponse]
}
})
}

Expand Down

0 comments on commit 3ef8219

Please sign in to comment.