-
Notifications
You must be signed in to change notification settings - Fork 13.5k
/
Copy pathtable_result.py
231 lines (183 loc) · 9.08 KB
/
table_result.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Optional
from py4j.java_gateway import get_method
from pyflink.common.types import RowKind
from pyflink.common import Row
from pyflink.common.job_client import JobClient
from pyflink.java_gateway import get_gateway
from pyflink.table.catalog import ResolvedSchema
from pyflink.table.result_kind import ResultKind
from pyflink.table.table_schema import TableSchema
from pyflink.table.types import _from_java_data_type
from pyflink.table.utils import pickled_bytes_to_python_converter
# Public names exported by this module via ``from ... import *``.
__all__ = ['TableResult', 'CloseableIterator']
class TableResult(object):
    """
    A :class:`~pyflink.table.TableResult` is the representation of the statement execution result.

    .. versionadded:: 1.11.0
    """

    def __init__(self, j_table_result):
        # Wrapped Java TableResult; every public method below delegates to it.
        self._j_table_result = j_table_result

    def get_job_client(self) -> Optional[JobClient]:
        """
        For DML and DQL statements, return the JobClient associated with the submitted Flink job.
        For other statements (e.g. DDL, DCL) return None.

        :return: The job client, or None if no job is associated with this result.
        :rtype: pyflink.common.JobClient

        .. versionadded:: 1.11.0
        """
        # The Java side hands back an Optional; unwrap it into a Python object or None.
        j_job_client = self._j_table_result.getJobClient()
        if j_job_client.isPresent():
            return JobClient(j_job_client.get())
        return None

    def wait(self, timeout_ms: Optional[int] = None):
        """
        Wait if necessary for at most the given time (milliseconds) for the data to be ready.

        For a select operation, this method will wait until the first row can be accessed locally.
        For an insert operation, this method will wait for the job to finish,
        because the result contains only one row.
        For other operations, this method will return immediately,
        because the result is already available locally.

        :param timeout_ms: The maximum time to wait, in milliseconds. If None (the default),
                           wait without a time limit. Any other value, including 0, is
                           forwarded to the Java side as a timeout in milliseconds.

        .. versionadded:: 1.12.0
        """
        # ``await`` is a reserved keyword in Python, so the Java method cannot be written as
        # ``self._j_table_result.await``; look it up by name through py4j instead.
        j_await = get_method(self._j_table_result, "await")
        # Compare against None explicitly: a plain truthiness test would turn an explicit
        # ``timeout_ms=0`` into an unbounded wait.
        if timeout_ms is not None:
            time_unit = get_gateway().jvm.java.util.concurrent.TimeUnit
            j_await(timeout_ms, time_unit.MILLISECONDS)
        else:
            j_await()

    def get_table_schema(self) -> TableSchema:
        """
        Returns the schema of the result.

        :return: The schema of the result.
        :rtype: pyflink.table.TableSchema

        .. versionadded:: 1.11.0

        .. deprecated:: 2.1.0
           This function has been deprecated as part of FLIP-164.
           :class:`~pyflink.table.table_schema.TableSchema` has been replaced by two more
           dedicated classes :class:`~pyflink.table.Schema` and
           :class:`~pyflink.table.catalog.ResolvedSchema`. Use :class:`~pyflink.table.Schema` for
           declaration in APIs. :class:`~pyflink.table.catalog.ResolvedSchema` is offered by the
           framework after resolution and validation.
        """
        return TableSchema(j_table_schema=self._get_java_table_schema())

    def get_resolved_schema(self) -> ResolvedSchema:
        """
        Returns the schema of the result.

        :return: The schema of the result.
        :rtype: pyflink.table.catalog.ResolvedSchema

        .. versionadded:: 2.1.0
        """
        return ResolvedSchema(j_resolved_schema=self._j_table_result.getResolvedSchema())

    def get_result_kind(self) -> ResultKind:
        """
        Return the ResultKind which represents the result type.

        For DDL operation and USE operation, the result kind is always SUCCESS.
        For other operations, the result kind is always SUCCESS_WITH_CONTENT.

        :return: The result kind.

        .. versionadded:: 1.11.0
        """
        return ResultKind._from_j_result_kind(self._j_table_result.getResultKind())

    def collect(self) -> 'CloseableIterator':
        """
        Get the result contents as a closeable row iterator.

        Note:

        For SELECT operation, the job will not be finished unless all result data has been
        collected. So the job should be closed explicitly through the CloseableIterator#close
        method to avoid a resource leak. Calling CloseableIterator#close method will cancel the
        job and release related resources.

        For DML operation, Flink does not support getting the real affected row count now. So the
        affected row count is always -1 (unknown) for every sink, and it will be returned only
        after the job is finished.
        Calling CloseableIterator#close method will cancel the job.

        For other operations, no flink job will be submitted (get_job_client() always returns
        None), and the result is bounded. Calling CloseableIterator#close method does nothing.

        Recommended code to call CloseableIterator#close method looks like:

        >>> table_result = t_env.execute_sql("select ...")
        >>> with table_result.collect() as results:
        ...     for result in results:
        ...         ...

        In order to fetch the result to local, you can call either collect() or print(). But they
        cannot both be called on the same TableResult instance.

        :return: A CloseableIterator.

        .. versionadded:: 1.12.0
        """
        # The Java field data types are needed by the iterator to serialize each row on the
        # JVM side and to decode the pickled fields on the Python side.
        field_data_types = self._get_java_table_schema().getFieldDataTypes()
        j_iter = self._j_table_result.collect()
        return CloseableIterator(j_iter, field_data_types)

    def print(self):
        """
        Print the result contents as tableau form to client console.

        This method has slightly different behaviors under different checkpointing settings.

            - For batch jobs or streaming jobs without checkpointing,
              this method has neither exactly-once nor at-least-once guarantee.
              Query results are immediately accessible by the clients once they're produced,
              but exceptions will be thrown when the job fails and restarts.
            - For streaming jobs with exactly-once checkpointing,
              this method guarantees an end-to-end exactly-once record delivery.
              A result will be accessible by clients only after its corresponding checkpoint
              completes.
            - For streaming jobs with at-least-once checkpointing,
              this method guarantees an end-to-end at-least-once record delivery.
              Query results are immediately accessible by the clients once they're produced,
              but it is possible for the same result to be delivered multiple times.

        .. versionadded:: 1.11.0
        """
        self._j_table_result.print()

    def _get_java_table_schema(self):
        """
        Convert the Java ResolvedSchema of this result into a legacy Java TableSchema.

        Used by :func:`get_table_schema` and :func:`collect`, which still rely on the legacy
        ``org.apache.flink.table.legacy.api.TableSchema`` representation.
        """
        # Named distinctly so it does not shadow the Python ``TableSchema`` wrapper class.
        j_table_schema_class = get_gateway().jvm.org.apache.flink.table.legacy.api.TableSchema
        return j_table_schema_class.fromResolvedSchema(self._j_table_result.getResolvedSchema())
class CloseableIterator(object):
    """
    Iterator over the rows of a statement result that must be released when no longer needed.

    Rows are pulled lazily from the wrapped Java iterator and converted into
    :class:`~pyflink.common.Row` objects. Release the iterator either by calling :func:`close`
    or by using it as a context manager, which closes it on exit.
    """

    def __init__(self, j_closeable_iterator, field_data_types):
        # Java-side iterator that yields the result rows.
        self._j_closeable_iterator = j_closeable_iterator
        # Java field types, passed back to the JVM each time a row is serialized.
        self._j_field_data_types = field_data_types
        # Matching Python types, used to decode every pickled field value.
        self._data_types = [_from_java_data_type(j_type) for j_type in field_data_types]

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next row, raising StopIteration once the Java iterator is exhausted."""
        source = self._j_closeable_iterator
        if source.hasNext():
            serializer = get_gateway().jvm.PythonBridgeUtils
            payload = serializer.getPickledBytesFromRow(source.next(), self._j_field_data_types)
            # Element 0 carries the row kind as an unsigned big-endian integer; the remaining
            # elements hold one serialized value per field, where an empty payload means None.
            kind = RowKind(int.from_bytes(payload[0], byteorder='big', signed=False))
            field_payloads = list(payload[1:])
            values = [
                None if len(blob) == 0 else pickled_bytes_to_python_converter(blob, data_type)
                for blob, data_type in zip(field_payloads, self._data_types)
            ]
            row = Row(*values)
            row.set_row_kind(kind)
            return row
        raise StopIteration("No more data.")

    def next(self):
        """Alias of :meth:`__next__` (the Python 2 iterator-protocol name)."""
        return self.__next__()

    def close(self):
        """Close the underlying Java iterator."""
        self._j_closeable_iterator.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always release the Java iterator; returning None lets any exception propagate.
        self.close()