-
Notifications
You must be signed in to change notification settings - Fork 13.5k
/
Copy pathcompiled_plan.py
140 lines (117 loc) · 6.03 KB
/
compiled_plan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pathlib import Path
from typing import Union
from pyflink.java_gateway import get_gateway
from pyflink.table.explain_detail import ExplainDetail
from pyflink.table.table_result import TableResult
from pyflink.util.java_utils import to_j_explain_detail_arr
__all__ = ["CompiledPlan"]
class CompiledPlan(object):
    """
    An immutable, fully optimized and executable entity compiled from a Table & SQL API
    pipeline definition. The plan encodes operators, expressions, functions, data types and
    table connectors.

    New Flink versions may ship improved optimizer rules, more efficient operators and other
    changes that affect how a previously defined pipeline behaves. To keep backwards
    compatibility and to allow stateful streaming job upgrades, a compiled plan can be persisted
    and loaded again across Flink versions. The website documentation describes the guarantees
    given during stateful pipeline upgrades.

    Typical life cycle:

    - compile a plan from a SQL query with
      :func:`~pyflink.table.TableEnvironment.compile_plan_sql`;
    - persist it with :func:`~pyflink.table.CompiledPlan.write_to_file`, or extract the JSON
      representation manually with :func:`~pyflink.table.CompiledPlan.as_json_string`;
    - load it back from a file or a string with
      :func:`~pyflink.table.TableEnvironment.load_plan` and a
      :class:`~pyflink.table.PlanReference`;
    - run it with :func:`~pyflink.table.CompiledPlan.execute`.

    Depending on the configuration, permanent catalog metadata (for example information about
    tables and functions) is persisted in the plan as well. Anonymous/inline objects are
    persisted (including schema and options) if possible; otherwise the compilation fails. For
    temporary objects only the identifier becomes part of the plan, so the object itself has to
    be present in the session context during a restore.

    JSON is assumed to be the default representation of a compiled plan in all API endpoints and
    is the format used by default when a plan is persisted to a file. For advanced use cases,
    :func:`~pyflink.table.CompiledPlan.as_smile_bytes` offers a binary representation of the
    compiled plan.

    .. note::
        Plan restores assume a stable session context. Configuration, loaded modules and
        catalogs, and temporary objects must not change. Schema evolution and changes of
        function signatures are not supported.
    """

    def __init__(self, j_compiled_plan, t_env):
        # The table environment is kept so that execute() can call its
        # _before_execute() hook; the Java plan is the delegate of every other method.
        self._t_env = t_env
        self._j_compiled_plan = j_compiled_plan

    def __str__(self) -> str:
        j_plan = self._j_compiled_plan
        return j_plan.toString()

    def as_json_string(self) -> str:
        """
        Return the JSON string representation of this plan.
        """
        j_plan = self._j_compiled_plan
        return j_plan.asJsonString()

    def as_smile_bytes(self) -> bytes:
        """
        Return the Smile binary representation of this plan.
        """
        j_plan = self._j_compiled_plan
        return j_plan.asSmileBytes()

    def write_to_file(self, file: Union[str, Path], ignore_if_exists: bool = False):
        """
        Persist this plan to a file, using the JSON representation.

        :param file: Target file, given as a string or a :class:`pathlib.Path`. It is converted
                     with ``str()`` before being handed to the underlying Java plan.
        :param ignore_if_exists: If a plan already exists at the given file path and this flag
                                 is true, nothing is done and the existing plan is not
                                 overwritten. Otherwise an exception is thrown.
        :raises TableException: if the file cannot be written, or if ``ignore_if_exists`` is
                                false and a plan already exists.
        """
        target = str(file)
        self._j_compiled_plan.writeToFile(target, ignore_if_exists)

    def get_flink_version(self) -> str:
        """
        Return the Flink version that was used to compile the plan.
        """
        j_version = self._j_compiled_plan.getFlinkVersion()
        return str(j_version)

    def print_json_string(self):
        """
        Same as :func:`~pyflink.table.CompiledPlan.as_json_string`, except that the result is
        printed to the client console instead of being returned.

        .. versionadded:: 2.1.0
        """
        j_plan = self._j_compiled_plan
        j_plan.printJsonString()

    def execute(self) -> TableResult:
        """
        Execute the compiled plan.
        """
        # The environment hook has to run before the Java plan is executed.
        self._t_env._before_execute()
        j_table_result = self._j_compiled_plan.execute()
        return TableResult(j_table_result)

    def explain(self, *extra_details: ExplainDetail) -> str:
        """
        Return the AST and the execution plan of the compiled plan.

        :param extra_details: Additional explain details the result should include, e.g.
                              estimated cost, changelog mode for streaming
        :return: AST and execution plans
        """
        gateway = get_gateway()
        j_details = to_j_explain_detail_arr(extra_details)
        # The explanation is always requested in the TEXT explain format.
        text_format = gateway.jvm.org.apache.flink.table.api.ExplainFormat.TEXT
        return self._j_compiled_plan.explain(text_format, j_details)

    def print_explain(self, *extra_details: ExplainDetail):
        """
        Same as :func:`~pyflink.table.CompiledPlan.explain`, except that the result is printed
        to the client console instead of being returned.

        .. versionadded:: 2.1.0
        """
        explanation = self.explain(*extra_details)
        print(explanation)