From c8f27c25e8726360bd09fd21fa8e908c40376881 Mon Sep 17 00:00:00 2001 From: Rui Fan <1996fanrui@gmail.com> Date: Fri, 12 Jan 2024 17:12:25 +0800 Subject: [PATCH] [FLINK-34081][configuration] Refactor all callers of deprecated `getXxx(ConfigOption)`, `getXxx(ConfigOption, Xxx)` and `setXxx(ConfigOption, Xxx)` methods of Configuration --- .../org/apache/flink/client/ClientUtils.java | 2 +- .../client/cli/AbstractCustomCommandLine.java | 4 +- .../apache/flink/client/cli/CliFrontend.java | 8 +- .../apache/flink/client/cli/DefaultCLI.java | 7 +- .../client/cli/ExecutionConfigAccessor.java | 6 +- .../apache/flink/client/cli/GenericCLI.java | 4 +- .../flink/client/cli/ProgramOptions.java | 7 +- ...ractContainerizedClusterClientFactory.java | 2 +- .../deployment/StandaloneClientFactory.java | 3 +- .../StandaloneClusterDescriptor.java | 4 +- .../ApplicationDispatcherBootstrap.java | 4 +- .../deployment/executors/LocalExecutor.java | 4 +- .../executors/PipelineExecutorUtils.java | 6 +- .../client/program/ContextEnvironment.java | 8 +- .../program/PerJobMiniClusterFactory.java | 2 +- .../program/StreamContextEnvironment.java | 8 +- .../program/rest/RestClusterClient.java | 6 +- .../rest/RestClusterClientConfiguration.java | 6 +- .../flink/client/ClientHeartbeatTest.java | 8 +- .../client/cli/CliFrontendTestUtils.java | 4 +- .../flink/client/cli/GenericCLITest.java | 12 +-- .../cli/util/DummyClusterClientFactory.java | 2 +- .../cli/util/DummyCustomCommandLine.java | 2 +- .../ClusterClientServiceLoaderTest.java | 14 +-- .../flink/client/program/ClientTest.java | 8 +- ...DefaultPackagedProgramRetrieverITCase.java | 6 +- .../program/ExecutionPlanCreationTest.java | 4 +- ...estClusterClientCheckpointTriggerTest.java | 10 +-- .../RestClusterClientConfigurationTest.java | 6 +- ...RestClusterClientSavepointTriggerTest.java | 10 +-- .../program/rest/RestClusterClientTest.java | 10 +-- ...SingleThreadMultiplexSourceReaderBase.java | 2 +- .../source/reader/SourceReaderOptions.java | 4 +- .../source/hybrid/HybridSourceReaderTest.java | 4 +- .../CoordinatedSourceRescaleITCase.java | 5 +- .../source/reader/SourceReaderBaseTest.java | 4 +- .../source/reader/mocks/MockBaseSource.java | 4 +- .../file/table/FileSystemTableSink.java | 2 +- .../stream/PartitionTimeCommitPredicate.java | 3 +- .../file/sink/FileSinkSpeculativeITCase.java | 4 +- .../table/stream/StreamingFileWriterTest.java | 4 +- .../flink/connectors/hive/HiveTableSink.java | 8 +- .../connectors/hive/HiveTableSource.java | 6 +- ...eServer2DelegationTokenProviderITCase.java | 4 +- .../flink/api/common/ExecutionConfig.java | 2 +- .../api/common/io/DelimitedInputFormat.java | 6 +- .../flink/api/common/io/FileOutputFormat.java | 4 +- .../configuration/ConfigurationUtils.java | 16 ++-- .../DelegatingConfiguration.java | 36 ++++---- .../flink/configuration/SecurityOptions.java | 10 +-- .../configuration/TaskManagerOptions.java | 2 +- .../org/apache/flink/core/fs/FileSystem.java | 5 +- .../core/fs/LimitedConnectionsFileSystem.java | 10 +-- .../FilesystemSchemeConfigTest.java | 7 +- .../flink/configuration/RestOptionsTest.java | 4 +- .../configuration/SecurityOptionsTest.java | 22 ++--- .../LimitedConnectionsConfigurationTest.java | 6 +- .../fs/FsStateChangelogStorageFactory.java | 4 +- .../tests/AllroundMiniClusterTest.java | 2 +- .../flink/externalresource/gpu/GPUDriver.java | 4 +- .../externalresource/gpu/GPUDriverTest.java | 14 +-- .../fs/osshadoop/OSSFileSystemFactory.java | 4 +- ...opOSSRecoverableWriterExceptionITCase.java | 4 +- .../HadoopOSSRecoverableWriterITCase.java | 4 +- .../common/AbstractS3FileSystemFactory.java | 8 +- ...oopS3RecoverableWriterExceptionITCase.java | 6 +- .../HadoopS3RecoverableWriterITCase.java | 6 +- .../PrestoS3RecoverableWriterTest.java | 8 +- .../flink/orc/AbstractOrcFileInputFormat.java | 3 +- .../parquet/ParquetVectorizedInputFormat.java | 2 +- .../org/apache/flink/hdfstests/HDFSTest.java | 24 +++--- .../flink/api/java/ExecutionEnvironment.java | 6 +- .../flink/api/java/RemoteEnvironment.java | 8 +- .../java/RemoteEnvironmentConfigUtils.java | 8 +- .../java/utils/AbstractParameterToolTest.java | 6 +- .../KubernetesClusterClientFactory.java | 6 +- .../KubernetesClusterDescriptor.java | 12 +-- .../KubernetesResourceManagerDriver.java | 4 +- ...KubernetesLeaderElectionConfiguration.java | 2 +- .../entrypoint/KubernetesEntrypointUtils.java | 14 +-- .../KubernetesResourceManagerFactory.java | 2 +- .../KubernetesWorkerResourceSpecFactory.java | 2 +- .../kubeclient/Fabric8FlinkKubeClient.java | 4 +- .../kubeclient/FlinkKubeClientFactory.java | 9 +- .../decorators/InternalServiceDecorator.java | 2 +- .../factory/KubernetesJobManagerFactory.java | 4 +- .../factory/KubernetesTaskManagerFactory.java | 4 +- .../AbstractKubernetesParameters.java | 19 ++--- .../KubernetesJobManagerParameters.java | 16 ++-- .../KubernetesTaskManagerParameters.java | 7 +- .../KubernetesTaskExecutorRunner.java | 2 +- .../kubernetes/utils/KubernetesUtils.java | 4 +- .../KubernetesClusterClientFactoryTest.java | 2 +- .../KubernetesClusterDescriptorTest.java | 9 +- .../flink/kubernetes/KubernetesExtension.java | 2 +- .../KubernetesResourceManagerDriverTest.java | 4 +- .../flink/kubernetes/KubernetesTestBase.java | 6 +- .../cli/KubernetesSessionCliTest.java | 8 +- ...bernetesWorkerResourceSpecFactoryTest.java | 16 ++-- ...netesLeaderElectionAndRetrievalITCase.java | 2 +- .../KubernetesTestFixture.java | 2 +- .../Fabric8FlinkKubeClientTest.java | 4 +- .../DecoratorWithPodTemplateTestBase.java | 2 +- .../InitTaskManagerDecoratorTest.java | 3 +- .../InternalServiceDecoratorTest.java | 4 +- ...ernetesFactoryWithPodTemplateTestBase.java | 2 +- .../kubernetes/utils/KubernetesUtilsTest.java | 2 +- .../api/runtime/SavepointEnvironment.java | 2 +- .../jobmanager/JMXJobManagerMetricTest.java | 2 +- .../org/apache/flink/optimizer/Optimizer.java | 2 +- .../plantranslate/JobGraphGenerator.java | 5 +- .../metric/process/FlinkMetricContainer.java | 2 +- .../client/python/PythonEnvUtilsTest.java | 2 +- .../flink/python/PythonOptionsTest.java | 40 ++++----- ...ythonStreamGroupAggregateOperatorTest.java | 10 +-- ...StreamGroupTableAggregateOperatorTest.java | 10 +-- ...treamGroupWindowAggregateOperatorTest.java | 8 +- ...honGroupAggregateFunctionOperatorTest.java | 6 +- ...upWindowAggregateFunctionOperatorTest.java | 6 +- ...erWindowAggregateFunctionOperatorTest.java | 6 +- ...upWindowAggregateFunctionOperatorTest.java | 4 +- ...PythonRowTimeBoundedRangeOperatorTest.java | 4 +- ...wPythonRowTimeBoundedRowsOperatorTest.java | 4 +- .../PythonScalarFunctionOperatorTestBase.java | 12 +-- .../PythonTableFunctionOperatorTestBase.java | 8 +- .../HAQueryableStateFsBackendITCase.java | 22 ++--- .../HAQueryableStateRocksDBBackendITCase.java | 22 ++--- .../NonHAQueryableStateFsBackendITCase.java | 16 ++-- ...nHAQueryableStateRocksDBBackendITCase.java | 16 ++-- .../rpc/pekko/ActorSystemBootstrapTools.java | 12 +-- .../rpc/pekko/PekkoRpcServiceUtils.java | 5 +- .../flink/runtime/rpc/pekko/PekkoUtils.java | 44 +++++----- .../rpc/pekko/MessageSerializationTest.java | 2 +- ...oRpcActorOversizedResponseMessageTest.java | 2 +- .../runtime/rpc/pekko/PekkoUtilsTest.java | 6 +- .../webmonitor/history/HistoryServer.java | 17 ++-- .../runtime/webmonitor/utils/LogUrlUtil.java | 2 +- .../utils/WebFrontendBootstrap.java | 3 +- .../runtime/webmonitor/WebFrontendITCase.java | 2 +- .../webmonitor/WebMonitorUtilsTest.java | 2 +- .../webmonitor/history/HistoryServerTest.java | 16 ++-- .../utils/WebFrontendBootstrapTest.java | 5 +- .../flink/runtime/blob/AbstractBlobCache.java | 2 +- .../apache/flink/runtime/blob/BlobClient.java | 6 +- .../apache/flink/runtime/blob/BlobServer.java | 12 +-- .../apache/flink/runtime/blob/BlobUtils.java | 2 +- .../runtime/blob/PermanentBlobCache.java | 2 +- .../runtime/blob/TransientBlobCache.java | 2 +- .../runtime/blocklist/BlocklistUtils.java | 2 +- .../DefaultCompletedCheckpointStoreUtils.java | 2 +- .../clusterframework/BootstrapTools.java | 10 +-- .../TaskExecutorProcessSpecBuilder.java | 2 +- .../TaskExecutorProcessUtils.java | 8 +- .../flink/runtime/dispatcher/Dispatcher.java | 2 +- .../dispatcher/HistoryServerArchivist.java | 2 +- .../dispatcher/JobDispatcherFactory.java | 2 +- .../CheckpointResourcesCleanupRunner.java | 3 +- .../cleanup/CleanupRetryStrategyFactory.java | 7 +- .../runtime/entrypoint/ClusterEntrypoint.java | 34 ++++---- .../entrypoint/ClusterEntrypointUtils.java | 2 +- .../entrypoint/SessionClusterEntrypoint.java | 7 +- ...atcherResourceManagerComponentFactory.java | 6 +- .../component/FileJobGraphRetriever.java | 2 +- .../DefaultExecutionGraphBuilder.java | 4 +- .../FailoverStrategyFactoryLoader.java | 3 +- ...FailureRateRestartBackoffTimeStrategy.java | 2 +- .../FixedDelayRestartBackoffTimeStrategy.java | 3 +- ...tionGroupReleaseStrategyFactoryLoader.java | 2 +- .../ExternalResourceUtils.java | 2 +- .../runtime/failure/FailureEnricherUtils.java | 2 +- .../runtime/heartbeat/HeartbeatServices.java | 4 +- .../HighAvailabilityServicesUtils.java | 13 ++- .../runtime/io/network/netty/NettyConfig.java | 20 ++--- .../shuffle/TieredInternalShuffleMaster.java | 2 +- .../jobgraph/SavepointRestoreSettings.java | 2 +- .../jobmanager/JobManagerProcessUtils.java | 3 +- ...efaultSlotPoolServiceSchedulerFactory.java | 4 +- .../jobmaster/JobManagerSharedServices.java | 12 ++- .../flink/runtime/jobmaster/JobMaster.java | 2 +- .../jobmaster/JobMasterConfiguration.java | 2 +- .../metrics/MetricRegistryConfiguration.java | 2 +- .../flink/runtime/metrics/ReporterSetup.java | 2 +- .../runtime/metrics/TraceReporterSetup.java | 3 +- .../runtime/metrics/scope/ScopeFormats.java | 14 +-- .../runtime/metrics/util/MetricUtils.java | 5 +- .../runtime/minicluster/MiniCluster.java | 2 +- .../minicluster/MiniClusterConfiguration.java | 21 +++-- .../apache/flink/runtime/net/SSLUtils.java | 30 +++---- .../AbstractCachedBuildSideJoinDriver.java | 2 +- .../flink/runtime/operators/JoinDriver.java | 2 +- .../RetryingRegistrationConfiguration.java | 8 +- ...ceManagerRuntimeServicesConfiguration.java | 2 +- .../active/ActiveResourceManagerFactory.java | 2 +- .../slotmanager/SlotManagerConfiguration.java | 14 ++- .../apache/flink/runtime/rest/RestClient.java | 2 +- .../runtime/rest/RestClientConfiguration.java | 6 +- .../rest/RestServerEndpointConfiguration.java | 13 ++- .../handler/RestHandlerConfiguration.java | 15 ++-- .../JobManagerProfilingFileHandler.java | 2 +- .../legacy/metrics/MetricFetcherImpl.java | 5 +- .../DefaultExecutionGraphFactory.java | 3 +- .../scheduler/DefaultSchedulerFactory.java | 3 +- .../runtime/scheduler/SchedulerBase.java | 2 +- .../scheduler/adaptive/AdaptiveScheduler.java | 3 +- .../AdaptiveBatchSchedulerFactory.java | 2 +- ...VertexParallelismAndInputInfosDecider.java | 3 +- .../adaptivebatch/SpeculativeScheduler.java | 2 +- .../ExecutionTimeBasedSlowTaskDetector.java | 4 +- .../security/SecurityConfiguration.java | 16 ++-- .../security/modules/HadoopModule.java | 2 +- .../runtime/security/modules/JaasModule.java | 2 +- .../token/DefaultDelegationTokenManager.java | 2 +- .../DefaultDelegationTokenManagerFactory.java | 2 +- .../HadoopFSDelegationTokenProvider.java | 5 +- .../runtime/shuffle/NettyShuffleMaster.java | 11 ++- .../runtime/shuffle/ShuffleServiceLoader.java | 2 +- .../TaskExecutorLocalStateStoresManager.java | 2 +- .../StateChangelogStorageLoader.java | 4 +- .../QueryableStateConfiguration.java | 15 ++-- .../TaskExecutorResourceUtils.java | 2 +- .../TaskManagerConfiguration.java | 5 +- .../taskexecutor/TaskManagerRunner.java | 16 ++-- .../taskexecutor/TaskManagerServices.java | 4 +- .../TaskManagerServicesConfiguration.java | 10 +-- .../runtime/taskmanager/MemoryLogger.java | 6 +- .../NettyShuffleEnvironmentConfiguration.java | 43 ++++------ .../flink/runtime/taskmanager/Task.java | 6 +- .../taskmanager/TaskManagerRuntimeInfo.java | 2 +- .../util/ConfigurationParserUtils.java | 2 +- .../util/SlotSelectionStrategyUtils.java | 2 +- .../flink/runtime/util/ZooKeeperUtils.java | 18 ++-- .../MemoryBackwardsCompatibilityUtils.java | 3 +- .../config/memory/ProcessMemoryUtils.java | 2 +- .../TaskExecutorFlinkMemoryUtils.java | 3 +- .../util/profiler/ProfilingService.java | 4 +- .../webmonitor/WebMonitorEndpoint.java | 6 +- .../runtime/webmonitor/WebMonitorUtils.java | 2 +- .../history/HistoryServerUtils.java | 6 +- .../runtime/blob/BlobCacheCleanupTest.java | 12 +-- .../runtime/blob/BlobCacheCorruptionTest.java | 4 +- .../runtime/blob/BlobCacheRecoveryTest.java | 4 +- .../runtime/blob/BlobCacheRetriesTest.java | 8 +- .../runtime/blob/BlobCacheSuccessTest.java | 26 +++--- .../flink/runtime/blob/BlobClientSslTest.java | 2 +- .../flink/runtime/blob/BlobClientTest.java | 6 +- .../runtime/blob/BlobServerCleanupTest.java | 4 +- .../blob/BlobServerCorruptionTest.java | 6 +- .../flink/runtime/blob/BlobServerGetTest.java | 8 +- .../runtime/blob/BlobServerRangeTest.java | 6 +- .../runtime/blob/BlobServerRecoveryTest.java | 4 +- .../flink/runtime/blob/BlobServerSslTest.java | 28 +++--- .../blob/BlobUtilsNonWritableTest.java | 3 +- .../flink/runtime/blob/BlobUtilsTest.java | 4 +- .../runtime/blob/TestingBlobHelpers.java | 10 +-- ...aultCompletedCheckpointStoreUtilsTest.java | 2 +- ...ZKCheckpointIDCounterMultiServersTest.java | 2 +- ...ZooKeeperCompletedCheckpointStoreTest.java | 8 +- .../clusterframework/BootstrapToolsTest.java | 11 ++- .../TaskExecutorProcessUtilsTest.java | 38 ++++----- .../TaskDeploymentDescriptorTest.java | 2 +- .../ZooKeeperDefaultDispatcherRunnerTest.java | 6 +- .../BlobLibraryCacheManagerTest.java | 6 +- .../BlobLibraryCacheRecoveryITCase.java | 10 +-- .../DefaultExecutionGraphDeploymentTest.java | 2 +- ...utionGraphDeploymentWithBlobCacheTest.java | 2 +- ...tionGraphDeploymentWithBlobServerTest.java | 2 +- ...oymentWithSmallBlobCacheSizeLimitTest.java | 2 +- .../FailoverStrategyFactoryLoaderTest.java | 6 +- ...tBackoffTimeStrategyFactoryLoaderTest.java | 36 ++++---- ...GroupReleaseStrategyFactoryLoaderTest.java | 3 +- .../failure/FailureEnricherUtilsTest.java | 11 ++- .../HighAvailabilityServicesUtilsTest.java | 10 +-- .../netty/NettyClientServerSslTest.java | 23 +++-- .../netty/NettyConnectionManagerTest.java | 6 +- .../Prio0InboundChannelHandlerFactory.java | 4 +- .../partition/FileBufferReaderITCase.java | 5 +- .../PartialConsumePipelinedResultTest.java | 3 +- .../jobmanager/BlobsCleanupITCase.java | 8 +- .../jobmanager/HighAvailabilityModeTest.java | 4 +- .../JobManagerSharedServicesTest.java | 4 +- .../JobMasterPartitionReleaseTest.java | 2 +- .../runtime/jobmaster/JobMasterTest.java | 8 +- .../leaderelection/LeaderElectionTest.java | 4 +- .../ZooKeeperLeaderElectionTest.java | 4 +- .../ZooKeeperLeaderRetrievalTest.java | 4 +- .../metrics/MetricRegistryImplTest.java | 14 +-- .../runtime/metrics/ReporterSetupTest.java | 2 +- .../groups/AbstractMetricGroupTest.java | 4 +- .../groups/InternalOperatorGroupTest.java | 2 +- .../metrics/groups/JobManagerGroupTest.java | 2 +- .../groups/JobManagerJobGroupTest.java | 8 +- .../groups/JobManagerOperatorGroupTest.java | 8 +- .../metrics/groups/TaskManagerGroupTest.java | 2 +- .../groups/TaskManagerJobGroupTest.java | 8 +- .../metrics/groups/TaskMetricGroupTest.java | 11 ++- .../runtime/metrics/util/MetricUtilsTest.java | 3 +- .../minicluster/MiniClusterITCase.java | 4 +- .../TestingMiniClusterConfiguration.java | 11 ++- .../flink/runtime/net/SSLUtilsTest.java | 85 +++++++++---------- .../CoordinatorEventsExactlyOnceITCase.java | 2 +- ...RetryingRegistrationConfigurationTest.java | 9 +- .../active/ActiveResourceManagerTest.java | 4 +- .../rest/MultipartUploadExtension.java | 8 +- .../Prio0OutboundChannelHandlerFactory.java | 2 +- .../flink/runtime/rest/RestClientTest.java | 6 +- .../rest/RestExternalHandlersITCase.java | 12 +-- .../RestServerEndpointConfigurationTest.java | 10 +-- .../rest/RestServerEndpointITCase.java | 36 ++++---- .../runtime/rest/RestServerSSLAuthITCase.java | 36 ++++---- .../rest/handler/AbstractHandlerTest.java | 8 +- .../handler/RestHandlerConfigurationTest.java | 6 +- .../DocumentingDispatcherRestEndpoint.java | 4 +- .../flink/runtime/rpc/RpcSSLAuthITCase.java | 30 +++---- .../runtime/scheduler/SchedulerUtilsTest.java | 2 +- ...exParallelismAndInputInfosDeciderTest.java | 4 +- .../runtime/security/SecurityUtilsTest.java | 10 +-- .../security/modules/JaasModuleTest.java | 2 +- .../hadoop/KerberosLoginProviderITCase.java | 22 ++--- .../runtime/shuffle/ShuffleMasterTest.java | 2 +- .../shuffle/ShuffleServiceLoaderTest.java | 4 +- .../state/CheckpointStorageLoaderTest.java | 4 +- .../state/StateBackendLoadingTest.java | 50 +++++------ .../runtime/state/StateBackendTestBase.java | 2 +- ...skExecutorLocalStateStoresManagerTest.java | 5 +- .../LatencyTrackingStateConfigTest.java | 6 +- .../metrics/LatencyTrackingStateTestBase.java | 7 +- ...tyShuffleEnvironmentConfigurationTest.java | 8 +- .../TaskExecutorSubmissionTest.java | 10 +-- .../taskexecutor/TaskExecutorTest.java | 9 +- .../TaskManagerRunnerConfigurationTest.java | 16 ++-- .../TaskManagerRunnerStartupTest.java | 6 +- .../taskexecutor/TaskManagerRunnerTest.java | 4 +- .../TaskSubmissionTestEnvironment.java | 6 +- ...TaskCancelAsyncProducerConsumerITCase.java | 2 +- .../flink/runtime/taskmanager/TaskTest.java | 14 +-- .../runtime/testutils/DispatcherProcess.java | 4 +- .../testutils/MiniClusterResource.java | 13 ++- .../TestingClusterEntrypointProcess.java | 4 +- .../runtime/testutils/ZooKeeperTestUtils.java | 15 ++-- .../runtime/util/JvmExitOnFatalErrorTest.java | 2 +- .../runtime/util/ZooKeeperUtilsTest.java | 2 +- .../memory/ProcessMemoryUtilsTestBase.java | 6 +- .../webmonitor/WebMonitorEndpointTest.java | 4 +- .../history/HistoryServerUtilsTest.java | 12 +-- .../runtime/zookeeper/ZooKeeperExtension.java | 2 +- .../ZooKeeperStateHandleStoreTest.java | 6 +- .../ChangelogStateBackendLoadingTest.java | 6 +- .../state/RocksDBNativeMetricMonitorTest.java | 8 +- .../state/RocksDBStateBackendConfigTest.java | 10 +-- .../state/RocksDBStateBackendFactoryTest.java | 12 +-- .../environment/RemoteStreamEnvironment.java | 12 +-- .../StreamExecutionEnvironment.java | 4 +- .../streaming/api/graph/StreamConfig.java | 14 +-- .../api/operators/AbstractStreamOperator.java | 4 +- .../operators/AbstractStreamOperatorV2.java | 4 +- .../api/operators/SourceOperator.java | 2 +- .../streaming/api/operators/StreamSource.java | 2 +- .../collect/CollectSinkFunction.java | 2 +- .../streaming/runtime/tasks/StreamTask.java | 4 +- ...onEnvironmentComplexConfigurationTest.java | 4 +- .../graph/StreamingJobGraphGeneratorTest.java | 2 +- .../collect/CollectSinkFunctionTest.java | 2 +- .../SourceOperatorLatencyMetricsTest.java | 6 +- ...treamSourceOperatorLatencyMetricsTest.java | 6 +- .../runtime/tasks/StreamTaskTest.java | 6 +- .../client/gateway/ExecutorImplITCase.java | 4 +- .../rest/SqlGatewayRestEndpointITCase.java | 2 +- .../DocumentingSqlGatewayRestEndpoint.java | 2 +- .../service/SqlGatewayServiceITCase.java | 4 +- .../apache/flink/table/api/TableConfig.java | 8 +- .../table/factories/TableFactoryUtil.java | 4 +- .../planner/delegation/PlannerContext.java | 2 +- .../BatchExecPythonGroupWindowAggregate.java | 3 +- .../nodes/exec/utils/CommonPythonUtil.java | 4 +- .../planner/factories/TestFileFactory.java | 5 +- .../utils/StreamExchangeModeUtilsTest.java | 18 ++-- .../batch/sql/join/ShuffledHashJoinTest.scala | 2 +- .../sql/StreamTableEnvironmentITCase.scala | 2 +- .../utils/StreamingWithStateTestBase.scala | 2 +- .../operators/join/HashJoinOperator.java | 2 +- .../sort/BinaryExternalSorterTest.java | 8 +- .../sort/BufferedKVExternalSorterTest.java | 4 +- .../container/FlinkContainersSettings.java | 2 +- .../MiniClusterTestEnvironment.java | 2 +- .../test/util/SecureTestEnvironment.java | 14 +-- .../metrics/SystemResourcesMetricsITCase.java | 8 +- .../accumulators/AccumulatorLiveITCase.java | 2 +- .../test/cancelling/CancelingTestBase.java | 4 +- .../checkpointing/AutoRescalingITCase.java | 13 ++- .../ChangelogLocalRecoveryITCase.java | 12 +-- ...gelogRecoverySwitchStateBackendITCase.java | 2 +- ...CheckpointAfterAllTasksFinishedITCase.java | 2 +- .../EventTimeWindowCheckpointingITCase.java | 12 ++- .../checkpointing/LocalRecoveryITCase.java | 2 +- .../NotifyCheckpointAbortedITCase.java | 4 +- .../checkpointing/RegionFailoverITCase.java | 4 +- .../RescaleCheckpointManuallyITCase.java | 4 +- .../test/checkpointing/RescalingITCase.java | 9 +- .../ResumeCheckpointManuallyITCase.java | 13 ++- .../checkpointing/SavepointFormatITCase.java | 2 +- .../test/checkpointing/SavepointITCase.java | 13 ++- .../StreamFaultToleranceTestBase.java | 4 +- .../UnalignedCheckpointTestBase.java | 7 +- .../utils/SnapshotMigrationTestBase.java | 11 ++- .../test/classloading/ClassLoaderITCase.java | 12 +-- .../example/client/JobRetrievalITCase.java | 4 +- .../example/client/LocalExecutorITCase.java | 6 +- .../StreamingScalabilityAndLatency.java | 2 +- .../operators/ExecutionEnvironmentITCase.java | 2 +- .../operators/RemoteEnvironmentITCase.java | 2 +- ...TaskManagerProcessFailureRecoveryTest.java | 18 ++-- .../BatchFineGrainedRecoveryITCase.java | 2 +- ...ManagerHAProcessFailureRecoveryITCase.java | 9 +- .../ProcessFailureCancelingITCase.java | 14 +-- ...ExponentialDelayRestartStrategyITBase.java | 9 +- ...mpleRecoveryFailureRateStrategyITBase.java | 4 +- ...coveryFixedDelayRestartStrategyITBase.java | 4 +- ...TaskManagerDisconnectOnShutdownITCase.java | 6 +- .../test/runtime/BatchShuffleITCaseBase.java | 2 +- .../test/runtime/BlockingShuffleITCase.java | 12 ++- .../DefaultSchedulerLocalRecoveryITCase.java | 4 +- .../test/runtime/IPv6HostnamesITCase.java | 4 +- .../flink/test/runtime/NettyEpollITCase.java | 2 +- .../flink/test/runtime/SchedulingITCase.java | 6 +- .../runtime/ShuffleCompressionITCase.java | 8 +- .../ZooKeeperLeaderElectionITCase.java | 2 +- .../AdaptiveBatchSchedulerITCase.java | 6 +- .../PipelinedRegionSchedulingITCase.java | 4 +- .../SpeculativeSchedulerITCase.java | 4 +- .../state/ChangelogCompatibilityITCase.java | 4 +- .../operator/restore/keyed/KeyedJob.java | 2 +- .../operator/restore/unkeyed/NonKeyedJob.java | 2 +- .../api/StreamExecutionEnvironmentITCase.java | 2 +- .../RemoteStreamEnvironmentTest.java | 4 +- .../PojoSerializerUpgradeTest.java | 4 +- .../flink/yarn/YARNFileReplicationITCase.java | 5 +- .../yarn/YARNHighAvailabilityITCase.java | 14 +-- .../flink/yarn/YARNSessionFIFOITCase.java | 3 +- .../yarn/YARNSessionFIFOSecuredITCase.java | 4 +- .../java/org/apache/flink/yarn/Utils.java | 6 +- .../flink/yarn/YarnClusterClientFactory.java | 4 +- .../flink/yarn/YarnClusterDescriptor.java | 60 ++++++------- .../flink/yarn/YarnResourceManagerDriver.java | 7 +- .../flink/yarn/YarnTaskExecutorRunner.java | 10 +-- .../flink/yarn/cli/FlinkYarnSessionCli.java | 36 ++++---- .../yarn/configuration/YarnLogConfigUtil.java | 2 +- .../YarnApplicationClusterEntryPoint.java | 2 +- .../yarn/entrypoint/YarnEntrypointUtils.java | 16 ++-- .../entrypoint/YarnJobClusterEntrypoint.java | 2 +- .../YarnSessionClusterEntrypoint.java | 2 +- .../YarnWorkerResourceSpecFactory.java | 2 +- .../flink/yarn/FlinkYarnSessionCliTest.java | 12 +-- .../yarn/YarnClusterClientFactoryTest.java | 2 +- .../flink/yarn/YarnClusterDescriptorTest.java | 10 +-- .../yarn/YarnTaskExecutorRunnerTest.java | 10 +-- .../entrypoint/YarnEntrypointUtilsTest.java | 12 +-- .../YarnWorkerResourceSpecFactoryTest.java | 16 ++-- 457 files changed, 1578 insertions(+), 1710 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java index b828534bea571..a104d041eff42 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java @@ -88,7 +88,7 @@ public static void executeProgram( LOG.info( "Starting program (detached: {})", - !configuration.getBoolean(DeploymentOptions.ATTACHED)); + !configuration.get(DeploymentOptions.ATTACHED)); ContextEnvironment.setAsContext( executorServiceLoader, diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java index 931a773d77693..a3e43198506e0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java @@ -56,11 +56,11 @@ public void addGeneralOptions(Options baseOptions) { @Override public Configuration toConfiguration(CommandLine commandLine) throws FlinkException { final Configuration resultingConfiguration = new Configuration(); - resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME); + resultingConfiguration.set(DeploymentOptions.TARGET, RemoteExecutor.NAME); if (commandLine.hasOption(zookeeperNamespaceOption.getOpt())) { String zkNamespace = commandLine.getOptionValue(zookeeperNamespaceOption.getOpt()); - resultingConfiguration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); + resultingConfiguration.set(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace); } return resultingConfiguration; diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 60b34f3db9b9d..228ab6c4e73a1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -1413,10 +1413,10 @@ public static String getConfigurationDirectoryFromEnv() { * @param config The configuration to write to */ static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { - config.setString(JobManagerOptions.ADDRESS, address.getHostString()); - config.setInteger(JobManagerOptions.PORT, address.getPort()); - config.setString(RestOptions.ADDRESS, address.getHostString()); - config.setInteger(RestOptions.PORT, address.getPort()); + config.set(JobManagerOptions.ADDRESS, address.getHostString()); + config.set(JobManagerOptions.PORT, address.getPort()); + config.set(RestOptions.ADDRESS, address.getHostString()); + config.set(RestOptions.PORT, address.getPort()); } public static List loadCustomCommandLines( diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java index d7f2fb25ce45e..4090262158a56 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java @@ -65,11 +65,10 @@ public Configuration toConfiguration(CommandLine commandLine) throws FlinkExcept setJobManagerAddressInConfig(resultingConfiguration, jobManagerAddress); URL url = NetUtils.getCorrectHostnamePort(addressWithPort); - resultingConfiguration.setString(RestOptions.PATH, url.getPath()); - resultingConfiguration.setBoolean( - SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url)); + resultingConfiguration.set(RestOptions.PATH, url.getPath()); + resultingConfiguration.set(SecurityOptions.SSL_REST_ENABLED, isHttpsProtocol(url)); } - resultingConfiguration.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME); + resultingConfiguration.set(DeploymentOptions.TARGET, RemoteExecutor.NAME); DynamicPropertiesUtil.encodeDynamicProperties(commandLine, resultingConfiguration); diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java index 4f35165c7f9ba..3f79c2f121221 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java @@ -83,11 +83,11 @@ public List getClasspaths() throws MalformedURLException { } public int getParallelism() { - return configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); + return configuration.get(CoreOptions.DEFAULT_PARALLELISM); } public boolean getDetachedMode() { - return !configuration.getBoolean(DeploymentOptions.ATTACHED); + return !configuration.get(DeploymentOptions.ATTACHED); } public SavepointRestoreSettings getSavepointRestoreSettings() { @@ -95,6 +95,6 @@ public SavepointRestoreSettings getSavepointRestoreSettings() { } public boolean isShutdownOnAttachedExit() { - return configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED); + return configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED); } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java index 67fb4653ec609..a53aab7d79a5e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/GenericCLI.java @@ -114,12 +114,12 @@ public Configuration toConfiguration(final CommandLine commandLine) { final String executorName = commandLine.getOptionValue(executorOption.getOpt()); if (executorName != null) { - resultConfiguration.setString(DeploymentOptions.TARGET, executorName); + resultConfiguration.set(DeploymentOptions.TARGET, executorName); } final String targetName = commandLine.getOptionValue(targetOption.getOpt()); if (targetName != null) { - resultConfiguration.setString(DeploymentOptions.TARGET, targetName); + resultConfiguration.set(DeploymentOptions.TARGET, targetName); } DynamicPropertiesUtil.encodeDynamicProperties(commandLine, resultConfiguration); diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java index b215cabea1f5e..83bd563ebe582 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java @@ -174,12 +174,11 @@ public SavepointRestoreSettings getSavepointRestoreSettings() { public void applyToConfiguration(Configuration configuration) { if (hasParallelismOpt) { - configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, getParallelism()); + configuration.set(CoreOptions.DEFAULT_PARALLELISM, getParallelism()); } - configuration.setBoolean(DeploymentOptions.ATTACHED, !getDetachedMode()); - configuration.setBoolean( - DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit()); + configuration.set(DeploymentOptions.ATTACHED, !getDetachedMode()); + configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, isShutdownOnAttachedExit()); ConfigUtils.encodeCollectionToConfig( configuration, PipelineOptions.CLASSPATHS, getClasspaths(), URL::toString); SavepointRestoreSettings.toConfiguration(getSavepointRestoreSettings(), configuration); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java index 2a0f33493fdbe..4cae294de88e8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractContainerizedClusterClientFactory.java @@ -54,7 +54,7 @@ public ClusterSpecification getClusterSpecification(Configuration configuration) .getTotalProcessMemorySize() .getMebiBytes(); - int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); + int slotsPerTaskManager = configuration.get(TaskManagerOptions.NUM_TASK_SLOTS); return new ClusterSpecification.ClusterSpecificationBuilder() .setMasterMemoryMB(jobManagerMemoryMB) diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java index 2a1f8d5e7ea81..bc564a7542bb0 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java @@ -34,8 +34,7 @@ public class StandaloneClientFactory implements ClusterClientFactory finish( DispatcherGateway dispatcherGateway, ApplicationStatus applicationStatus) { boolean shouldShutDownOnFinish = - configuration.getBoolean(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH); + configuration.get(DeploymentOptions.SHUTDOWN_ON_APPLICATION_FINISH); return shouldShutDownOnFinish ? dispatcherGateway.shutDownCluster(applicationStatus) : CompletableFuture.completedFuture(Acknowledge.get()); @@ -209,7 +209,7 @@ private CompletableFuture fixJobIdAndRunApplicationAsync( final Optional configuredJobId = configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID); final boolean submitFailedJobOnApplicationError = - configuration.getBoolean(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR); + configuration.get(DeploymentOptions.SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR); if (!HighAvailabilityMode.isHighAvailabilityModeActivated(configuration) && !configuredJobId.isPresent()) { return runApplicationAsync( diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java index 898d2b733c97a..b8b60fe9adff1 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java @@ -76,7 +76,7 @@ public CompletableFuture execute( effectiveConfig.addAll(configuration); // we only support attached execution with the local executor. - checkState(configuration.getBoolean(DeploymentOptions.ATTACHED)); + checkState(configuration.get(DeploymentOptions.ATTACHED)); final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader); @@ -93,7 +93,7 @@ private JobGraph getJobGraph( if (pipeline instanceof Plan) { Plan plan = (Plan) pipeline; final int slotsPerTaskManager = - configuration.getInteger( + configuration.get( TaskManagerOptions.NUM_TASK_SLOTS, plan.getMaximumParallelism()); final int numTaskManagers = configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS); diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java index 137dca5f2f619..1858315a0c26d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java @@ -68,10 +68,10 @@ public static JobGraph getJobGraph( .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) .ifPresent(strJobID -> jobGraph.setJobID(JobID.fromHexString(strJobID))); - if (configuration.getBoolean(DeploymentOptions.ATTACHED) - && configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { + if (configuration.get(DeploymentOptions.ATTACHED) + && configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { jobGraph.setInitialClientHeartbeatTimeout( - configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT)); + configuration.get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT)); } jobGraph.addJars(executionConfigAccessor.getJars()); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 4cd0d5aaf9216..6f811cbe31dd3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -94,12 +94,12 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro checkNotNull(jobClient); JobExecutionResult jobExecutionResult; - if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) { + if (getConfiguration().get(DeploymentOptions.ATTACHED)) { CompletableFuture jobExecutionResultFuture = jobClient.getJobExecutionResult(); ScheduledExecutorService clientHeartbeatService = null; - if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { + if (getConfiguration().get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { Thread shutdownHook = ShutdownHookUtil.addShutdownHook( () -> { @@ -119,8 +119,8 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro clientHeartbeatService = ClientUtils.reportHeartbeatPeriodically( jobClient, - getConfiguration().getLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL), - getConfiguration().getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT)); + getConfiguration().get(ClientOptions.CLIENT_HEARTBEAT_INTERVAL), + getConfiguration().get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT)); } jobExecutionResult = jobExecutionResultFuture.get(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java index 857946d47e5ee..af31e75fbe73c 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PerJobMiniClusterFactory.java @@ -120,7 +120,7 @@ private MiniClusterConfiguration getMiniClusterConfig(int maximumParallelism) { Configuration configuration = new Configuration(this.configuration); if (!configuration.contains(RestOptions.BIND_PORT)) { - configuration.setString(RestOptions.BIND_PORT, "0"); + configuration.set(RestOptions.BIND_PORT, "0"); } int numTaskManagers = configuration.get(TaskManagerOptions.MINI_CLUSTER_NUM_TASK_MANAGERS); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java index bc21e4d4c42db..055ebf7740123 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java @@ -138,12 +138,12 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro checkNotNull(jobClient); JobExecutionResult jobExecutionResult; - if (configuration.getBoolean(DeploymentOptions.ATTACHED)) { + if (configuration.get(DeploymentOptions.ATTACHED)) { CompletableFuture jobExecutionResultFuture = jobClient.getJobExecutionResult(); ScheduledExecutorService clientHeartbeatService = null; - if (configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { + if (configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { Thread shutdownHook = ShutdownHookUtil.addShutdownHook( () -> { @@ -163,8 +163,8 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro clientHeartbeatService = ClientUtils.reportHeartbeatPeriodically( jobClient, - configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_INTERVAL), - configuration.getLong(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT)); + configuration.get(ClientOptions.CLIENT_HEARTBEAT_INTERVAL), + configuration.get(ClientOptions.CLIENT_HEARTBEAT_TIMEOUT)); } jobExecutionResult = jobExecutionResultFuture.get(); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 6fa8325967192..74dcceee40ddc 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -253,9 +253,9 @@ private RestClusterClient( jobmanagerUrl = new URL( SecurityOptions.isRestSSLEnabled(configuration) ? "https" : "http", - configuration.getString(JobManagerOptions.ADDRESS), - configuration.getInteger(JobManagerOptions.PORT), - configuration.getString(RestOptions.PATH)); + configuration.get(JobManagerOptions.ADDRESS), + configuration.get(JobManagerOptions.PORT), + configuration.get(RestOptions.PATH)); if (restClient != null) { this.restClient = restClient; diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java index 7bc8da8d49ef8..c7c56b56f319e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClientConfiguration.java @@ -77,9 +77,9 @@ public static RestClusterClientConfiguration fromConfiguration(Configuration con RestClientConfiguration restClientConfiguration = RestClientConfiguration.fromConfiguration(config); - final long awaitLeaderTimeout = config.getLong(RestOptions.AWAIT_LEADER_TIMEOUT); - final int retryMaxAttempts = config.getInteger(RestOptions.RETRY_MAX_ATTEMPTS); - final long retryDelay = config.getLong(RestOptions.RETRY_DELAY); + final long awaitLeaderTimeout = config.get(RestOptions.AWAIT_LEADER_TIMEOUT); + final int retryMaxAttempts = config.get(RestOptions.RETRY_MAX_ATTEMPTS); + final long retryDelay = config.get(RestOptions.RETRY_DELAY); return new RestClusterClientConfiguration( restClientConfiguration, awaitLeaderTimeout, retryMaxAttempts, retryDelay); diff --git a/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java b/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java index 161a3d5870322..2e69431b18354 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/ClientHeartbeatTest.java @@ -96,8 +96,8 @@ private Configuration createConfiguration(boolean shutdownOnAttachedExit) { Dispatcher.CLIENT_ALIVENESS_CHECK_DURATION, Duration.ofMillis(clientHeartbeatInterval)); if (shutdownOnAttachedExit) { - configuration.setBoolean(DeploymentOptions.ATTACHED, true); - configuration.setBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true); + configuration.set(DeploymentOptions.ATTACHED, true); + configuration.set(DeploymentOptions.SHUTDOWN_IF_ATTACHED, true); } return configuration; } @@ -115,8 +115,8 @@ private JobClient submitJob(Configuration configuration) throws Exception { JobGraph cancellableJobGraph = getCancellableJobGraph(); // Enable heartbeat only when both execution.attached and // execution.shutdown-on-attached-exit are true. - if (configuration.getBoolean(DeploymentOptions.ATTACHED) - && configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { + if (configuration.get(DeploymentOptions.ATTACHED) + && configuration.get(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) { cancellableJobGraph.setInitialClientHeartbeatTimeout(clientHeartbeatTimeout); } return perJobMiniClusterFactory diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java index 9fb72371fe9b0..1e98dd831651d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendTestUtils.java @@ -86,8 +86,8 @@ public void write(int b) {} public static void checkJobManagerAddress( Configuration config, String expectedAddress, int expectedPort) { - String jobManagerAddress = config.getString(JobManagerOptions.ADDRESS); - int jobManagerPort = config.getInteger(JobManagerOptions.PORT, -1); + String jobManagerAddress = config.get(JobManagerOptions.ADDRESS); + int jobManagerPort = config.get(JobManagerOptions.PORT, -1); assertThat(jobManagerAddress).isEqualTo(expectedAddress); assertThat(jobManagerPort).isEqualTo(expectedPort); diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/GenericCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/GenericCLITest.java index 8fd4779ad5332..5e4d7b3e54558 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/GenericCLITest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/GenericCLITest.java @@ -69,8 +69,8 @@ void isActiveWhenTargetOnlyInConfig() throws CliArgsException { @Test void testWithPreexistingConfigurationInConstructor() throws CliArgsException { final Configuration loadedConfig = new Configuration(); - loadedConfig.setInteger(CoreOptions.DEFAULT_PARALLELISM, 2); - loadedConfig.setBoolean(DeploymentOptions.ATTACHED, false); + loadedConfig.set(CoreOptions.DEFAULT_PARALLELISM, 2); + loadedConfig.set(DeploymentOptions.ATTACHED, false); final ConfigOption> listOption = key("test.list").intType().asList().noDefaultValue(); @@ -92,9 +92,9 @@ void testWithPreexistingConfigurationInConstructor() throws CliArgsException { final Configuration configuration = cliUnderTest.toConfiguration(commandLine); - assertThat(configuration.getString(DeploymentOptions.TARGET)).isEqualTo("test-executor"); - assertThat(configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM)).isEqualTo(5); - assertThat(configuration.getBoolean(DeploymentOptions.ATTACHED)).isFalse(); + assertThat(configuration.get(DeploymentOptions.TARGET)).isEqualTo("test-executor"); + assertThat(configuration.get(CoreOptions.DEFAULT_PARALLELISM)).isEqualTo(5); + assertThat(configuration.get(DeploymentOptions.ATTACHED)).isFalse(); assertThat(configuration.get(listOption)).isEqualTo(listValue); } @@ -123,6 +123,6 @@ private void testIsActiveHelper(final String executorOption) throws CliArgsExcep final Configuration configuration = cliUnderTest.toConfiguration(commandLine); assertThat(configuration.get(DeploymentOptions.TARGET)).isEqualTo(expectedExecutorName); - assertThat(configuration.getInteger(configOption)).isEqualTo(expectedValue); + assertThat(configuration.get(configOption)).isEqualTo(expectedValue); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientFactory.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientFactory.java index 5830ae9f49ee2..0a8c6a9116f75 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientFactory.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterClientFactory.java @@ -46,7 +46,7 @@ public DummyClusterClientFactory(ClusterClient clusterClient) { @Override public boolean isCompatibleWith(Configuration configuration) { - return ID.equals(configuration.getString(DeploymentOptions.TARGET)); + return ID.equals(configuration.get(DeploymentOptions.TARGET)); } @Override diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java index 1f39c137c5aa6..7478f41ab9720 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java @@ -51,7 +51,7 @@ public void addGeneralOptions(Options baseOptions) { @Override public Configuration toConfiguration(CommandLine commandLine) { final Configuration configuration = new Configuration(); - configuration.setString(DeploymentOptions.TARGET, DummyClusterClientFactory.ID); + configuration.set(DeploymentOptions.TARGET, DummyClusterClientFactory.ID); return configuration; } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java index d34c61e357623..9075b2f53754d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java @@ -49,7 +49,7 @@ public void init() { @Test public void testStandaloneClusterClientFactoryDiscovery() { final Configuration config = new Configuration(); - config.setString(DeploymentOptions.TARGET, RemoteExecutor.NAME); + config.set(DeploymentOptions.TARGET, RemoteExecutor.NAME); ClusterClientFactory factory = serviceLoaderUnderTest.getClusterClientFactory(config); @@ -59,7 +59,7 @@ public void testStandaloneClusterClientFactoryDiscovery() { @Test public void testFactoryDiscovery() { final Configuration config = new Configuration(); - config.setString(DeploymentOptions.TARGET, VALID_TARGET); + config.set(DeploymentOptions.TARGET, VALID_TARGET); final ClusterClientFactory factory = serviceLoaderUnderTest.getClusterClientFactory(config); @@ -74,7 +74,7 @@ public void testMoreThanOneCompatibleFactoriesException() { assertThatThrownBy( () -> { final Configuration config = new Configuration(); - config.setString(DeploymentOptions.TARGET, AMBIGUOUS_TARGET); + config.set(DeploymentOptions.TARGET, AMBIGUOUS_TARGET); serviceLoaderUnderTest.getClusterClientFactory(config); }) @@ -86,7 +86,7 @@ public void testNoFactoriesFound() { assertThatThrownBy( () -> { final Configuration config = new Configuration(); - config.setString(DeploymentOptions.TARGET, NON_EXISTING_TARGET); + config.set(DeploymentOptions.TARGET, NON_EXISTING_TARGET); final ClusterClientFactory factory = serviceLoaderUnderTest.getClusterClientFactory(config); @@ -101,7 +101,7 @@ public static class ValidClusterClientFactory extends BaseTestingClusterClientFa @Override public boolean isCompatibleWith(Configuration configuration) { - return configuration.getString(DeploymentOptions.TARGET).equals(VALID_TARGET); + return configuration.get(DeploymentOptions.TARGET).equals(VALID_TARGET); } @Nullable @@ -118,7 +118,7 @@ public static class FirstCollidingClusterClientFactory extends BaseTestingCluste @Override public boolean isCompatibleWith(Configuration configuration) { - return configuration.getString(DeploymentOptions.TARGET).equals(AMBIGUOUS_TARGET); + return configuration.get(DeploymentOptions.TARGET).equals(AMBIGUOUS_TARGET); } } @@ -130,7 +130,7 @@ public static class SecondCollidingClusterClientFactory @Override public boolean isCompatibleWith(Configuration configuration) { - return configuration.getString(DeploymentOptions.TARGET).equals(AMBIGUOUS_TARGET); + return configuration.get(DeploymentOptions.TARGET).equals(AMBIGUOUS_TARGET); } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java index b218daa0f509e..3e591162ba91d 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java @@ -99,7 +99,7 @@ void setUp() { plan = env.createProgramPlan(); config = new Configuration(); - config.setString(JobManagerOptions.ADDRESS, "localhost"); + config.set(JobManagerOptions.ADDRESS, "localhost"); config.set( AkkaOptions.ASK_TIMEOUT_DURATION, AkkaOptions.ASK_TIMEOUT_DURATION.defaultValue()); @@ -108,7 +108,7 @@ void setUp() { private Configuration fromPackagedProgram( final PackagedProgram program, final int parallelism, final boolean detached) { final Configuration configuration = new Configuration(); - configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME); + configuration.set(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME); configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism); configuration.set(DeploymentOptions.ATTACHED, !detached); ConfigUtils.encodeCollectionToConfig( @@ -490,13 +490,13 @@ public String getName() { @Override public boolean isCompatibleWith(@Nonnull Configuration configuration) { return TEST_EXECUTOR_NAME.equalsIgnoreCase( - configuration.getString(DeploymentOptions.TARGET)); + configuration.get(DeploymentOptions.TARGET)); } @Override public PipelineExecutor getExecutor(@Nonnull Configuration configuration) { return (pipeline, config, classLoader) -> { - final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM); + final int parallelism = config.get(CoreOptions.DEFAULT_PARALLELISM); final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph( classLoader, plan, config, parallelism); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java index 2c2df7cc56f1a..f8cb80e32b4f2 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/DefaultPackagedProgramRetrieverITCase.java @@ -187,7 +187,7 @@ void testJobGraphRetrieval() throws IOException, FlinkException, ProgramInvocati final JobID jobId = new JobID(); final Configuration configuration = new Configuration(); - configuration.setInteger(CoreOptions.DEFAULT_PARALLELISM, parallelism); + configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism); configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); final String expectedSuffix = "suffix"; @@ -259,7 +259,7 @@ void testSavepointRestoreSettings() SavepointRestoreSettings.forPath("foobar", true); final JobID jobId = new JobID(); - configuration.setString(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); + configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId.toHexString()); SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration); final String expectedSuffix = "suffix"; @@ -525,7 +525,7 @@ private JobGraph retrieveJobGraph( throws FlinkException, ProgramInvocationException, MalformedURLException { final PackagedProgram packagedProgram = retrieverUnderTest.getPackagedProgram(); - final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM); + final int defaultParallelism = configuration.get(CoreOptions.DEFAULT_PARALLELISM); ConfigUtils.encodeCollectionToConfig( configuration, PipelineOptions.JARS, diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java index d6105000a9d46..6d2e6fb842ed2 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ExecutionPlanCreationTest.java @@ -57,8 +57,8 @@ void testGetExecutionPlan() { Configuration config = new Configuration(); - config.setString(JobManagerOptions.ADDRESS, mockJmAddress.getHostName()); - config.setInteger(JobManagerOptions.PORT, mockJmAddress.getPort()); + config.set(JobManagerOptions.ADDRESS, mockJmAddress.getHostName()); + config.set(JobManagerOptions.PORT, mockJmAddress.getPort()); Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientCheckpointTriggerTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientCheckpointTriggerTest.java index d25474f384282..03acdc694ce74 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientCheckpointTriggerTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientCheckpointTriggerTest.java @@ -87,10 +87,10 @@ class RestClusterClientCheckpointTriggerTest { static { final Configuration config = new Configuration(); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10); - config.setLong(RestOptions.RETRY_DELAY, 0); - config.setInteger(RestOptions.PORT, 0); + config.set(JobManagerOptions.ADDRESS, "localhost"); + config.set(RestOptions.RETRY_MAX_ATTEMPTS, 10); + config.set(RestOptions.RETRY_DELAY, 0L); + config.set(RestOptions.PORT, 0); REST_CONFIG = new UnmodifiableConfiguration(config); } @@ -217,7 +217,7 @@ private TestHandler(MessageHeaders headers) { private RestClusterClient createRestClusterClient(final int port) throws Exception { final Configuration clientConfig = new Configuration(REST_CONFIG); - clientConfig.setInteger(RestOptions.PORT, port); + clientConfig.set(RestOptions.PORT, port); return new RestClusterClient<>( clientConfig, new RestClient(REST_CONFIG, executor), diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java index 9dab887516d91..00316ce407596 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientConfigurationTest.java @@ -34,9 +34,9 @@ class RestClusterClientConfigurationTest { @BeforeEach void setUp() throws Exception { final Configuration config = new Configuration(); - config.setLong(RestOptions.AWAIT_LEADER_TIMEOUT, 1); - config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 2); - config.setLong(RestOptions.RETRY_DELAY, 3); + config.set(RestOptions.AWAIT_LEADER_TIMEOUT, 1L); + config.set(RestOptions.RETRY_MAX_ATTEMPTS, 2); + config.set(RestOptions.RETRY_DELAY, 3L); restClusterClientConfiguration = RestClusterClientConfiguration.fromConfiguration(config); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java index 2819231ec9965..817248c2aa61a 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientSavepointTriggerTest.java @@ -91,10 +91,10 @@ class RestClusterClientSavepointTriggerTest { static { final Configuration config = new Configuration(); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10); - config.setLong(RestOptions.RETRY_DELAY, 0); - config.setInteger(RestOptions.PORT, 0); + config.set(JobManagerOptions.ADDRESS, "localhost"); + config.set(RestOptions.RETRY_MAX_ATTEMPTS, 10); + config.set(RestOptions.RETRY_DELAY, 0L); + config.set(RestOptions.PORT, 0); REST_CONFIG = new UnmodifiableConfiguration(config); } @@ -346,7 +346,7 @@ private TestHandler(MessageHeaders headers) { private RestClusterClient createRestClusterClient(final int port) throws Exception { final Configuration clientConfig = new Configuration(REST_CONFIG); - clientConfig.setInteger(RestOptions.PORT, port); + clientConfig.set(RestOptions.PORT, port); return new RestClusterClient<>( clientConfig, new RestClient(REST_CONFIG, executor), diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index e1db5c6d1b0b0..ce8bd8694e044 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -188,10 +188,10 @@ class RestClusterClientTest { static { final Configuration config = new Configuration(); - config.setString(JobManagerOptions.ADDRESS, "localhost"); - config.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 10); - config.setLong(RestOptions.RETRY_DELAY, 0); - config.setInteger(RestOptions.PORT, 0); + config.set(JobManagerOptions.ADDRESS, "localhost"); + config.set(RestOptions.RETRY_MAX_ATTEMPTS, 10); + config.set(RestOptions.RETRY_DELAY, 0L); + config.set(RestOptions.PORT, 0); restConfig = config; } @@ -222,7 +222,7 @@ private RestClusterClient createRestClusterClient(int port) private RestClusterClient createRestClusterClient( int port, Configuration clientConfig) throws Exception { - clientConfig.setInteger(RestOptions.PORT, port); + clientConfig.set(RestOptions.PORT, port); return new RestClusterClient<>( clientConfig, createRestClient(), diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java index 76ee371720c41..72cb5e2f8f591 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java @@ -76,7 +76,7 @@ public SingleThreadMultiplexSourceReaderBase( SourceReaderContext context) { this( new FutureCompletingBlockingQueue<>( - config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)), + config.get(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)), splitReaderSupplier, recordEmitter, config, diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java index 641acf0f86cd1..71dafd85fd0d0 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java @@ -44,7 +44,7 @@ public class SourceReaderOptions { public final int elementQueueCapacity; public SourceReaderOptions(Configuration config) { - this.sourceReaderCloseTimeout = config.getLong(SOURCE_READER_CLOSE_TIMEOUT); - this.elementQueueCapacity = config.getInteger(ELEMENT_QUEUE_CAPACITY); + this.sourceReaderCloseTimeout = config.get(SOURCE_READER_CLOSE_TIMEOUT); + this.elementQueueCapacity = config.get(ELEMENT_QUEUE_CAPACITY); } } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java index 071e214781370..1853eb4439a5b 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReaderTest.java @@ -342,8 +342,8 @@ public static MutableFutureSourceReader createReader(SourceReaderContext readerC new FutureCompletingBlockingQueue<>(); Configuration config = new Configuration(); - config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 2); - config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); + config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 2); + config.set(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); MockSplitReader.Builder builder = MockSplitReader.newBuilder() .setNumRecordsPerSplitPerFetch(2) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java index db3a3fe81561c..47c81e8622fcd 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceRescaleITCase.java @@ -110,13 +110,12 @@ private void resumeCheckpoint(File checkpointDir, File restoreCheckpoint, int p) private StreamExecutionEnvironment createEnv( File checkpointDir, @Nullable File restoreCheckpoint, int p) { Configuration conf = new Configuration(); - conf.setString( - CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); + conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString()); conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb")); if (restoreCheckpoint != null) { conf.set(SavepointConfigOptions.SAVEPOINT_PATH, restoreCheckpoint.toURI().toString()); } - conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, p); + conf.set(TaskManagerOptions.NUM_TASK_SLOTS, p); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 517c624b4803c..9096bac02acd8 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -434,8 +434,8 @@ protected long getNextRecordIndex(MockSourceSplit split) { private Configuration getConfig() { Configuration config = new Configuration(); - config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1); - config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); + config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1); + config.set(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); return config; } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java index ff3e80e87f31b..11bf88f0f37e7 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java @@ -70,8 +70,8 @@ public SourceReader createReader(SourceReaderContext r new FutureCompletingBlockingQueue<>(); Configuration config = new Configuration(); - config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1); - config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); + config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1); + config.set(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); MockSplitReader.Builder builder = MockSplitReader.newBuilder() .setNumRecordsPerSplitPerFetch(2) diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java index 0555774b65636..b61c0cd08dbf7 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/FileSystemTableSink.java @@ -218,7 +218,7 @@ private DataStreamSink createStreamingSink( FileSystemFactory fsFactory = FileSystem::get; RowDataPartitionComputer computer = partitionComputer(); - boolean autoCompaction = tableOptions.getBoolean(AUTO_COMPACTION); + boolean autoCompaction = tableOptions.get(AUTO_COMPACTION); Object writer = createWriter(sinkContext); boolean isEncoder = writer instanceof Encoder; TableBucketAssigner assigner = new TableBucketAssigner(computer); diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionTimeCommitPredicate.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionTimeCommitPredicate.java index 19a09a293d824..43a832abbeb5d 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionTimeCommitPredicate.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/PartitionTimeCommitPredicate.java @@ -59,8 +59,7 @@ public PartitionTimeCommitPredicate( conf.get(PARTITION_TIME_EXTRACTOR_CLASS), conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_PATTERN), conf.get(PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER)); - this.watermarkTimeZone = - ZoneId.of(conf.getString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE)); + this.watermarkTimeZone = ZoneId.of(conf.get(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE)); } @Override diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkSpeculativeITCase.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkSpeculativeITCase.java index 948cfc3fe0702..8b9722070ad6c 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkSpeculativeITCase.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/FileSinkSpeculativeITCase.java @@ -140,8 +140,8 @@ private FileSink createFileSink(String path) { private static Configuration configure() { Configuration configuration = new Configuration(); - configuration.setString(RestOptions.BIND_PORT, "0"); - configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L); + configuration.set(RestOptions.BIND_PORT, "0"); + configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L); // for speculative execution configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true); diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java index 2e394a1ff932e..7d58d36ccbca2 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java @@ -381,7 +381,7 @@ public String getBucketId( private Configuration getPartitionCommitTriggerConf(long commitDelay) { Configuration configuration = new Configuration(); - configuration.setString(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file"); + configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file"); configuration.setString(PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd"); configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), "partition-time"); configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay); @@ -391,7 +391,7 @@ private Configuration getPartitionCommitTriggerConf(long commitDelay) { private Configuration getProcTimeCommitTriggerConf(long commitDelay) { Configuration configuration = new Configuration(); - configuration.setString(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file"); + configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file"); configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), "process-time"); configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay); configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC"); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java index 779de538f438a..f55fed51d3844 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSink.java @@ -210,7 +210,7 @@ private void validateAutoGatherStatistic( // the table's option "SINK_PARTITION_COMMIT_POLICY_KIND" should contain 'metastore' org.apache.flink.configuration.Configuration flinkConf = org.apache.flink.configuration.Configuration.fromMap(catalogTable.getOptions()); - String policyKind = flinkConf.getString(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND); + String policyKind = flinkConf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND); String[] policyStrings = policyKind.split(","); Arrays.stream(policyStrings) .filter(policy -> policy.equalsIgnoreCase(PartitionCommitPolicy.METASTORE)) @@ -383,7 +383,7 @@ private DataStreamSink createBatchSink( org.apache.flink.configuration.Configuration conf = new org.apache.flink.configuration.Configuration(); catalogTable.getOptions().forEach(conf::setString); - boolean autoCompaction = conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION); + boolean autoCompaction = conf.get(FileSystemConnectorOptions.AUTO_COMPACTION); if (autoCompaction) { Optional compactParallelismOptional = conf.getOptional(FileSystemConnectorOptions.COMPACTION_PARALLELISM); @@ -638,7 +638,7 @@ private DataStreamSink createStreamSink( catalogTable.getOptions().forEach(conf::setString); String commitPolicies = - conf.getString(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND); + conf.get(FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND); if (!getPartitionKeys().isEmpty() && StringUtils.isNullOrWhitespaceOnly(commitPolicies)) { throw new FlinkHiveException( String.format( @@ -648,7 +648,7 @@ private DataStreamSink createStreamSink( FileSystemConnectorOptions.SINK_PARTITION_COMMIT_POLICY_KIND.key())); } - boolean autoCompaction = conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION); + boolean autoCompaction = conf.get(FileSystemConnectorOptions.AUTO_COMPACTION); if (autoCompaction) { fileNamingBuilder.withPartPrefix( convertToUncompacted(fileNamingBuilder.build().getPartPrefix())); diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java index a1dac332c2d82..919313002176f 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveTableSource.java @@ -429,7 +429,7 @@ public HiveContinuousPartitionFetcherContext( case PARTITION_NAME: if (configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET)) { String consumeOffsetStr = - configuration.getString(STREAMING_SOURCE_CONSUME_START_OFFSET); + configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET); consumeStartOffset = (T) consumeOffsetStr; } else { consumeStartOffset = (T) DEFAULT_MIN_NAME_OFFSET; @@ -440,12 +440,12 @@ public HiveContinuousPartitionFetcherContext( case CREATE_TIME: if (configuration.contains(STREAMING_SOURCE_CONSUME_START_OFFSET)) { String consumeOffsetStr = - configuration.getString(STREAMING_SOURCE_CONSUME_START_OFFSET); + configuration.get(STREAMING_SOURCE_CONSUME_START_OFFSET); LocalDateTime localDateTime = DefaultPartTimeExtractor.toLocalDateTime( consumeOffsetStr, - configuration.getString( + configuration.get( PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER)); consumeStartOffset = diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java index 0bce268ed64eb..14c6c4eb76b8c 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java @@ -108,8 +108,8 @@ public void delegationTokensRequiredShouldReturnTrueWhenAllConditionsIsRight( new org.apache.flink.configuration.Configuration(); final Path keyTab = Files.createFile(tmpDir.resolve("test.keytab")); - configuration.setString(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString()); - configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "test@EXAMPLE.COM"); + configuration.set(KERBEROS_LOGIN_KEYTAB, keyTab.toAbsolutePath().toString()); + configuration.set(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, "test@EXAMPLE.COM"); provider.init(configuration); boolean result = provider.delegationTokensRequired(); assertTrue(result); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java index 3e7c78966d374..bfb8078b8de0b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java @@ -304,7 +304,7 @@ public boolean isPeriodicMaterializeEnabled() { @Internal public void enablePeriodicMaterialize(boolean enabled) { - configuration.setBoolean(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, enabled); + configuration.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, enabled); } @Internal diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java index ccfb3a8f799cc..85249e6904267 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java @@ -89,8 +89,8 @@ protected static void loadGlobalConfigParams() { } protected static void loadConfigParameters(Configuration parameters) { - int maxSamples = parameters.getInteger(OptimizerOptions.DELIMITED_FORMAT_MAX_LINE_SAMPLES); - int minSamples = parameters.getInteger(OptimizerOptions.DELIMITED_FORMAT_MIN_LINE_SAMPLES); + int maxSamples = parameters.get(OptimizerOptions.DELIMITED_FORMAT_MAX_LINE_SAMPLES); + int minSamples = parameters.get(OptimizerOptions.DELIMITED_FORMAT_MIN_LINE_SAMPLES); if (maxSamples < 0) { LOG.error( @@ -124,7 +124,7 @@ protected static void loadConfigParameters(Configuration parameters) { DEFAULT_MIN_NUM_SAMPLES = minSamples; } - int maxLen = parameters.getInteger(OptimizerOptions.DELIMITED_FORMAT_MAX_SAMPLE_LEN); + int maxLen = parameters.get(OptimizerOptions.DELIMITED_FORMAT_MAX_SAMPLE_LEN); if (maxLen <= 0) { maxLen = OptimizerOptions.DELIMITED_FORMAT_MAX_SAMPLE_LEN.defaultValue(); LOG.error( diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java index 79933878a92c7..59a7d2ac87b4c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java @@ -75,12 +75,12 @@ public static enum OutputDirectoryMode { * @param configuration The configuration to load defaults from */ public static void initDefaultsFromConfiguration(Configuration configuration) { - final boolean overwrite = configuration.getBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE); + final boolean overwrite = configuration.get(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE); DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE; final boolean alwaysCreateDirectory = - configuration.getBoolean(CoreOptions.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY); + configuration.get(CoreOptions.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY); DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY; diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index c0f37407ef1b2..58cbd620edec4 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -54,12 +54,11 @@ public class ConfigurationUtils { */ public static Optional