[FLINK-16367][table] Introduce TableEnvironment#createStatementSet api
This closes apache#12042
godfreyhe authored May 10, 2020
1 parent 5bf121a commit 55d04d5
Showing 16 changed files with 897 additions and 86 deletions.
6 changes: 5 additions & 1 deletion flink-python/pyflink/table/__init__.py
@@ -55,6 +55,8 @@
user-defined function is executed, such as the metric group, and global job parameters, etc.
- :class:`pyflink.table.ScalarFunction`
Base interface for user-defined scalar function.
- :class:`pyflink.table.StatementSet`
Base interface that accepts DML statements or Tables.
"""
from __future__ import absolute_import

@@ -71,6 +73,7 @@
from pyflink.table.table_schema import TableSchema
from pyflink.table.udf import FunctionContext, ScalarFunction
from pyflink.table.explain_detail import ExplainDetail
from pyflink.table.statement_set import StatementSet

__all__ = [
'TableEnvironment',
@@ -95,5 +98,6 @@
'FunctionContext',
'ScalarFunction',
'SqlDialect',
'ExplainDetail'
'ExplainDetail',
'StatementSet'
]
93 changes: 93 additions & 0 deletions flink-python/pyflink/table/statement_set.py
@@ -0,0 +1,93 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from pyflink.util.utils import to_j_explain_detail_arr

__all__ = ['StatementSet']


class StatementSet(object):
"""
A StatementSet accepts DML statements or Tables; the planner can optimize
all added statements and Tables together and then submit them as one job.

.. note::

    The added statements and Tables will be cleared
    when calling the `execute` method.
"""

def __init__(self, _j_statement_set):
self._j_statement_set = _j_statement_set

def add_insert_sql(self, stmt):
"""
Add an insert statement to the set.

:param stmt: The statement to be added.
:type stmt: str
:return: The current StatementSet instance.
:rtype: pyflink.table.StatementSet
"""
self._j_statement_set.addInsertSql(stmt)
return self

def add_insert(self, target_path, table, overwrite=False):
"""
Add a Table with the given sink table name to the set.

:param target_path: The path of the registered :class:`~pyflink.table.TableSink` to which
                    the :class:`~pyflink.table.Table` is written.
:type target_path: str
:param table: The Table to add.
:type table: pyflink.table.Table
:param overwrite: The flag that indicates whether the insert
                  should overwrite existing data or not.
:type overwrite: bool
:return: The current StatementSet instance.
:rtype: pyflink.table.StatementSet
"""
self._j_statement_set.addInsert(target_path, table._j_table, overwrite)
return self

def explain(self, *extra_details):
"""
Return the AST and the execution plan of all statements and Tables.

:param extra_details: The extra explain details which the explain result should include,
                      e.g. estimated cost, changelog mode for streaming
:type extra_details: tuple[ExplainDetail] (variable-length arguments of ExplainDetail)
:return: The AST and the execution plan of all statements and Tables, as a string.
:rtype: str
"""
j_extra_details = to_j_explain_detail_arr(extra_details)
return self._j_statement_set.explain(j_extra_details)

def execute(self):
"""
Execute all statements and Tables as a batch.

.. note::

    The added statements and Tables will be cleared when executing this method.

:return: The execution result.
"""
# TODO convert java TableResult to python TableResult once FLINK-17303 is finished
return self._j_statement_set.execute()
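A minimal end-to-end sketch of the new Python API (not part of this commit; it assumes a TableEnvironment `t_env` with sinks `sink1` and `sink2` already registered, as in the tests further down):

# Hedged sketch: `t_env`, `sink1` and `sink2` are assumed to exist.
tab = t_env.from_elements([(1, "Hi", "Hello")], ["a", "b", "c"])

stmt_set = t_env.create_statement_set()

# Nothing is submitted yet; statements and Tables are only collected.
stmt_set.add_insert_sql("INSERT INTO sink1 SELECT * FROM %s WHERE a > 100" % tab) \
    .add_insert("sink2", tab.filter("a < 100"))

print(stmt_set.explain())  # AST + execution plan for the whole set
stmt_set.execute()         # submits everything as one job and clears the set

The chaining works because both add methods return `self`, mirroring the fluent Java interface introduced below.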
13 changes: 13 additions & 0 deletions flink-python/pyflink/table/table_environment.py
@@ -28,6 +28,7 @@
from pyflink.serializers import BatchedSerializer, PickleSerializer
from pyflink.table.catalog import Catalog
from pyflink.table.serializers import ArrowSerializer
from pyflink.table.statement_set import StatementSet
from pyflink.table.table_config import TableConfig
from pyflink.table.descriptors import StreamTableDescriptor, BatchTableDescriptor

@@ -524,6 +525,18 @@ def execute_sql(self, stmt):
"""
return self._j_tenv.executeSql(stmt)

def create_statement_set(self):
"""
Create a StatementSet instance which accepts DML statements or Tables; the
planner can optimize all added statements and Tables together and then
submit them as one job.

:return: A StatementSet instance.
:rtype: pyflink.table.StatementSet
"""
_j_statement_set = self._j_tenv.createStatementSet()
return StatementSet(_j_statement_set)

def sql_update(self, stmt):
"""
Evaluates a SQL statement such as INSERT, UPDATE or DELETE or a DDL statement
40 changes: 40 additions & 0 deletions flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -224,6 +224,26 @@ def test_insert_into(self):
expected = ['1,Hi,Hello']
self.assert_equals(actual, expected)

def test_statement_set(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
field_names = ["a", "b", "c"]
field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
t_env.register_table_sink(
"sink1",
source_sink_utils.TestAppendSink(field_names, field_types))
t_env.register_table_sink(
"sink2",
source_sink_utils.TestAppendSink(field_names, field_types))

stmt_set = t_env.create_statement_set()

stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)\
.add_insert("sink2", source.filter("a < 100"), False)

actual = stmt_set.explain(ExplainDetail.CHANGELOG_MODE)
assert isinstance(actual, str)

def test_explain_with_multi_sinks(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
@@ -431,6 +451,26 @@ def test_explain_with_multi_sinks(self):

assert isinstance(actual, str)

def test_statement_set(self):
t_env = self.t_env
source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
field_names = ["a", "b", "c"]
field_types = [DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING()]
t_env.register_table_sink(
"sink1",
CsvTableSink(field_names, field_types, "path1"))
t_env.register_table_sink(
"sink2",
CsvTableSink(field_names, field_types, "path2"))

stmt_set = t_env.create_statement_set()

stmt_set.add_insert_sql("insert into sink1 select * from %s where a > 100" % source)\
.add_insert("sink2", source.filter("a < 100"))

actual = stmt_set.explain()
assert isinstance(actual, str)

def test_create_table_environment(self):
table_config = TableConfig()
table_config.set_max_generated_code_length(32000)
65 changes: 65 additions & 0 deletions …/org/apache/flink/table/api/StatementSet.java
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.api;

import org.apache.flink.annotation.PublicEvolving;

/**
 * A {@link StatementSet} accepts DML statements or {@link Table}s; the planner
 * can optimize all added statements and Tables together
 * and then submit them as one job.
 *
 * <p>The added statements and Tables will be cleared
 * when calling the {@code execute} method.
*/
@PublicEvolving
public interface StatementSet {

/**
 * Adds an insert statement to the set.
*/
StatementSet addInsertSql(String statement);

/**
 * Adds a {@link Table} with the given sink table name to the set.
*/
StatementSet addInsert(String targetPath, Table table);

/**
 * Adds a {@link Table} with the given sink table name to the set, with
 * {@code overwrite} indicating whether existing data should be overwritten.
*/
StatementSet addInsert(String targetPath, Table table, boolean overwrite);

/**
 * Returns the AST and the execution plan to compute the result of
 * all statements and Tables.
*
* @param extraDetails The extra explain details which the explain result should include,
* e.g. estimated cost, changelog mode for streaming
* @return AST and the execution plan.
*/
String explain(ExplainDetail... extraDetails);

/**
 * Executes all statements and Tables as a batch.
*
* <p>The added statements and Tables will be cleared when executing this method.
*/
TableResult execute();
}
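The pyflink StatementSet added above wraps this interface one-to-one. A hedged Python sketch of passing extra explain details (it assumes the `stmt_set` from the earlier sketch on a streaming TableEnvironment; the detail values mirror the Java ExplainDetail enum):

from pyflink.table import ExplainDetail

# The varargs are converted to a Java ExplainDetail[] before delegating
# to StatementSet#explain(ExplainDetail... extraDetails).
plan = stmt_set.explain(ExplainDetail.ESTIMATED_COST, ExplainDetail.CHANGELOG_MODE)
print(plan)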
7 changes: 7 additions & 0 deletions …/org/apache/flink/table/api/TableEnvironment.java
@@ -1132,4 +1132,11 @@ default Table fromValues(DataType rowType, Object... values) {
* @throws Exception which occurs during job execution.
*/
JobExecutionResult execute(String jobName) throws Exception;

/**
 * Creates a {@link StatementSet} instance which accepts DML statements or Tables;
 * the planner can optimize all added statements and Tables together
 * and then submit them as one job.
*/
StatementSet createStatementSet();
}
102 changes: 102 additions & 0 deletions …/org/apache/flink/table/api/internal/StatementSetImpl.java
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.api.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
* Implementation for {@link StatementSet}.
*/
@Internal
class StatementSetImpl implements StatementSet {
private final TableEnvironmentInternal tableEnvironment;
private List<ModifyOperation> operations = new ArrayList<>();

protected StatementSetImpl(TableEnvironmentInternal tableEnvironment) {
this.tableEnvironment = tableEnvironment;
}

@Override
public StatementSet addInsertSql(String statement) {
List<Operation> operations = tableEnvironment.getParser().parse(statement);

if (operations.size() != 1) {
throw new TableException("Only single statement is supported.");
}

Operation operation = operations.get(0);
if (operation instanceof ModifyOperation) {
this.operations.add((ModifyOperation) operation);
} else {
throw new TableException("Only insert statement is supported now.");
}
return this;
}

@Override
public StatementSet addInsert(String targetPath, Table table) {
return addInsert(targetPath, table, false);
}

@Override
public StatementSet addInsert(String targetPath, Table table, boolean overwrite) {
UnresolvedIdentifier unresolvedIdentifier = tableEnvironment.getParser().parseIdentifier(targetPath);
ObjectIdentifier objectIdentifier = tableEnvironment.getCatalogManager()
.qualifyIdentifier(unresolvedIdentifier);

operations.add(new CatalogSinkModifyOperation(
objectIdentifier,
table.getQueryOperation(),
Collections.emptyMap(),
overwrite,
Collections.emptyMap()));

return this;
}

@Override
public String explain(ExplainDetail... extraDetails) {
List<Operation> operationList = operations.stream().map(o -> (Operation) o).collect(Collectors.toList());
return tableEnvironment.explainInternal(operationList, extraDetails);
}

@Override
public TableResult execute() {
try {
return tableEnvironment.executeInternal(operations);
} finally {
operations.clear();
}
}
}
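Two behaviors of this implementation are worth calling out: addInsertSql accepts exactly one INSERT statement per call, and execute() clears the accumulated operations in a finally block, so a set is reusable even after a failed submission. A hedged Python sketch of how that surfaces to callers (same assumed names as the earlier sketches):

# Only a single INSERT per call; anything else raises a TableException
# ("Only insert statement is supported now.") from the Java side via py4j.
try:
    stmt_set.add_insert_sql("SELECT * FROM %s" % tab)
except Exception as e:
    print(e)

# The set is cleared after execute(), even on failure, so the same
# instance can collect the next batch of inserts.
stmt_set.add_insert_sql("INSERT INTO sink1 SELECT * FROM %s" % tab)
stmt_set.execute()
stmt_set.add_insert_sql("INSERT INTO sink2 SELECT * FROM %s" % tab)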