From d27c88820dd8ed66dcd39bc488a26c4ccff1a41e Mon Sep 17 00:00:00 2001
From: godfreyhe
Date: Wed, 3 Jun 2020 11:33:02 +0800
Subject: [PATCH] [FLINK-18061] [table] TableResult#collect method should return a closeable iterator to avoid resource leak

This closes #12473
---
 flink-python/pyflink/table/table_result.py    | 49 ++++++++++++
 .../collect/CollectResultIterator.java        |  4 +-
 .../apache/flink/table/api/TableResult.java   | 76 ++++++++++++++++++-
 .../api/internal/SelectResultProvider.java    |  5 +-
 .../table/api/internal/TableResultImpl.java   | 13 ++--
 .../planner/sinks/SelectTableSinkBase.java    |  6 +-
 .../apache/flink/table/api/TableITCase.scala  | 22 +++++-
 .../table/sinks/BatchSelectTableSink.java     |  8 +-
 .../table/sinks/StreamSelectTableSink.java    |  6 +-
 9 files changed, 164 insertions(+), 25 deletions(-)

diff --git a/flink-python/pyflink/table/table_result.py b/flink-python/pyflink/table/table_result.py
index c611f67e4d831..794bac1a447b8 100644
--- a/flink-python/pyflink/table/table_result.py
+++ b/flink-python/pyflink/table/table_result.py
@@ -52,6 +52,49 @@ def get_table_schema(self):
         """
         Get the schema of result.
 
+        The schema of DDL, USE, SHOW, EXPLAIN:
+        ::
+
+            +-------------+-------------+----------+
+            | column name | column type | comments |
+            +-------------+-------------+----------+
+            | result      | STRING      |          |
+            +-------------+-------------+----------+
+
+        The schema of DESCRIBE:
+        ::
+
+            +------------------+-------------+-------------------------------------------------+
+            | column name      | column type | comments                                        |
+            +------------------+-------------+-------------------------------------------------+
+            | name             | STRING      | field name                                      |
+            +------------------+-------------+-------------------------------------------------+
+            | type             | STRING      | field type expressed as a String                |
+            +------------------+-------------+-------------------------------------------------+
+            | null             | BOOLEAN     | field nullability: true if a field is nullable, |
+            |                  |             | else false                                      |
+            +------------------+-------------+-------------------------------------------------+
+            | key              | BOOLEAN     | key constraint: 'PRI' for primary keys,         |
+            |                  |             | 'UNQ' for unique keys, else null                |
+            +------------------+-------------+-------------------------------------------------+
+            | computed column  | STRING      | computed column: string expression              |
+            |                  |             | if a field is computed column, else null        |
+            +------------------+-------------+-------------------------------------------------+
+            | watermark        | STRING      | watermark: string expression if a field is      |
+            |                  |             | watermark, else null                            |
+            +------------------+-------------+-------------------------------------------------+
+
+        The schema of INSERT: (one column per sink)
+        ::
+
+            +----------------------------+-------------+-----------------------+
+            | column name                | column type | comments              |
+            +----------------------------+-------------+-----------------------+
+            | (name of the insert table) | BIGINT      | the insert table name |
+            +----------------------------+-------------+-----------------------+
+
+        The schema of SELECT is the selected field names and types.
+
         :return: The schema of result.
         :rtype: pyflink.table.TableSchema
 
@@ -63,6 +106,9 @@ def get_result_kind(self):
         """
         Return the ResultKind which represents the result type.
 
+        For DDL and USE operations, the result kind is always SUCCESS.
+        For other operations, the result kind is always SUCCESS_WITH_CONTENT.
+
         :return: The result kind.
         :rtype: pyflink.table.ResultKind
 
@@ -74,6 +120,9 @@ def print(self):
         """
         Print the result contents as tableau form to client console.
 
+        NOTE: please make sure the data to print is small, because all the data will be
+        collected to the client's local memory first and then printed to the console.
+
         .. versionadded:: 1.11.0
         """
         self._j_table_result.print()
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
index bbf41e12732cf..17165400baef0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/collect/CollectResultIterator.java
@@ -22,9 +22,9 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.util.CloseableIterator;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -32,7 +32,7 @@
  *
  * <p>NOTE: After using this iterator, the close method MUST be called in order to release job related resources.
  */
-public class CollectResultIterator<T> implements Iterator<T>, AutoCloseable {
+public class CollectResultIterator<T> implements CloseableIterator<T> {
 
 	private final CollectResultFetcher<T> fetcher;
 	private T bufferedResult;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
index a520e7e53a8f4..e187eb0c58c16 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableResult.java
@@ -21,8 +21,8 @@
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 
-import java.util.Iterator;
 import java.util.Optional;
 
 /**
@@ -40,21 +40,91 @@ public interface TableResult {
 
 	/**
 	 * Get the schema of result.
+	 *
+	 * <p>The schema of DDL, USE, SHOW, EXPLAIN:
+	 * <pre>
+	 * +-------------+-------------+----------+
+	 * | column name | column type | comments |
+	 * +-------------+-------------+----------+
+	 * | result      | STRING      |          |
+	 * +-------------+-------------+----------+
+	 * </pre>
+	 *
+	 * <p>The schema of DESCRIBE:
+	 * <pre>
+	 * +------------------+-------------+-----------------------------------------------------------------------------+
+	 * | column name      | column type |                              comments                                       |
+	 * +------------------+-------------+-----------------------------------------------------------------------------+
+	 * | name             | STRING      | field name                                                                  |
+	 * | type             | STRING      | field type expressed as a String                                            |
+	 * | null             | BOOLEAN     | field nullability: true if a field is nullable, else false                  |
+	 * | key              | BOOLEAN     | key constraint: 'PRI' for primary keys, 'UNQ' for unique keys, else null    |
+	 * | computed column  | STRING      | computed column: string expression if a field is computed column, else null |
+	 * | watermark        | STRING      | watermark: string expression if a field is watermark, else null             |
+	 * +------------------+-------------+-----------------------------------------------------------------------------+
+	 * </pre>
+	 *
+	 * <p>The schema of INSERT: (one column per sink)
+	 * <pre>
+	 * +----------------------------+-------------+-----------------------+
+	 * | column name                | column type | comments              |
+	 * +----------------------------+-------------+-----------------------+
+	 * | (name of the insert table) | BIGINT      | the insert table name |
+	 * +----------------------------+-------------+-----------------------+
+	 * </pre>
+	 *
+	 * <p>The schema of SELECT is the selected field names and types.
 	 */
 	TableSchema getTableSchema();
 
 	/**
 	 * Return the {@link ResultKind} which represents the result type.
+	 *
+	 * <p>For DDL and USE operations, the result kind is always {@link ResultKind#SUCCESS}.
+	 * For other operations, the result kind is always {@link ResultKind#SUCCESS_WITH_CONTENT}.
 	 */
 	ResultKind getResultKind();
 
 	/**
-	 * Get the result contents as a row iterator.
+	 * Get the result contents as a closeable row iterator.
+	 *
+	 * <p>NOTE: after using the returned iterator, the close method must be called in order to
+	 * release job related resources.
+	 *
+	 * <p>Recommended code to call CloseableIterator#close method looks like:
+	 * <pre>{@code
+	 *  TableResult result = tEnv.executeSql("select ...");
+	 *  // using try-with-resources statement
+	 *  try (CloseableIterator<Row> it = result.collect()) {
+	 *      it... // collect the data
+	 *  }
+	 * }</pre>
 	 */
-	Iterator<Row> collect();
+	CloseableIterator<Row> collect();
 
 	/**
 	 * Print the result contents as tableau form to client console.
+	 *
+	 * <p>NOTE: please make sure the data to print is small, because all the data will be
+	 * collected to the client's local memory first and then printed to the console.
 	 */
 	void print();
 }
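
For reference, a minimal usage sketch of the new collect() contract (not part of this patch; the table name MyTable, the environment setup and the main wrapper are assumptions for illustration). It shows how client code is expected to consume the returned CloseableIterator with try-with-resources:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class CollectUsageSketch {

	public static void main(String[] args) throws Exception {
		TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
		// assumes a table named MyTable has been registered beforehand
		TableResult result = tEnv.sqlQuery("SELECT id, `first` FROM MyTable").execute();

		// try-with-resources guarantees CloseableIterator#close is called, which
		// releases job related resources (and cancels a still-running job, as
		// asserted by testCollectWithClose later in this patch).
		try (CloseableIterator<Row> it = result.collect()) {
			while (it.hasNext()) {
				Row row = it.next();
				System.out.println(row);
			}
		}
	}
}
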
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java
index 46bf9e1ce0e95..7beeb8471448d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java
@@ -21,8 +21,7 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.types.Row;
-
-import java.util.Iterator;
+import org.apache.flink.util.CloseableIterator;
 
 /**
  * An internal class which helps the client to get the execute result from a specific sink.
@@ -39,5 +38,5 @@ public interface SelectResultProvider {
 	/**
 	 * Returns the select result as row iterator.
 	 */
-	Iterator<Row> getResultIterator();
+	CloseableIterator<Row> getResultIterator();
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
index c3dc1de6e2bac..4461f9d4a58ad 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java
@@ -27,6 +27,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.utils.PrintUtils;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
@@ -51,14 +52,14 @@ class TableResultImpl implements TableResult {
 	private final JobClient jobClient;
 	private final TableSchema tableSchema;
 	private final ResultKind resultKind;
-	private final Iterator<Row> data;
+	private final CloseableIterator<Row> data;
 	private final PrintStyle printStyle;
 
 	private TableResultImpl(
 			@Nullable JobClient jobClient,
 			TableSchema tableSchema,
 			ResultKind resultKind,
-			Iterator<Row> data,
+			CloseableIterator<Row> data,
 			PrintStyle printStyle) {
 		this.jobClient = jobClient;
 		this.tableSchema = Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
@@ -83,7 +84,7 @@ public ResultKind getResultKind() {
 	}
 
 	@Override
-	public Iterator<Row> collect() {
+	public CloseableIterator<Row> collect() {
 		return data;
 	}
 
@@ -116,7 +117,7 @@ public static class Builder {
 		private JobClient jobClient = null;
 		private TableSchema tableSchema = null;
 		private ResultKind resultKind = null;
-		private Iterator<Row> data = null;
+		private CloseableIterator<Row> data = null;
 		private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);
 
 		private Builder() {
@@ -159,7 +160,7 @@ public Builder resultKind(ResultKind resultKind) {
 		 *
 		 * @param rowIterator a row iterator as the execution result.
 		 */
-		public Builder data(Iterator<Row> rowIterator) {
+		public Builder data(CloseableIterator<Row> rowIterator) {
 			Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
 			this.data = rowIterator;
 			return this;
@@ -172,7 +173,7 @@ public Builder data(Iterator<Row> rowIterator) {
 		 */
 		public Builder data(List<Row> rowList) {
 			Preconditions.checkNotNull(rowList, "listRows should not be null");
-			this.data = rowList.iterator();
+			this.data = CloseableIterator.adapterForIterator(rowList.iterator());
 			return this;
 		}
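
The Builder#data(List) change above relies on CloseableIterator.adapterForIterator, which wraps a plain Iterator in the CloseableIterator interface (for a list-backed iterator there is nothing extra to release on close). A small, self-contained sketch of that wrapping, not part of the patch, with made-up rows purely for illustration:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class AdapterForIteratorSketch {

	public static void main(String[] args) throws Exception {
		List<Row> rows = Arrays.asList(Row.of(1, "Hello"), Row.of(2, "World"));

		// Wrap the plain list iterator so it can be handed to code that now expects
		// a CloseableIterator<Row>, such as TableResultImpl.Builder#data.
		try (CloseableIterator<Row> it = CloseableIterator.adapterForIterator(rows.iterator())) {
			it.forEachRemaining(System.out::println);
		}
	}
}
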
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java
index 707f2ed14763a..aa3b51210de12 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java
@@ -37,8 +37,8 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 
-import java.util.Iterator;
 import java.util.UUID;
 import java.util.stream.Stream;
 
@@ -89,7 +89,7 @@ public void setJobClient(JobClient jobClient) {
 			}
 
 			@Override
-			public Iterator<Row> getResultIterator() {
+			public CloseableIterator<Row> getResultIterator() {
 				return new RowIteratorWrapper(iterator);
 			}
 		};
@@ -98,7 +98,7 @@ public Iterator<Row> getResultIterator() {
 	/**
 	 * An Iterator wrapper class that converts Iterator<T> to Iterator<Row>.
 	 */
-	private class RowIteratorWrapper implements Iterator<Row>, AutoCloseable {
+	private class RowIteratorWrapper implements CloseableIterator<Row> {
 		private final CollectResultIterator<T> iterator;
 
 		public RowIteratorWrapper(CollectResultIterator<T> iterator) {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala
index f301529bb6d24..354fd5fbc6422 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.api
 
+import org.apache.flink.api.common.JobStatus
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
@@ -93,7 +94,10 @@ class TableITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger
       Row.of(Integer.valueOf(4), "Peter Smith"),
       Row.of(Integer.valueOf(6), "Sally Miller"),
       Row.of(Integer.valueOf(8), "Kelly Williams"))
-    val actual = Lists.newArrayList(tableResult.collect())
+    val it = tableResult.collect()
+    val actual = Lists.newArrayList(it)
+    // actively close the job even if it is finished
+    it.close()
     actual.sort(new util.Comparator[Row]() {
       override def compare(o1: Row, o2: Row): Int = {
         o1.getField(0).asInstanceOf[Int].compareTo(o2.getField(0).asInstanceOf[Int])
       }
     })
@@ -102,6 +106,22 @@ class TableITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger
     assertEquals(expected, actual)
   }
 
+  @Test
+  def testCollectWithClose(): Unit = {
+    val query =
+      """
+        |select id, concat(concat(`first`, ' '), `last`) as `full name`
+        |from MyTable where mod(id, 2) = 0
+      """.stripMargin
+    val table = tEnv.sqlQuery(query)
+    val tableResult = table.execute()
+    assertTrue(tableResult.getJobClient.isPresent)
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    val it = tableResult.collect()
+    it.close()
+    assertEquals(JobStatus.CANCELED, tableResult.getJobClient.get().getJobStatus().get())
+  }
+
   @Test
   def testExecuteWithUpdateChanges(): Unit = {
     val tableResult = tEnv.sqlQuery("select count(*) as c from MyTable").execute()
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java
index 7785a61080861..d00f007f8a6f8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java
@@ -33,11 +33,11 @@
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
@@ -89,14 +89,14 @@ public void setJobClient(JobClient jobClient) {
 			}
 
 			@Override
-			public Iterator<Row> getResultIterator() {
+			public CloseableIterator<Row> getResultIterator() {
 				Preconditions.checkNotNull(jobClient, "jobClient is null, please call setJobClient first.");
 				return collectResult(jobClient);
 			}
 		};
 	}
 
-	private Iterator<Row> collectResult(JobClient jobClient) {
+	private CloseableIterator<Row> collectResult(JobClient jobClient) {
 		JobExecutionResult jobExecutionResult;
 		try {
 			jobExecutionResult = jobClient.getJobExecutionResult(
@@ -115,7 +115,7 @@ private Iterator<Row> collectResult(JobClient jobClient) {
 		} catch (IOException | ClassNotFoundException e) {
 			throw new TableException("Failed to deserialize the result.", e);
 		}
-		return rowList.iterator();
+		return CloseableIterator.adapterForIterator(rowList.iterator());
 	}
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java
index b1fb7ed86d055..68e89a0c60cb9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java
@@ -35,8 +35,8 @@
 import org.apache.flink.table.api.internal.SelectResultProvider;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
 
-import java.util.Iterator;
 import java.util.UUID;
 
 /**
@@ -92,7 +92,7 @@ public void setJobClient(JobClient jobClient) {
 			}
 
 			@Override
-			public Iterator<Row> getResultIterator() {
+			public CloseableIterator<Row> getResultIterator() {
 				return new RowIteratorWrapper(iterator);
 			}
 		};
@@ -101,7 +101,7 @@ public Iterator<Row> getResultIterator() {
 	/**
 	 * An Iterator wrapper class that converts Iterator<Tuple2<Boolean, Row>> to Iterator<Row>.
 	 */
-	private static class RowIteratorWrapper implements Iterator<Row>, AutoCloseable {
+	private static class RowIteratorWrapper implements CloseableIterator<Row> {
 		private final CollectResultIterator<Tuple2<Boolean, Row>> iterator;
 
 		public RowIteratorWrapper(CollectResultIterator<Tuple2<Boolean, Row>> iterator) {
 			this.iterator = iterator;
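
Both RowIteratorWrapper classes in this patch follow the same pattern: adapt a CollectResultIterator of the sink's element type into a CloseableIterator<Row> and forward close() so the underlying job resources are still released. A simplified, generic sketch of that pattern (not the patch's actual classes, which also convert each element, e.g. extracting the Row from a Tuple2<Boolean, Row>):

import java.util.function.Function;

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

/** Simplified sketch: adapt a CloseableIterator<T> into a CloseableIterator<Row>. */
public class MappingRowIterator<T> implements CloseableIterator<Row> {

	private final CloseableIterator<T> inner;
	private final Function<T, Row> toRow;

	public MappingRowIterator(CloseableIterator<T> inner, Function<T, Row> toRow) {
		this.inner = inner;
		this.toRow = toRow;
	}

	@Override
	public boolean hasNext() {
		return inner.hasNext();
	}

	@Override
	public Row next() {
		return toRow.apply(inner.next());
	}

	@Override
	public void close() throws Exception {
		// forward close so the underlying fetcher / job resources are released
		inner.close();
	}
}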