diff --git a/docs/_includes/generated/yarn_config_configuration.html b/docs/_includes/generated/yarn_config_configuration.html
index 40dfc09d3612b..6615c325522f6 100644
--- a/docs/_includes/generated/yarn_config_configuration.html
+++ b/docs/_includes/generated/yarn_config_configuration.html
@@ -32,6 +32,11 @@
-1 |
The port where the application master RPC system is listening. |
+
+ yarn.appmaster.vcores |
+ 1 |
+ The number of virtual cores (vcores) used by YARN application master. |
+
yarn.containers.vcores |
-1 |
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 0f244961dc217..3135ecf070802 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -242,6 +242,14 @@ private void isReadyForDeployment(ClusterSpecification clusterSpecification) thr
throw new YarnDeploymentException("Couldn't get cluster description, please check on the YarnConfiguration", e);
}
+ int configuredAmVcores = flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES);
+ if (configuredAmVcores > numYarnMaxVcores) {
+ throw new IllegalConfigurationException(
+ String.format("The number of requested virtual cores for application master %d" +
+ " exceeds the maximum number of virtual cores %d available in the Yarn Cluster.",
+ configuredAmVcores, numYarnMaxVcores));
+ }
+
int configuredVcores = flinkConfiguration.getInteger(YarnConfigOptions.VCORES, clusterSpecification.getSlotsPerTaskManager());
// don't configure more than the maximum configured number of vcores
if (configuredVcores > numYarnMaxVcores) {
@@ -971,7 +979,7 @@ public ApplicationReport startAppMaster(
// Set up resource type requirements for ApplicationMaster
Resource capability = Records.newRecord(Resource.class);
capability.setMemory(clusterSpecification.getMasterMemoryMB());
- capability.setVirtualCores(1);
+ capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));
final String customApplicationName = customName != null ? customName : applicationName;
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
index 0f46a572256b3..c0b6cfebf4bf3 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
@@ -49,6 +49,14 @@ public class YarnConfigOptions {
.defaultValue(-1)
.withDescription("The port where the application master RPC system is listening.");
+ /**
+ * The vcores used by YARN application master.
+ */
+ public static final ConfigOption APP_MASTER_VCORES =
+ key("yarn.appmaster.vcores")
+ .defaultValue(1)
+ .withDescription("The number of virtual cores (vcores) used by YARN application master.");
+
/**
* Defines whether user-jars are included in the system class path for per-job-clusters as well as their positioning
* in the path. They can be positioned at the beginning ("FIRST"), at the end ("LAST"), or be positioned based on