-
Notifications
You must be signed in to change notification settings - Fork 13.5k
/
Copy pathconfig_options.py
124 lines (100 loc) · 5.13 KB
/
config_options.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import TypeVar, Generic
from pyflink.java_gateway import get_gateway
T = TypeVar('T')
__all__ = ['ConfigOptions', 'ConfigOption']
class ConfigOptions(object):
"""
{@code ConfigOptions} are used to build a :class:`~pyflink.common.ConfigOption`. The option is
typically built in one of the following patterns:
Example:
::
# simple string-valued option with a default value
>>> ConfigOptions.key("tmp.dir").string_type().default_value("/tmp")
# simple integer-valued option with a default value
>>> ConfigOptions.key("application.parallelism").int_type().default_value(100)
# option with no default value
>>> ConfigOptions.key("user.name").string_type().no_default_value()
"""
def __init__(self, j_config_options):
self._j_config_options = j_config_options
@staticmethod
def key(key: str):
"""
Starts building a new ConfigOption.
:param key: The key for the config option.
:return: The builder for the config option with the given key.
"""
gateway = get_gateway()
j_option_builder = gateway.jvm.org.apache.flink.configuration.ConfigOptions.key(key)
return ConfigOptions.OptionBuilder(j_option_builder)
class OptionBuilder(object):
def __init__(self, j_option_builder):
self._j_option_builder = j_option_builder
def boolean_type(self) -> 'ConfigOptions.TypedConfigOptionBuilder[bool]':
"""
Defines that the value of the option should be of bool type.
"""
return ConfigOptions.TypedConfigOptionBuilder(self._j_option_builder.booleanType())
def int_type(self) -> 'ConfigOptions.TypedConfigOptionBuilder[int]':
"""
Defines that the value of the option should be of int type
(from -2,147,483,648 to 2,147,483,647).
"""
return ConfigOptions.TypedConfigOptionBuilder(self._j_option_builder.intType())
def long_type(self) -> 'ConfigOptions.TypedConfigOptionBuilder[int]':
"""
Defines that the value of the option should be of int type
(from -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807).
"""
return ConfigOptions.TypedConfigOptionBuilder(self._j_option_builder.longType())
def float_type(self) -> 'ConfigOptions.TypedConfigOptionBuilder[float]':
"""
Defines that the value of the option should be of float type
(4-byte single precision floating point number).
"""
return ConfigOptions.TypedConfigOptionBuilder(self._j_option_builder.floatType())
def double_type(self) -> 'ConfigOptions.TypedConfigOptionBuilder[float]':
"""
Defines that the value of the option should be of float Double} type
(8-byte double precision floating point number).
"""
return ConfigOptions.TypedConfigOptionBuilder(self._j_option_builder.doubleType())
def string_type(self) -> 'ConfigOptions.TypedConfigOptionBuilder[str]':
"""
Defines that the value of the option should be of str type.
"""
return ConfigOptions.TypedConfigOptionBuilder(self._j_option_builder.stringType())
class TypedConfigOptionBuilder(Generic[T]):
def __init__(self, j_typed_config_option_builder):
self._j_typed_config_option_builder = j_typed_config_option_builder
def default_value(self, value: T) -> 'ConfigOption[T]':
return ConfigOption(self._j_typed_config_option_builder.defaultValue(value))
def no_default_value(self) -> 'ConfigOption[str]':
return ConfigOption(self._j_typed_config_option_builder.noDefaultValue())
class ConfigOption(Generic[T]):
"""
A {@code ConfigOption} describes a configuration parameter. It encapsulates the configuration
key, deprecated older versions of the key, and an optional default value for the configuration
parameter.
{@code ConfigOptions} are built via the ConfigOptions class. Once created, a config
option is immutable.
"""
def __init__(self, j_config_option):
self._j_config_option = j_config_option