Skip to content

Commit

Permalink
[FLINK-31230][yarn] Improve YarnClusterDescriptor memory unit display.
Browse files Browse the repository at this point in the history
  • Loading branch information
slfan1989 authored and 1996fanrui committed Mar 23, 2023
1 parent e91eb5e commit 753da61
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void testQueryCluster() throws Exception {
log.info("Starting testQueryCluster()");
runWithArgs(
new String[] {"-q"},
"Summary: totalMemory 8192 totalCores 1332",
"Summary: totalMemory 8.000gb (8589934592 bytes) totalCores 1332",
null,
RunTypes.YARN_SESSION,
0); // we have 666*2 cores.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ResourceManagerOptions;
import org.apache.flink.configuration.RestOptions;
Expand Down Expand Up @@ -97,6 +98,7 @@
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -844,7 +846,7 @@ private ApplicationReport startAppMaster(

final ApplicationId appId = appContext.getApplicationId();

// ------------------ Add Zookeeper namespace to local flinkConfiguration ------
// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
setHAClusterIdIfNotSet(configuration, appId);

if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
Expand Down Expand Up @@ -1229,7 +1231,6 @@ private ApplicationReport startAppMaster(

LOG.info("Waiting for the cluster to be allocated");
final long startTime = System.currentTimeMillis();
long lastLogTime = System.currentTimeMillis();
ApplicationReport report;
YarnApplicationState lastAppState = YarnApplicationState.NEW;
loop:
Expand Down Expand Up @@ -1265,11 +1266,9 @@ private ApplicationReport startAppMaster(
if (appState != lastAppState) {
LOG.info("Deploying cluster, current state " + appState);
}
if (System.currentTimeMillis() - lastLogTime > 60000) {
lastLogTime = System.currentTimeMillis();
if (System.currentTimeMillis() - startTime > 60000) {
LOG.info(
"Deployment took more than {} seconds. Please check if the requested resources are available in the YARN cluster",
(lastLogTime - startTime) / 1000);
"Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
}
}
lastAppState = appState;
Expand Down Expand Up @@ -1443,13 +1442,17 @@ public String getClusterDescription() {
totalMemory += res.getMemory();
totalCores += res.getVirtualCores();
ps.format(format, "NodeID", rep.getNodeId());
ps.format(format, "Memory", res.getMemory() + " MB");
ps.format(format, "Memory", getDisplayMemory(res.getMemory()));
ps.format(format, "vCores", res.getVirtualCores());
ps.format(format, "HealthReport", rep.getHealthReport());
ps.format(format, "Containers", rep.getNumContainers());
ps.println("+---------------------------------------+");
}
ps.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
ps.println(
"Summary: totalMemory "
+ getDisplayMemory(totalMemory)
+ " totalCores "
+ totalCores);
List<QueueInfo> qInfo = yarnClient.getAllQueues();
for (QueueInfo q : qInfo) {
ps.println(
Expand Down Expand Up @@ -1875,15 +1878,16 @@ private void setClusterEntrypointInfoToConfig(final ApplicationReport report) {
flinkConfiguration.setString(RestOptions.ADDRESS, host);
flinkConfiguration.setInteger(RestOptions.PORT, port);

flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId.toString());
flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(appId));

setHAClusterIdIfNotSet(flinkConfiguration, appId);
}

private void setHAClusterIdIfNotSet(Configuration configuration, ApplicationId appId) {
// set cluster-id to app id if not specified
if (!configuration.contains(HighAvailabilityOptions.HA_CLUSTER_ID)) {
configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, appId.toString());
configuration.set(
HighAvailabilityOptions.HA_CLUSTER_ID, ConverterUtils.toString(appId));
}
}

Expand Down Expand Up @@ -1938,4 +1942,8 @@ Map<String, String> generateApplicationMasterEnv(
Utils.setupYarnClassPath(this.yarnConfiguration, env);
return env;
}

private String getDisplayMemory(long memoryMB) {
return MemorySize.ofMebiBytes(memoryMB).toHumanReadableString();
}
}

0 comments on commit 753da61

Please sign in to comment.