diff --git a/flink-python/pyflink/common/__init__.py b/flink-python/pyflink/common/__init__.py
index 6bf412d55bd4f..fcea204c20ce7 100644
--- a/flink-python/pyflink/common/__init__.py
+++ b/flink-python/pyflink/common/__init__.py
@@ -22,27 +22,19 @@
     - :class:`ExecutionConfig`:
       A config to define the behavior of the program execution.
 """
-from pyflink.common.completable_future import CompletableFuture
 from pyflink.common.configuration import Configuration
 from pyflink.common.execution_config import ExecutionConfig
 from pyflink.common.execution_mode import ExecutionMode
 from pyflink.common.input_dependency_constraint import InputDependencyConstraint
-from pyflink.common.job_client import JobClient
 from pyflink.common.job_execution_result import JobExecutionResult
-from pyflink.common.job_id import JobID
-from pyflink.common.job_status import JobStatus
 from pyflink.common.restart_strategy import RestartStrategies, RestartStrategyConfiguration
 
 __all__ = [
-    'CompletableFuture',
     'Configuration',
     'ExecutionConfig',
     'ExecutionMode',
     'InputDependencyConstraint',
-    'JobClient',
     'JobExecutionResult',
-    'JobID',
-    'JobStatus',
     'RestartStrategies',
     'RestartStrategyConfiguration',
 ]
diff --git a/flink-python/pyflink/common/completable_future.py b/flink-python/pyflink/common/completable_future.py
deleted file mode 100644
index 266ea2601f757..0000000000000
--- a/flink-python/pyflink/common/completable_future.py
+++ /dev/null
@@ -1,63 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from asyncio import Future
-
-__all__ = ['CompletableFuture']
-
-
-class CompletableFuture(Future):
-    """
-    A Future that may be explicitly completed (setting its value and status), supporting dependent
-    functions and actions that trigger upon its completion.
-
-    When two or more threads attempt to set_result, set_exception, or cancel a CompletableFuture,
-    only one of them succeeds.
- """ - - def __init__(self, j_completable_future, py_class=None): - super().__init__() - self._j_completable_future = j_completable_future - self._py_class = py_class - - def cancel(self): - return self._j_completable_future.cancel(True) - - def cancelled(self): - return self._j_completable_future.isCancelled() - - def done(self): - return self._j_completable_future.isDone() - - def result(self): - if self._py_class is None: - return self._j_completable_future.get() - else: - return self._py_class(self._j_completable_future.get()) - - def exception(self): - return self._exception - - def set_result(self, result): - return self._j_completable_future.complete(result) - - def set_exception(self, exception): - self._exception = exception - return self._j_completable_future.completeExceptionally(exception) - - def __str__(self): - return self._j_completable_future.toString() diff --git a/flink-python/pyflink/common/execution_mode.py b/flink-python/pyflink/common/execution_mode.py index f3d9966a7b73e..936f9aab619fe 100644 --- a/flink-python/pyflink/common/execution_mode.py +++ b/flink-python/pyflink/common/execution_mode.py @@ -83,7 +83,7 @@ def _from_j_execution_mode(j_execution_mode): elif j_execution_mode == JExecutionMode.BATCH_FORCED: return ExecutionMode.BATCH_FORCED else: - raise Exception("Unsupported java execution mode: %s" % j_execution_mode) + raise Exception("Unsupported java exection mode: %s" % j_execution_mode) @staticmethod def _to_j_execution_mode(execution_mode): diff --git a/flink-python/pyflink/common/job_client.py b/flink-python/pyflink/common/job_client.py deleted file mode 100644 index e4a1f396bd521..0000000000000 --- a/flink-python/pyflink/common/job_client.py +++ /dev/null @@ -1,114 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from pyflink.common.completable_future import CompletableFuture -from pyflink.common.job_execution_result import JobExecutionResult -from pyflink.common.job_id import JobID -from pyflink.common.job_status import JobStatus - -__all__ = ['JobClient'] - - -class JobClient(object): - """ - A client that is scoped to a specific job. - """ - - def __init__(self, j_job_client): - self._j_job_client = j_job_client - - def get_job_id(self): - """ - Returns the JobID that uniquely identifies the job this client is scoped to. - - :return: JobID, or null if the job has been executed on a runtime without JobIDs - or if the execution failed. - """ - return JobID(self._j_job_client.getJobID()) - - def get_job_status(self): - """ - Requests the JobStatus of the associated job. - - :return: A CompletableFuture containing the JobStatus of the associated job. 
-        :rtype: pyflink.common.CompletableFuture
-        """
-        return CompletableFuture(self._j_job_client.getJobStatus(), JobStatus)
-
-    def cancel(self):
-        """
-        Cancels the associated job.
-
-        :return: A CompletableFuture for canceling the associated job.
-        :rtype: pyflink.common.CompletableFuture
-        """
-        return CompletableFuture(self._j_job_client.cancel())
-
-    def stop_with_savepoint(self, advance_to_end_of_event_time, savepoint_directory):
-        """
-        Stops the associated job on Flink cluster.
-
-        Stopping works only for streaming programs. Be aware, that the job might continue to run
-        for a while after sending the stop command, because after sources stopped to emit data all
-        operators need to finish processing.
-
-        :param advance_to_end_of_event_time: Flag indicating if the source should inject a
-                                             MAX_WATERMARK in the pipeline.
-        :type advance_to_end_of_event_time: bool
-        :param savepoint_directory: Directory the savepoint should be written to.
-        :type savepoint_directory: str
-        :return: A CompletableFuture containing the path where the savepoint is located.
-        :rtype: pyflink.common.CompletableFuture
-        """
-        return CompletableFuture(
-            self._j_job_client.stopWithSavepoint(advance_to_end_of_event_time, savepoint_directory),
-            str)
-
-    def trigger_savepoint(self, savepoint_directory):
-        """
-        Triggers a savepoint for the associated job. The savepoint will be written to the given
-        savepoint directory.
-
-        :param savepoint_directory: Directory the savepoint should be written to.
-        :type savepoint_directory: str
-        :return: A CompletableFuture containing the path where the savepoint is located.
-        :rtype: pyflink.common.CompletableFuture
-        """
-        return CompletableFuture(self._j_job_client.triggerSavepoint(savepoint_directory), str)
-
-    def get_accumulators(self, class_loader):
-        """
-        Requests the accumulators of the associated job. Accumulators can be requested while it
-        is running or after it has finished. The class loader is used to deserialize the incoming
-        accumulator results.
-
-        :param class_loader: Class loader used to deserialize the incoming accumulator results.
-        :return: A CompletableFuture containing the accumulators of the associated job.
-        :rtype: pyflink.common.CompletableFuture
-        """
-        return CompletableFuture(self._j_job_client.getAccumulators(class_loader), dict)
-
-    def get_job_execution_result(self, user_class_loader):
-        """
-        Returns the JobExecutionResult result of the job execution of the submitted job.
-
-        :param user_class_loader: Class loader used to deserialize the accumulators of the job.
-        :return: A CompletableFuture containing the JobExecutionResult result of the job execution.
-        :rtype: pyflink.common.CompletableFuture
-        """
-        return CompletableFuture(self._j_job_client.getJobExecutionResult(user_class_loader),
-                                 JobExecutionResult)
diff --git a/flink-python/pyflink/common/job_execution_result.py b/flink-python/pyflink/common/job_execution_result.py
index 26ea95ec9fe41..24926bc667920 100644
--- a/flink-python/pyflink/common/job_execution_result.py
+++ b/flink-python/pyflink/common/job_execution_result.py
@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from pyflink.common.job_id import JobID
 
 __all__ = ['JobExecutionResult']
 
@@ -36,7 +35,7 @@ def get_job_id(self):
         :return: JobID, or null if the job has been executed on a runtime without JobIDs
                  or if the execution failed.
         """
-        return JobID(self._j_job_execution_result.getJobID())
+        return self._j_job_execution_result.getJobID()
 
     def is_job_execution_result(self):
         """
diff --git a/flink-python/pyflink/common/job_id.py b/flink-python/pyflink/common/job_id.py
deleted file mode 100644
index 49c1349b749e1..0000000000000
--- a/flink-python/pyflink/common/job_id.py
+++ /dev/null
@@ -1,35 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-__all__ = ['JobID']
-
-
-class JobID(object):
-    """
-    Unique (at least statistically unique) identifier for a Flink Job. Jobs in Flink correspond
-    to dataflow graphs.
-
-    Jobs act simultaneously as sessions, because jobs can be created and submitted incrementally
-    in different parts. Newer fragments of a graph can be attached to existing graphs, thereby
-    extending the current data flow graphs.
-    """
-
-    def __init__(self, j_job_id):
-        self._j_job_id = j_job_id
-
-    def __str__(self):
-        return self._j_job_id.toString()
diff --git a/flink-python/pyflink/common/job_status.py b/flink-python/pyflink/common/job_status.py
deleted file mode 100644
index bbd4b3350903d..0000000000000
--- a/flink-python/pyflink/common/job_status.py
+++ /dev/null
@@ -1,167 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from pyflink.java_gateway import get_gateway
-
-__all__ = ['JobStatus']
-
-
-class JobStatus(object):
-    """
-    Possible states of a job once it has been accepted by the job manager.
-
-    :data:`CREATED`:
-
-    Job is newly created, no task has started to run.
-
-    :data:`RUNNING`:
-
-    Some tasks are scheduled or running, some may be pending, some may be finished.
-
-    :data:`FAILING`:
-
-    The job has failed and is currently waiting for the cleanup to complete.
-
-    :data:`FAILED`:
-
-    The job has failed with a non-recoverable task failure.
-
-    :data:`CANCELLING`:
-
-    Job is being cancelled.
-
-    :data:`CANCELED`:
-
-    Job has been cancelled.
-
-    :data:`FINISHED`:
-
-    All of the job's tasks have successfully finished.
-
-    :data:`RESTARTING`:
-
-    The job is currently undergoing a reset and total restart.
-
-    :data:`SUSPENDED`:
-
-    The job has been suspended which means that it has been stopped but not been removed from a
-    potential HA job store.
-
-    :data:`RECONCILING`:
-
-    The job is currently reconciling and waits for task execution report to recover state.
-    """
-
-    CREATED = 0
-    RUNNING = 1
-    FAILING = 2
-    FAILED = 3
-    CANCELLING = 4
-    CANCELED = 5
-    FINISHED = 6
-    RESTARTING = 7
-    SUSPENDED = 8
-    RECONCILING = 9
-
-    def __init__(self, j_job_status) -> None:
-        super().__init__()
-        self._j_job_status = j_job_status
-
-    def is_globally_terminal_state(self):
-        """
-        Checks whether this state is globally terminal. A globally terminal job
-        is complete and cannot fail any more and will not be restarted or recovered by another
-        standby master node.
-
-        When a globally terminal state has been reached, all recovery data for the job is
-        dropped from the high-availability services.
-
-        :return: ``True`` if this job status is globally terminal, ``False`` otherwise.
-        """
-        return self._j_job_status.isGloballyTerminalState()
-
-    def is_terminal_state(self):
-        """
-        Checks whether this state is locally terminal. Locally terminal refers to the
-        state of a job's execution graph within an executing JobManager. If the execution graph
-        is locally terminal, the JobManager will not continue executing or recovering the job.
-
-        The only state that is locally terminal, but not globally terminal is SUSPENDED,
-        which is typically entered when the executing JobManager loses its leader status.
-
-        :return: ``True`` if this job status is terminal, ``False`` otherwise.
- """ - return self._j_job_status.isTerminalState() - - @staticmethod - def _from_j_job_status(j_job_status): - gateway = get_gateway() - JJobStatus = gateway.jvm.org.apache.flink.api.common.JobStatus - if j_job_status == JJobStatus.CREATED: - return JobStatus.CREATED - elif j_job_status == JJobStatus.RUNNING: - return JobStatus.RUNNING - elif j_job_status == JJobStatus.FAILING: - return JobStatus.FAILING - elif j_job_status == JJobStatus.FAILED: - return JobStatus.FAILED - elif j_job_status == JJobStatus.CANCELLING: - return JobStatus.CANCELLING - elif j_job_status == JJobStatus.CANCELED: - return JobStatus.CANCELED - elif j_job_status == JJobStatus.FINISHED: - return JobStatus.FINISHED - elif j_job_status == JJobStatus.RESTARTING: - return JobStatus.RESTARTING - elif j_job_status == JJobStatus.SUSPENDED: - return JobStatus.SUSPENDED - elif j_job_status == JJobStatus.RECONCILING: - return JobStatus.RECONCILING - else: - raise Exception("Unsupported java job status: %s" % j_job_status) - - @staticmethod - def _to_j_job_status(job_status): - gateway = get_gateway() - JJobStatus = gateway.jvm.org.apache.flink.api.common.JobStatus - if job_status == JobStatus.CREATED: - return JJobStatus.CREATED - elif job_status == JobStatus.RUNNING: - return JJobStatus.RUNNING - elif job_status == JobStatus.FAILING: - return JJobStatus.FAILING - elif job_status == JobStatus.FAILED: - return JJobStatus.FAILED - elif job_status == JobStatus.CANCELLING: - return JJobStatus.CANCELLING - elif job_status == JobStatus.CANCELED: - return JJobStatus.CANCELED - elif job_status == JobStatus.FINISHED: - return JJobStatus.FINISHED - elif job_status == JobStatus.RESTARTING: - return JJobStatus.RESTARTING - elif job_status == JobStatus.SUSPENDED: - return JJobStatus.SUSPENDED - elif job_status == JobStatus.RECONCILING: - return JJobStatus.RECONCILING - else: - raise TypeError("Unsupported job status: %s, supported job statuses are: " - "JobStatus.CREATED, JobStatus.RUNNING, " - "JobStatus.FAILING, JobStatus.FAILED, " - "JobStatus.CANCELLING, JobStatus.CANCELED, " - "JobStatus.FINISHED, JobStatus.RESTARTING, " - "JobStatus.SUSPENDED and JobStatus.RECONCILING." 
% job_status) diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py index 12bdf73e38964..010434ce5fb94 100644 --- a/flink-python/pyflink/table/__init__.py +++ b/flink-python/pyflink/table/__init__.py @@ -61,47 +61,43 @@ from __future__ import absolute_import from pyflink.table.environment_settings import EnvironmentSettings -from pyflink.table.explain_detail import ExplainDetail -from pyflink.table.result_kind import ResultKind -from pyflink.table.sinks import CsvTableSink, TableSink, WriteMode -from pyflink.table.sources import CsvTableSource, TableSource from pyflink.table.sql_dialect import SqlDialect -from pyflink.table.statement_set import StatementSet -from pyflink.table.table import GroupWindowedTable, GroupedTable, OverWindowedTable, Table, \ +from pyflink.table.table import Table, GroupedTable, GroupWindowedTable, OverWindowedTable, \ WindowGroupedTable from pyflink.table.table_config import TableConfig from pyflink.table.table_environment import (TableEnvironment, StreamTableEnvironment, BatchTableEnvironment) -from pyflink.table.table_result import TableResult -from pyflink.table.table_schema import TableSchema +from pyflink.table.sinks import TableSink, CsvTableSink, WriteMode +from pyflink.table.sources import TableSource, CsvTableSource from pyflink.table.types import DataTypes, UserDefinedType, Row +from pyflink.table.table_schema import TableSchema from pyflink.table.udf import FunctionContext, ScalarFunction +from pyflink.table.explain_detail import ExplainDetail +from pyflink.table.statement_set import StatementSet __all__ = [ + 'TableEnvironment', + 'StreamTableEnvironment', 'BatchTableEnvironment', - 'CsvTableSink', - 'CsvTableSource', - 'DataTypes', 'EnvironmentSettings', - 'ExplainDetail', - 'FunctionContext', - 'GroupWindowedTable', + 'Table', 'GroupedTable', + 'GroupWindowedTable', 'OverWindowedTable', - 'ResultKind', - 'Row', - 'ScalarFunction', - 'SqlDialect', - 'StatementSet', - 'StreamTableEnvironment', - 'Table', + 'WindowGroupedTable', 'TableConfig', - 'TableEnvironment', - 'TableResult', - 'TableSchema', 'TableSink', 'TableSource', + 'WriteMode', + 'CsvTableSink', + 'CsvTableSource', + 'DataTypes', 'UserDefinedType', - 'WindowGroupedTable', - 'WriteMode' + 'Row', + 'TableSchema', + 'FunctionContext', + 'ScalarFunction', + 'SqlDialect', + 'ExplainDetail', + 'StatementSet' ] diff --git a/flink-python/pyflink/table/result_kind.py b/flink-python/pyflink/table/result_kind.py deleted file mode 100644 index 68c324dfff66e..0000000000000 --- a/flink-python/pyflink/table/result_kind.py +++ /dev/null @@ -1,49 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ -from pyflink.java_gateway import get_gateway - -__all__ = ['ResultKind'] - - -class ResultKind(object): - """ - ResultKind defines the types of the result. - - :data:`SUCCESS`: - - The statement (e.g. DDL, USE) executes successfully, and the result only contains a simple "OK". - - :data:`SUCCESS_WITH_CONTENT`: - - The statement (e.g. DML, DQL, SHOW) executes successfully, and the result contains important - content. - """ - - SUCCESS = 0 - SUCCESS_WITH_CONTENT = 1 - - @staticmethod - def _from_j_result_kind(j_result_kind): - gateway = get_gateway() - JResultKind = gateway.jvm.org.apache.flink.table.api.ResultKind - if j_result_kind == JResultKind.SUCCESS: - return ResultKind.SUCCESS - elif j_result_kind == JResultKind.SUCCESS_WITH_CONTENT: - return ResultKind.SUCCESS_WITH_CONTENT - else: - raise Exception("Unsupported Java result kind: %s" % j_result_kind) diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py index a31edca0b6f23..af2e7f5f17eb7 100644 --- a/flink-python/pyflink/table/table_environment.py +++ b/flink-python/pyflink/table/table_environment.py @@ -34,7 +34,6 @@ from pyflink.java_gateway import get_gateway from pyflink.table import Table -from pyflink.table.table_result import TableResult from pyflink.table.types import _to_java_type, _create_type_verifier, RowType, DataType, \ _infer_schema_from_data, _create_converter, from_arrow_type, RowField, create_arrow_schema from pyflink.util import utils @@ -532,7 +531,8 @@ def execute_sql(self, stmt): the affected row count for `DML` (-1 means unknown), or a string message ("OK") for other statements. """ - return TableResult(self._j_tenv.executeSql(stmt)) + # TODO convert java TableResult to python TableResult once FLINK-17303 is finished + return self._j_tenv.executeSql(stmt) def create_statement_set(self): """ diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py deleted file mode 100644 index 507142498cef1..0000000000000 --- a/flink-python/pyflink/table/table_result.py +++ /dev/null @@ -1,69 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from pyflink.common.job_client import JobClient -from pyflink.table.result_kind import ResultKind -from pyflink.table.table_schema import TableSchema - -__all__ = ['TableResult'] - - -class TableResult(object): - """ - A :class:`~pyflink.table.TableResult` is the representation of the statement execution result. 
- """ - - def __init__(self, j_table_result): - self._j_table_result = j_table_result - - def get_job_client(self): - """ - For DML and DQL statement, return the JobClient which associates the submitted Flink job. - For other statements (e.g. DDL, DCL) return empty. - - :return: The job client, optional. - :rtype: pyflink.common.JobClient - """ - job_client = self._j_table_result.getJobClient() - if job_client.isPresent(): - return JobClient(job_client.get()) - else: - return None - - def get_table_schema(self): - """ - Get the schema of result. - - :return: The schema of result. - :rtype: pyflink.table.TableSchema - """ - return TableSchema(j_table_schema=self._j_table_result.getTableSchema()) - - def get_result_kind(self): - """ - Return the ResultKind which represents the result type. - - :return: The result kind. - :rtype: pyflink.table.ResultKind - """ - return ResultKind._from_j_result_kind(self._j_table_result.getResultKind()) - - def print(self): - """ - Print the result contents as tableau form to client console. - """ - self._j_table_result.print() diff --git a/flink-python/pyflink/table/tests/test_sql.py b/flink-python/pyflink/table/tests/test_sql.py index daedaec433efa..61de9b121d694 100644 --- a/flink-python/pyflink/table/tests/test_sql.py +++ b/flink-python/pyflink/table/tests/test_sql.py @@ -21,8 +21,10 @@ import unittest from pyflink.find_flink_home import _find_flink_source_root + from pyflink.java_gateway import get_gateway -from pyflink.table import DataTypes, ResultKind + +from pyflink.table import DataTypes from pyflink.testing import source_sink_utils from pyflink.testing.test_case_utils import PyFlinkStreamTableTestCase, PyFlinkBatchTableTestCase @@ -56,62 +58,6 @@ def test_sql_query(self): expected = ['2,Hi,Hello', '3,Hello,Hello'] self.assert_equals(actual, expected) - def test_execute_sql(self): - t_env = self.t_env - table_result = t_env.execute_sql("create table tbl" - "(" - " a bigint," - " b int," - " c varchar" - ") with (" - " 'connector' = 'COLLECTION'," - " 'is-bounded' = 'false'" - ")") - self.assertIsNone(table_result.get_job_client()) - self.assertIsNotNone(table_result.get_table_schema()) - self.assertEquals(table_result.get_table_schema().get_field_names(), ["result"]) - self.assertIsNotNone(table_result.get_result_kind()) - self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS) - table_result.print() - - table_result = t_env.execute_sql("alter table tbl set ('k1' = 'a', 'k2' = 'b')") - self.assertIsNone(table_result.get_job_client()) - self.assertIsNotNone(table_result.get_table_schema()) - self.assertEquals(table_result.get_table_schema().get_field_names(), ["result"]) - self.assertIsNotNone(table_result.get_result_kind()) - self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS) - table_result.print() - - field_names = ["k1", "k2", "c"] - field_types = [DataTypes.BIGINT(), DataTypes.INT(), DataTypes.STRING()] - t_env.register_table_sink( - "sinks", - source_sink_utils.TestAppendSink(field_names, field_types)) - table_result = t_env.execute_sql("insert into sinks select * from tbl") - self.assertIsNotNone(table_result.get_job_client()) - self.assertIsNotNone(table_result.get_table_schema()) - self.assertEquals(table_result.get_table_schema().get_field_names(), - ["default_catalog.default_database.sinks"]) - self.assertIsNotNone(table_result.get_result_kind()) - self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS_WITH_CONTENT) - job_status = table_result.get_job_client().get_job_status().result() - 
self.assertFalse(job_status.is_globally_terminal_state()) - self.assertFalse(job_status.is_terminal_state()) - job_execution_result = table_result.get_job_client().get_job_execution_result( - get_gateway().jvm.Thread.currentThread().getContextClassLoader()).result() - self.assertIsNotNone(job_execution_result.get_job_id()) - self.assertIsNotNone(job_execution_result.get_job_execution_result()) - self.assertTrue(job_execution_result.is_job_execution_result()) - table_result.print() - - table_result = t_env.execute_sql("drop table tbl") - self.assertIsNone(table_result.get_job_client()) - self.assertIsNotNone(table_result.get_table_schema()) - self.assertEquals(table_result.get_table_schema().get_field_names(), ["result"]) - self.assertIsNotNone(table_result.get_result_kind()) - self.assertEqual(table_result.get_result_kind(), ResultKind.SUCCESS) - table_result.print() - def test_sql_update(self): t_env = self.t_env source = t_env.from_elements([(1, "Hi", "Hello"), (2, "Hello", "Hello")], ["a", "b", "c"])
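
Reviewer note, not part of the patch: with the Python TableResult and JobClient wrappers reverted, TableEnvironment.execute_sql hands back the underlying Java org.apache.flink.table.api.TableResult as a py4j proxy until FLINK-17303 is finished (see the TODO left in table_environment.py). Below is a minimal sketch of what interim caller code could look like under that assumption; the environment setup is illustrative, and the DDL string is taken from the removed test.

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    # Illustrative setup; any TableEnvironment behaves the same way here.
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)

    # Until FLINK-17303 lands, execute_sql() returns the Java TableResult
    # proxy, so its Java-style method names are invoked directly via py4j.
    j_table_result = t_env.execute_sql(
        "create table tbl("
        " a bigint,"
        " b int,"
        " c varchar"
        ") with ("
        " 'connector' = 'COLLECTION',"
        " 'is-bounded' = 'false'"
        ")")

    print(j_table_result.getResultKind().toString())   # SUCCESS for DDL statements
    print(j_table_result.getTableSchema().toString())  # a single "result" field for DDL
    j_table_result.print()  # tableau-form output is written to the JVM's stdout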