Skip to content

Commit

Permalink
[FLINK-18214][Runtime] Remove Job Cache size check against JVM Heap size
Browse files Browse the repository at this point in the history
Checking that the job cache size is less than JVM heap for JM, may be inconclusive and confusing for users. The job cache size option does not strictly limit the real size and stays an advanced emergency mean. The job size calculation is approximate and the real cache size can be larger than its configured limit (`jobstore.cache-size`).

Therefore, this PR removes the related code from `JobManagerFlinkMemoryUtils`, tests and memory tuning guide.

This closes apache#12590.
  • Loading branch information
azagrebin committed Jun 11, 2020
1 parent 4c05dac commit 1adfd58
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 47 deletions.
5 changes: 1 addition & 4 deletions docs/ops/memory/mem_setup_master.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,12 @@ As mentioned before in the [total memory description](mem_setup.html#configure-t
for the Master is to specify explicitly the *JVM Heap* size ([`jobmanager.memory.heap.size`](../config.html#jobmanager-memory-heap-size)).
It gives more control over the available *JVM Heap* which is used by:

* Flink framework (e.g. *Job cache*)
* Flink framework
* User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks

The required size of *JVM Heap* is mostly driven by the number of running jobs, their structure, and requirements for
the mentioned user code.

The *Job cache* resides in the *JVM Heap*. It can be configured by
[`jobstore.cache-size`](../config.html#jobstore-cache-size) which must be less than the configured or derived *JVM Heap* size.

<span class="label label-info">Note</span> If you have configured the *JVM Heap* explicitly, it is recommended to set
neither *total process memory* nor *total Flink memory*. Otherwise, it may easily lead to memory configuration conflicts.
The Flink scripts and CLI set the *JVM Heap* size via the JVM parameters *-Xms* and *-Xmx* when they start the Master process, see also [JVM parameters](mem_setup.html#jvm-parameters).
Expand Down
5 changes: 1 addition & 4 deletions docs/ops/memory/mem_setup_master.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,12 @@ As mentioned before in the [total memory description](mem_setup.html#configure-t
for the Master is to specify explicitly the *JVM Heap* size ([`jobmanager.memory.heap.size`](../config.html#jobmanager-memory-heap-size)).
It gives more control over the available *JVM Heap* which is used by:

* Flink framework (e.g. *Job cache*)
* Flink framework
* User code executed during job submission (e.g. for certain batch sources) or in checkpoint completion callbacks

The required size of *JVM Heap* is mostly driven by the number of running jobs, their structure, and requirements for
the mentioned user code.

The *Job cache* resides in the *JVM Heap*. It can be configured by
[`jobstore.cache-size`](../config.html#jobstore-cache-size) which must be less than the configured or derived *JVM Heap* size.

<span class="label label-info">Note</span> If you have configured the *JVM Heap* explicitly, it is recommended to set
neither *total process memory* nor *total Flink memory*. Otherwise, it may easily lead to memory configuration conflicts.
The Flink scripts and CLI set the *JVM Heap* size via the JVM parameters *-Xms* and *-Xmx* when they start the Master process, see also [JVM parameters](mem_setup.html#jvm-parameters).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public JobManagerFlinkMemory deriveFromRequiredFineGrainedOptions(Configuration
}
}

return createJobManagerFlinkMemory(config, jvmHeapMemorySize, offHeapMemorySize);
return createJobManagerFlinkMemory(jvmHeapMemorySize, offHeapMemorySize);
}

private static void sanityCheckTotalFlinkMemory(
Expand Down Expand Up @@ -107,15 +107,13 @@ public JobManagerFlinkMemory deriveFromTotalFlinkMemory(Configuration config, Me
offHeapMemorySize.toHumanReadableString());
}
MemorySize derivedJvmHeapMemorySize = totalFlinkMemorySize.subtract(offHeapMemorySize);
return createJobManagerFlinkMemory(config, derivedJvmHeapMemorySize, offHeapMemorySize);
return createJobManagerFlinkMemory(derivedJvmHeapMemorySize, offHeapMemorySize);
}

private static JobManagerFlinkMemory createJobManagerFlinkMemory(
Configuration config,
MemorySize jvmHeap,
MemorySize offHeapMemory) {
verifyJvmHeapSize(jvmHeap);
verifyJobStoreCacheSize(config, jvmHeap);
return new JobManagerFlinkMemory(jvmHeap, offHeapMemory);
}

Expand All @@ -127,18 +125,4 @@ private static void verifyJvmHeapSize(MemorySize jvmHeapSize) {
JobManagerOptions.MIN_JVM_HEAP_SIZE.toHumanReadableString());
}
}

private static void verifyJobStoreCacheSize(Configuration config, MemorySize jvmHeapSize) {
MemorySize jobStoreCacheHeapSize =
MemorySize.parse(config.getLong(JobManagerOptions.JOB_STORE_CACHE_SIZE) + "b");
if (jvmHeapSize.compareTo(jobStoreCacheHeapSize) < 0) {
LOG.warn(
"The configured or derived JVM heap memory size ({}: {}) is less than the configured or default size " +
"of the job store cache ({}: {})",
JobManagerOptions.JVM_HEAP_MEMORY.key(),
jvmHeapSize.toHumanReadableString(),
JobManagerOptions.JOB_STORE_CACHE_SIZE.key(),
jobStoreCacheHeapSize.toHumanReadableString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,27 +83,6 @@ public void testLogFailureOfJvmHeapSizeMinSizeVerification() {
JobManagerOptions.MIN_JVM_HEAP_SIZE.toHumanReadableString()))));
}

@Test
public void testLogFailureOfJobStoreCacheSizeVerification() {
MemorySize jvmHeapMemory = MemorySize.parse("150m");
MemorySize jobStoreCacheSize = MemorySize.parse("200m");

Configuration conf = new Configuration();
conf.set(JobManagerOptions.JVM_HEAP_MEMORY, jvmHeapMemory);
conf.set(JobManagerOptions.JOB_STORE_CACHE_SIZE, jobStoreCacheSize.getBytes());

JobManagerProcessUtils.processSpecFromConfig(conf);
MatcherAssert.assertThat(
testLoggerResource.getMessages(),
hasItem(containsString(String.format(
"The configured or derived JVM heap memory size (%s: %s) is less than the configured or default size " +
"of the job store cache (%s: %s)",
JobManagerOptions.JVM_HEAP_MEMORY.key(),
jvmHeapMemory.toHumanReadableString(),
JobManagerOptions.JOB_STORE_CACHE_SIZE.key(),
jobStoreCacheSize.toHumanReadableString()))));
}

@Test
public void testConfigOffHeapMemory() {
MemorySize offHeapMemory = MemorySize.parse("100m");
Expand Down

0 comments on commit 1adfd58

Please sign in to comment.