[FLINK-13263] [python] Supports explain DAG plan in flink-python
This closes apache#9114
godfreyhe authored and sunjincheng121 committed Jul 15, 2019
1 parent d71c4ee commit dce0d0a
Showing 3 changed files with 85 additions and 6 deletions.
14 changes: 10 additions & 4 deletions flink-python/pyflink/table/table_environment.py
@@ -224,15 +224,21 @@ def list_tables(self):
         j_table_name_array = self._j_tenv.listTables()
         return [item for item in j_table_name_array]
 
-    def explain(self, table):
+    def explain(self, table=None, extended=False):
         """
         Returns the AST of the specified Table API and SQL queries and the execution plan to compute
-        the result of the given :class:`Table`.
+        the result of the given :class:`Table` or of the multi-sink plan.
-        :param table: The table to be explained.
+        :param table: The table to be explained. If table is None, the multi-sink plan is
+                      explained; otherwise the plan for the given table is explained.
+        :param extended: Whether the plan should contain additional properties,
+                         e.g. estimated cost and traits.
         :return: The AST and the execution plan as a string.
         """
-        return self._j_tenv.explain(table._j_table)
+        if table is None:
+            return self._j_tenv.explain(extended)
+        else:
+            return self._j_tenv.explain(table._j_table, extended)
 
     def sql_query(self, query):
         """
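
Note (not part of the commit): a minimal usage sketch of the two code paths introduced above. It assumes an already-created StreamTableEnvironment `t_env` with sinks `sink1` and `sink2` registered, mirroring the multi-sink test further down.

# Sketch only: `t_env` is an existing StreamTableEnvironment with
# `sink1` and `sink2` registered (see the tests below for the setup).
t = t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"])

# Path 1: explain a single table; extended=True adds properties such as
# estimated cost and traits to the plan output.
single_plan = t_env.explain(t, extended=True)

# Path 2: omit `table` to explain the whole multi-sink job after wiring
# up the INSERT INTO statements via sql_update().
t_env.sql_update("insert into sink1 select * from %s where a > 100" % t)
t_env.sql_update("insert into sink2 select * from %s where a < 100" % t)
multi_plan = t_env.explain(extended=True)

print(single_plan)
print(multi_plan)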
69 changes: 67 additions & 2 deletions flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -22,11 +22,13 @@
 
 from pyflink.dataset import ExecutionEnvironment
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table.table_environment import BatchTableEnvironment, StreamTableEnvironment
+from pyflink.table import DataTypes, CsvTableSink, StreamTableEnvironment
 from pyflink.table.table_config import TableConfig
-from pyflink.table.types import DataTypes, RowType
+from pyflink.table.table_environment import BatchTableEnvironment
+from pyflink.table.types import RowType
 from pyflink.testing import source_sink_utils
 from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase
+from pyflink.util.exceptions import TableException
 
 
 class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
@@ -103,6 +105,38 @@ def test_explain(self):
 
         assert isinstance(actual, str) or isinstance(actual, unicode)
 
+    def test_explain_with_extended(self):
+        schema = RowType() \
+            .add('a', DataTypes.INT()) \
+            .add('b', DataTypes.STRING()) \
+            .add('c', DataTypes.STRING())
+        t_env = self.t_env
+        t = t_env.from_elements([], schema)
+        result = t.select("1 + a, b, c")
+
+        actual = t_env.explain(result, True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
+    def test_explain_with_multi_sinks(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sink1",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+        t_env.register_table_sink(
+            "sink2",
+            source_sink_utils.TestAppendSink(field_names, field_types))
+
+        t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+        t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+        actual = t_env.explain(extended=True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
     def test_sql_query(self):
         t_env = self.t_env
         source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
@@ -195,6 +229,37 @@ def test_explain(self):
 
         self.assertIsInstance(actual, (str, unicode))
 
+    def test_explain_with_extended(self):
+        schema = RowType() \
+            .add('a', DataTypes.INT()) \
+            .add('b', DataTypes.STRING()) \
+            .add('c', DataTypes.STRING())
+        t_env = self.t_env
+        t = t_env.from_elements([], schema)
+        result = t.select("1 + a, b, c")
+
+        actual = t_env.explain(result, True)
+
+        assert isinstance(actual, str) or isinstance(actual, unicode)
+
+    def test_explain_with_multi_sinks(self):
+        t_env = self.t_env
+        source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
+        field_names = ["a", "b", "c"]
+        field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
+        t_env.register_table_sink(
+            "sink1",
+            CsvTableSink(field_names, field_types, "path1"))
+        t_env.register_table_sink(
+            "sink2",
+            CsvTableSink(field_names, field_types, "path2"))
+
+        t_env.sql_update("insert into sink1 select * from %s where a > 100" % source)
+        t_env.sql_update("insert into sink2 select * from %s where a < 100" % source)
+
+        with self.assertRaises(TableException):
+            t_env.explain(extended=True)
+
     def test_table_config(self):
 
         table_config = TableConfig()
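
The batch variant above deliberately asserts failure: in this commit's batch environment, multi-sink explain surfaces as the new Python TableException. A hedged sketch of guarding against that in calling code, assuming the same `t_env` and sink setup as the test:

from pyflink.util.exceptions import TableException

try:
    print(t_env.explain(extended=True))
except TableException as e:
    # Asserted by the batch test above: multi-sink explain is not
    # supported there and is raised as the mapped Python TableException.
    print("multi-sink explain not supported: %s" % e)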
8 changes: 8 additions & 0 deletions flink-python/pyflink/util/exceptions.py
@@ -28,6 +28,12 @@ def __str__(self):
         return repr(self.msg)
 
 
+class TableException(JavaException):
+    """
+    General Exception for all errors during table handling.
+    """
+
+
 class CatalogException(JavaException):
     """
     A catalog-related exception.
@@ -106,6 +112,8 @@ class TableNotPartitionedException(JavaException):
 
 # Mapping from JavaException to PythonException
 exception_mapping = {
+    "org.apache.flink.table.api.TableException":
+        TableException,
     "org.apache.flink.table.catalog.exceptions.CatalogException":
         CatalogException,
     "org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException":
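
For illustration, exception_mapping keys are fully qualified Java class names and its values are the Python exception classes. A hypothetical lookup helper (not the actual pyflink conversion code) might consult it like this:

# Hypothetical illustration only; the real pyflink converter may differ,
# and this assumes the exception classes accept a single message argument.
def convert_java_exception(java_class_name, message):
    exception_class = exception_mapping.get(java_class_name)
    if exception_class is not None:
        # e.g. "org.apache.flink.table.api.TableException" -> TableException
        return exception_class(message)
    # Unmapped Java exceptions fall back to a generic error.
    return RuntimeError("%s: %s" % (java_class_name, message))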
