-
Notifications
You must be signed in to change notification settings - Fork 13.5k
/
Copy pathenvironment_settings.py
214 lines (167 loc) · 8.6 KB
/
environment_settings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.java_gateway import get_gateway
from pyflink.util.java_utils import create_url_class_loader
from pyflink.common import Configuration
__all__ = ['EnvironmentSettings']
class EnvironmentSettings(object):
"""
Defines all parameters that initialize a table environment. Those parameters are used only
during instantiation of a :class:`~pyflink.table.TableEnvironment` and cannot be changed
afterwards.
Example:
::
>>> EnvironmentSettings.new_instance() \\
... .in_streaming_mode() \\
... .with_built_in_catalog_name("my_catalog") \\
... .with_built_in_database_name("my_database") \\
... .build()
:func:`~EnvironmentSettings.in_streaming_mode` or :func:`~EnvironmentSettings.in_batch_mode`
might be convenient as shortcuts.
"""
class Builder(object):
"""
A builder for :class:`~EnvironmentSettings`.
"""
def __init__(self):
gateway = get_gateway()
self._j_builder = gateway.jvm.EnvironmentSettings.Builder()
def with_configuration(self, config: Configuration) -> 'EnvironmentSettings.Builder':
"""
Creates the EnvironmentSetting with specified Configuration.
:return: EnvironmentSettings.
"""
self._j_builder = self._j_builder.withConfiguration(config._j_configuration)
return self
def in_batch_mode(self) -> 'EnvironmentSettings.Builder':
"""
Sets that the components should work in a batch mode. Streaming mode by default.
:return: This object.
"""
self._j_builder = self._j_builder.inBatchMode()
return self
def in_streaming_mode(self) -> 'EnvironmentSettings.Builder':
"""
Sets that the components should work in a streaming mode. Enabled by default.
:return: This object.
"""
self._j_builder = self._j_builder.inStreamingMode()
return self
def with_built_in_catalog_name(self, built_in_catalog_name: str) \
-> 'EnvironmentSettings.Builder':
"""
Specifies the name of the initial catalog to be created when instantiating
a :class:`~pyflink.table.TableEnvironment`.
This catalog is an in-memory catalog that will be used to store all temporary objects
(e.g. from :func:`~pyflink.table.TableEnvironment.create_temporary_view` or
:func:`~pyflink.table.TableEnvironment.create_temporary_system_function`) that cannot
be persisted because they have no serializable representation.
It will also be the initial value for the current catalog which can be altered via
:func:`~pyflink.table.TableEnvironment.use_catalog`.
Default: "default_catalog".
:param built_in_catalog_name: The specified built-in catalog name.
:return: This object.
"""
self._j_builder = self._j_builder.withBuiltInCatalogName(built_in_catalog_name)
return self
def with_built_in_database_name(self, built_in_database_name: str) \
-> 'EnvironmentSettings.Builder':
"""
Specifies the name of the default database in the initial catalog to be
created when instantiating a :class:`~pyflink.table.TableEnvironment`.
This database is an in-memory database that will be used to store all temporary
objects (e.g. from :func:`~pyflink.table.TableEnvironment.create_temporary_view` or
:func:`~pyflink.table.TableEnvironment.create_temporary_system_function`) that cannot
be persisted because they have no serializable representation.
It will also be the initial value for the current catalog which can be altered via
:func:`~pyflink.table.TableEnvironment.use_catalog`.
Default: "default_database".
:param built_in_database_name: The specified built-in database name.
:return: This object.
"""
self._j_builder = self._j_builder.withBuiltInDatabaseName(built_in_database_name)
return self
def build(self) -> 'EnvironmentSettings':
"""
Returns an immutable instance of EnvironmentSettings.
:return: an immutable instance of EnvironmentSettings.
"""
gateway = get_gateway()
context_classloader = gateway.jvm.Thread.currentThread().getContextClassLoader()
new_classloader = create_url_class_loader([], context_classloader)
gateway.jvm.Thread.currentThread().setContextClassLoader(new_classloader)
return EnvironmentSettings(self._j_builder.build())
def __init__(self, j_environment_settings):
self._j_environment_settings = j_environment_settings
def get_built_in_catalog_name(self) -> str:
"""
Gets the specified name of the initial catalog to be created when instantiating a
:class:`~pyflink.table.TableEnvironment`.
:return: The specified name of the initial catalog to be created.
"""
return self._j_environment_settings.getBuiltInCatalogName()
def get_built_in_database_name(self) -> str:
"""
Gets the specified name of the default database in the initial catalog to be created when
instantiating a :class:`~pyflink.table.TableEnvironment`.
:return: The specified name of the default database in the initial catalog to be created.
"""
return self._j_environment_settings.getBuiltInDatabaseName()
def is_streaming_mode(self) -> bool:
"""
Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming
mode.
:return: True if the TableEnvironment should work in a streaming mode, false otherwise.
"""
return self._j_environment_settings.isStreamingMode()
def get_configuration(self) -> Configuration:
"""
Get the underlying `pyflink.common.Configuration`.
:return: Configuration with specified value.
"""
return Configuration(j_configuration=self._j_environment_settings.getConfiguration())
@staticmethod
def new_instance() -> 'EnvironmentSettings.Builder':
"""
Creates a builder for creating an instance of EnvironmentSettings.
:return: A builder of EnvironmentSettings.
"""
return EnvironmentSettings.Builder()
@staticmethod
def in_streaming_mode() -> 'EnvironmentSettings':
"""
Creates a default instance of EnvironmentSettings in streaming execution mode.
In this mode, both bounded and unbounded data streams can be processed.
This method is a shortcut for creating a :class:`~pyflink.table.TableEnvironment` with
little code. Use the builder provided in :func:`EnvironmentSettings.new_instance` for
advanced settings.
:return: EnvironmentSettings.
"""
return EnvironmentSettings.new_instance().in_streaming_mode().build()
@staticmethod
def in_batch_mode() -> 'EnvironmentSettings':
"""
Creates a default instance of EnvironmentSettings in batch execution mode.
This mode is highly optimized for batch scenarios. Only bounded data streams can be
processed in this mode.
This method is a shortcut for creating a :class:`~pyflink.table.TableEnvironment` with
little code. Use the builder provided in :func:`EnvironmentSettings.new_instance` for
advanced settings.
:return: EnvironmentSettings.
"""
return EnvironmentSettings.new_instance().in_batch_mode().build()