Skip to content

Commit

Permalink
[FLINK-25402] Make JobManager process resource id configurable via jo…
Browse files Browse the repository at this point in the history
…bmanager.resource-id
  • Loading branch information
tillrohrmann committed Jan 19, 2022
1 parent 6bbf57f commit f107710
Show file tree
Hide file tree
Showing 15 changed files with 70 additions and 5 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/all_jobmanager_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
<td>Integer</td>
<td>The size of the IO thread pool to run blocking operations for all spawned JobMasters. This includes recovery and completion of checkpoints. Increase this value if you experience slow checkpoint operations when running many jobs. If no value is specified, then Flink defaults to the number of available CPU cores.</td>
</tr>
<tr>
<td><h5>jobmanager.resource-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The JobManager's ResourceID. If not configured, the ResourceID will be generated randomly.</td>
</tr>
<tr>
<td><h5>jobmanager.retrieve-taskmanager-hostname</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@
<td>MemorySize</td>
<td>Total Process Memory size for the JobManager. This includes all the memory that a JobManager JVM process consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. In containerized setups, this should be set to the container memory. See also 'jobmanager.memory.flink.size' for Total Flink Memory size configuration.</td>
</tr>
<tr>
<td><h5>jobmanager.resource-id</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The JobManager's ResourceID. If not configured, the ResourceID will be generated randomly.</td>
</tr>
<tr>
<td><h5>jobmanager.retrieve-taskmanager-hostname</h5></td>
<td style="word-wrap: break-word;">true</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,17 @@ public enum SchedulerType {
code(SchedulerType.AdaptiveBatch.name()))
.build());

/**
* The JobManager's ResourceID. If not configured, the ResourceID will be generated randomly.
*/
@Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
public static final ConfigOption<String> JOB_MANAGER_RESOURCE_ID =
key("jobmanager.resource-id")
.stringType()
.noDefaultValue()
.withDescription(
"The JobManager's ResourceID. If not configured, the ResourceID will be generated randomly.");

// ---------------------------------------------------------------------------------------------

private JobManagerOptions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.management.jmx.JMXService;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MiniDispatcher;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
Expand Down Expand Up @@ -123,6 +124,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro

private final AtomicBoolean isShutDown = new AtomicBoolean(false);

@GuardedBy("lock")
private ResourceID resourceId;

@GuardedBy("lock")
private DispatcherResourceManagerComponent clusterComponent;

Expand Down Expand Up @@ -253,6 +257,7 @@ private void runCluster(Configuration configuration, PluginManager pluginManager
clusterComponent =
dispatcherResourceManagerComponentFactory.create(
configuration,
resourceId,
ioExecutor,
commonRpcService,
haServices,
Expand Down Expand Up @@ -338,6 +343,17 @@ protected void initializeServices(Configuration configuration, PluginManager plu
executionGraphInfoStore =
createSerializableExecutionGraphStore(
configuration, commonRpcService.getScheduledExecutor());

resourceId =
configuration
.getOptional(JobManagerOptions.JOB_MANAGER_RESOURCE_ID)
.map(ResourceID::new)
.orElseGet(ResourceID::generate);

LOG.debug(
"Initialize cluster entrypoint {} with resource id {}.",
getClass().getSimpleName(),
resourceId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.DispatcherOperationCaches;
Expand Down Expand Up @@ -101,6 +102,7 @@ public DefaultDispatcherResourceManagerComponentFactory(
@Override
public DispatcherResourceManagerComponent create(
Configuration configuration,
ResourceID resourceId,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
Expand Down Expand Up @@ -178,6 +180,7 @@ public DispatcherResourceManagerComponent create(
ResourceManagerServiceImpl.create(
resourceManagerFactory,
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand All @@ -35,6 +36,7 @@ public interface DispatcherResourceManagerComponentFactory {

DispatcherResourceManagerComponent create(
Configuration configuration,
ResourceID resourceId,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ private void setupDispatcherResourceManagerComponents(
return Collections.singleton(
dispatcherResourceManagerComponentFactory.create(
configuration,
ResourceID.generate(),
ioExecutor,
rpcServiceFactory.createRpcService(),
haServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public abstract class ResourceManagerFactory<T extends ResourceIDRetrievable> {

public ResourceManagerProcessContext createResourceManagerProcessContext(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
Expand Down Expand Up @@ -77,6 +78,7 @@ public ResourceManagerProcessContext createResourceManagerProcessContext(

return new ResourceManagerProcessContext(
rmConfig,
resourceId,
runtimeServiceConfig,
rpcService,
highAvailabilityServices,
Expand All @@ -90,8 +92,7 @@ public ResourceManagerProcessContext createResourceManagerProcessContext(
}

public ResourceManager<T> createResourceManager(
ResourceManagerProcessContext context, UUID leaderSessionId, ResourceID resourceId)
throws Exception {
ResourceManagerProcessContext context, UUID leaderSessionId) throws Exception {

final ResourceManagerRuntimeServices resourceManagerRuntimeServices =
createResourceManagerRuntimeServices(
Expand All @@ -102,7 +103,7 @@ public ResourceManager<T> createResourceManager(

return createResourceManager(
context.getRmConfig(),
resourceId,
context.getResourceId(),
context.getRpcService(),
leaderSessionId,
context.getHeartbeatServices(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.resourcemanager;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand All @@ -41,6 +42,7 @@
*/
public class ResourceManagerProcessContext {
private final Configuration rmConfig;
private final ResourceID resourceId;
private final ResourceManagerRuntimeServicesConfiguration rmRuntimeServicesConfig;
private final RpcService rpcService;
private final HighAvailabilityServices highAvailabilityServices;
Expand All @@ -54,6 +56,7 @@ public class ResourceManagerProcessContext {

public ResourceManagerProcessContext(
Configuration rmConfig,
ResourceID resourceId,
ResourceManagerRuntimeServicesConfiguration rmRuntimeServicesConfig,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
Expand All @@ -65,6 +68,7 @@ public ResourceManagerProcessContext(
SlotManagerMetricGroup slotManagerMetricGroup,
Executor ioExecutor) {
this.rmConfig = checkNotNull(rmConfig);
this.resourceId = checkNotNull(resourceId);
this.rmRuntimeServicesConfig = checkNotNull(rmRuntimeServicesConfig);
this.rpcService = checkNotNull(rpcService);
this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
Expand All @@ -82,6 +86,10 @@ public Configuration getRmConfig() {
return rmConfig;
}

public ResourceID getResourceId() {
return resourceId;
}

public ResourceManagerRuntimeServicesConfiguration getRmRuntimeServicesConfig() {
return rmRuntimeServicesConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,7 @@ private void startNewLeaderResourceManager(UUID newLeaderSessionID) throws Excep

this.leaderSessionID = newLeaderSessionID;
this.leaderResourceManager =
resourceManagerFactory.createResourceManager(
rmProcessContext, newLeaderSessionID, ResourceID.generate());
resourceManagerFactory.createResourceManager(rmProcessContext, newLeaderSessionID);

final ResourceManager<?> newLeaderResourceManager = this.leaderResourceManager;

Expand Down Expand Up @@ -334,6 +333,7 @@ public ResourceManager<?> getLeaderResourceManager() {
public static ResourceManagerServiceImpl create(
ResourceManagerFactory<?> resourceManagerFactory,
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
Expand All @@ -349,6 +349,7 @@ public static ResourceManagerServiceImpl create(
resourceManagerFactory,
resourceManagerFactory.createResourceManagerProcessContext(
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
Expand Down Expand Up @@ -409,6 +410,7 @@ private class PersistingMiniCluster extends MiniCluster {
return Collections.singleton(
dispatcherResourceManagerComponentFactory.create(
configuration,
ResourceID.generate(),
getIOExecutor(),
rpcServiceFactory.createRpcService(),
haServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
Expand Down Expand Up @@ -156,6 +157,7 @@ protected HighAvailabilityServices createHighAvailabilityServices(
result.add(
dispatcherResourceManagerComponentFactory.create(
configuration,
ResourceID.generate(),
getIOExecutor(),
rpcServiceFactory.createRpcService(),
haServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
Expand Down Expand Up @@ -126,6 +127,7 @@ private void createResourceManager() throws Exception {
ResourceManagerServiceImpl.create(
rmFactory,
new Configuration(),
ResourceID.generate(),
rpcService,
haService,
heartbeatServices,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
Expand Down Expand Up @@ -184,6 +185,7 @@ public TestingResourceManagerService build() throws Exception {
ResourceManagerServiceImpl.create(
StandaloneResourceManagerFactory.getInstance(),
new Configuration(),
ResourceID.generate(),
rpcService,
haServices,
new TestingHeartbeatServices(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
Expand Down Expand Up @@ -140,6 +141,7 @@ public void testCancelingOnProcessFailure() throws Throwable {
dispatcherResourceManagerComponent =
resourceManagerComponentFactory.create(
config,
ResourceID.generate(),
ioExecutor,
rpcService,
haServices,
Expand Down

0 comments on commit f107710

Please sign in to comment.