[FLINK-18061] [table] TableResult#collect method should return a closeable iterator to avoid resource leak

This closes apache#12473
godfreyhe authored and dawidwys committed Jun 9, 2020
1 parent 8a8d8a9 commit d27c888
Showing 9 changed files with 164 additions and 25 deletions.
49 changes: 49 additions & 0 deletions flink-python/pyflink/table/table_result.py
@@ -52,6 +52,49 @@ def get_table_schema(self):
"""
Get the schema of the result.
The schema of DDL, USE, SHOW, EXPLAIN:
::
+-------------+-------------+----------+
| column name | column type | comments |
+-------------+-------------+----------+
| result      | STRING      |          |
+-------------+-------------+----------+
The schema of DESCRIBE:
::
+------------------+-------------+-------------------------------------------------+
| column name      | column type | comments                                        |
+------------------+-------------+-------------------------------------------------+
| name             | STRING      | field name                                      |
+------------------+-------------+-------------------------------------------------+
| type             | STRING      | field type expressed as a String                |
+------------------+-------------+-------------------------------------------------+
| null             | BOOLEAN     | field nullability: true if a field is nullable, |
|                  |             | else false                                      |
+------------------+-------------+-------------------------------------------------+
| key              | BOOLEAN     | key constraint: 'PRI' for primary keys,         |
|                  |             | 'UNQ' for unique keys, else null                |
+------------------+-------------+-------------------------------------------------+
| computed column  | STRING      | computed column: string expression              |
|                  |             | if a field is a computed column, else null      |
+------------------+-------------+-------------------------------------------------+
| watermark        | STRING      | watermark: string expression if a field is      |
|                  |             | a watermark, else null                          |
+------------------+-------------+-------------------------------------------------+
The schema of INSERT (one column per sink):
::
+----------------------------+-------------+-----------------------+
| column name                | column type | comments              |
+----------------------------+-------------+-----------------------+
| (name of the insert table) | BIGINT      | the insert table name |
+----------------------------+-------------+-----------------------+
The schema of SELECT consists of the selected field names and types.
:return: The schema of the result.
:rtype: pyflink.table.TableSchema
@@ -63,6 +106,9 @@ def get_result_kind(self):
"""
Return the ResultKind which represents the result type.
For DDL and USE operations, the result kind is always SUCCESS.
For other operations, the result kind is always SUCCESS_WITH_CONTENT.
:return: The result kind.
:rtype: pyflink.table.ResultKind
@@ -74,6 +120,9 @@ def print(self):
"""
Print the result contents as tableau form to client console.
NOTE: please make sure the result data to print is small, because all data
will be collected locally before being printed to the console.
.. versionadded:: 1.11.0
"""
self._j_table_result.print()
@@ -22,17 +22,17 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.util.CloseableIterator;

import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

/**
* An iterator which iterates through the results of a query job.
*
* <p>NOTE: After using this iterator, the close method MUST be called to release job-related resources.
*/
public class CollectResultIterator<T> implements Iterator<T>, AutoCloseable {
public class CollectResultIterator<T> implements CloseableIterator<T> {

private final CollectResultFetcher<T> fetcher;
private T bufferedResult;
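Since the class now implements CloseableIterator rather than Iterator plus AutoCloseable, callers can satisfy the NOTE above with a try-with-resources statement. A minimal consumption sketch, assuming 'iterator' is a CloseableIterator<Row> obtained from an already-running query job:

// Sketch: try-with-resources guarantees close() runs, releasing job-related resources.
try (CloseableIterator<Row> it = iterator) {
    while (it.hasNext()) {
        Row row = it.next();
        // process the row
    }
}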
@@ -21,8 +21,8 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Iterator;
import java.util.Optional;

/**
@@ -40,21 +40,91 @@ public interface TableResult {

/**
* Get the schema of the result.
*
* <p>The schema of DDL, USE, SHOW, EXPLAIN:
* <pre>
* +-------------+-------------+----------+
* | column name | column type | comments |
* +-------------+-------------+----------+
* | result      | STRING      |          |
* +-------------+-------------+----------+
* </pre>
*
* <p>The schema of DESCRIBE:
* <pre>
* +------------------+-------------+--------------------------------------------------------------------------------+
* | column name      | column type | comments                                                                       |
* +------------------+-------------+--------------------------------------------------------------------------------+
* | name             | STRING      | field name                                                                     |
* | type             | STRING      | field type expressed as a String                                               |
* | null             | BOOLEAN     | field nullability: true if a field is nullable, else false                     |
* | key              | BOOLEAN     | key constraint: 'PRI' for primary keys, 'UNQ' for unique keys, else null       |
* | computed column  | STRING      | computed column: string expression if a field is a computed column, else null  |
* | watermark        | STRING      | watermark: string expression if a field is a watermark, else null              |
* +------------------+-------------+--------------------------------------------------------------------------------+
* </pre>
*
* <p>The schema of INSERT (one column per sink):
* <pre>
* +----------------------------+-------------+-----------------------+
* | column name                | column type | comments              |
* +----------------------------+-------------+-----------------------+
* | (name of the insert table) | BIGINT      | the insert table name |
* +----------------------------+-------------+-----------------------+
* </pre>
*
* <p>The schema of SELECT consists of the selected field names and types.
*/
TableSchema getTableSchema();

/**
* Return the {@link ResultKind} which represents the result type.
*
* <p>For DDL and USE operations, the result kind is always {@link ResultKind#SUCCESS}.
* For other operations, the result kind is always {@link ResultKind#SUCCESS_WITH_CONTENT}.
*/
ResultKind getResultKind();

/**
* Get the result contents as a row iterator.
* Get the result contents as a closeable row iterator.
*
* <p><strong>NOTE:</strong>
* <ul>
* <li>
For SELECT operations, the job will not finish until all result data has been collected,
so the job should be closed actively via the CloseableIterator#close method to avoid a resource leak.
Calling the CloseableIterator#close method will cancel the job and release related resources.
* </li>
* <li>
For DML operations, Flink does not yet support getting the real affected row count,
so the affected row count is always -1 (unknown) for every sink, and the counts are
returned as soon as the job is submitted.
The CloseableIterator#close method is not bound to the job, so unlike the SELECT case
it will not cancel the job.
If you need to cancel the job, use {@link #getJobClient()}.
* </li>
* <li>
For other operations, no Flink job is submitted ({@link #getJobClient()} is always empty)
and the result is bounded. Calling the CloseableIterator#close method does nothing.
* </li>
* </ul>
*
* <p>Recommended code for calling the CloseableIterator#close method looks like:
* <pre>{@code
*  TableResult result = tEnv.executeSql("select ...");
*  // use a try-with-resources statement to ensure the iterator is closed
*  try (CloseableIterator<Row> it = result.collect()) {
*      while (it.hasNext()) {
*          Row row = it.next();
*          // handle the row
*      }
*  }
* }</pre>
*/
Iterator<Row> collect();
CloseableIterator<Row> collect();

/**
* Print the result contents as tableau form to client console.
*
* <p><strong>NOTE:</strong> please make sure the result data to print is small, because
* all data will be collected locally before being printed to the console.
*/
void print();
}
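As a complement to the SELECT example in the Javadoc above, here is a minimal sketch of the DML case, where close() does not cancel the job and cancellation instead goes through the JobClient. tEnv and the table names SinkTable/SourceTable are assumptions for illustration:

// Sketch: for DML, collect() returns the affected row counts (-1, i.e. unknown),
// and closing the iterator does not touch the job.
TableResult result = tEnv.executeSql("INSERT INTO SinkTable SELECT * FROM SourceTable");
try (CloseableIterator<Row> it = result.collect()) {
    it.forEachRemaining(System.out::println); // one row per sink, each -1 (unknown)
}
// cancel the job explicitly through the JobClient if needed
result.getJobClient().ifPresent(JobClient::cancel);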
@@ -21,8 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.types.Row;

import java.util.Iterator;
import org.apache.flink.util.CloseableIterator;

/**
* An internal class which helps the client get the execution result from a specific sink.
Expand All @@ -39,5 +38,5 @@ public interface SelectResultProvider {
/**
* Returns the select result as row iterator.
*/
Iterator<Row> getResultIterator();
CloseableIterator<Row> getResultIterator();
}
@@ -27,6 +27,7 @@
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.utils.PrintUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;
@@ -51,14 +52,14 @@ class TableResultImpl implements TableResult {
private final JobClient jobClient;
private final TableSchema tableSchema;
private final ResultKind resultKind;
private final Iterator<Row> data;
private final CloseableIterator<Row> data;
private final PrintStyle printStyle;

private TableResultImpl(
@Nullable JobClient jobClient,
TableSchema tableSchema,
ResultKind resultKind,
Iterator<Row> data,
CloseableIterator<Row> data,
PrintStyle printStyle) {
this.jobClient = jobClient;
this.tableSchema = Preconditions.checkNotNull(tableSchema, "tableSchema should not be null");
@@ -83,7 +84,7 @@ public ResultKind getResultKind() {
}

@Override
public Iterator<Row> collect() {
public CloseableIterator<Row> collect() {
return data;
}

@@ -116,7 +117,7 @@ public static class Builder {
private JobClient jobClient = null;
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private Iterator<Row> data = null;
private CloseableIterator<Row> data = null;
private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);

private Builder() {
@@ -159,7 +160,7 @@ public Builder resultKind(ResultKind resultKind) {
*
* @param rowIterator a row iterator as the execution result.
*/
public Builder data(Iterator<Row> rowIterator) {
public Builder data(CloseableIterator<Row> rowIterator) {
Preconditions.checkNotNull(rowIterator, "rowIterator should not be null");
this.data = rowIterator;
return this;
@@ -172,7 +173,7 @@ public Builder data(Iterator<Row> rowIterator) {
*/
public Builder data(List<Row> rowList) {
Preconditions.checkNotNull(rowList, "listRows should not be null");
this.data = rowList.iterator();
this.data = CloseableIterator.adapterForIterator(rowList.iterator());
return this;
}

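CloseableIterator.adapterForIterator wraps a plain Iterator so that it satisfies the CloseableIterator interface; for an in-memory list there is nothing to release on close. A small, self-contained illustration (the class name is illustrative only):

import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Arrays;
import java.util.List;

public class AdapterForIteratorExample {
    public static void main(String[] args) throws Exception {
        List<Row> rows = Arrays.asList(Row.of(1, "a"), Row.of(2, "b"));
        // adapt the plain list iterator to the CloseableIterator interface
        try (CloseableIterator<Row> it = CloseableIterator.adapterForIterator(rows.iterator())) {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        } // an in-memory list holds no external resources, so close() releases nothing here
    }
}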
@@ -37,8 +37,8 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import java.util.Iterator;
import java.util.UUID;
import java.util.stream.Stream;

@@ -89,7 +89,7 @@ public void setJobClient(JobClient jobClient) {
}

@Override
public Iterator<Row> getResultIterator() {
public CloseableIterator<Row> getResultIterator() {
return new RowIteratorWrapper(iterator);
}
};
@@ -98,7 +98,7 @@
/**
* An Iterator wrapper class that converts Iterator&lt;T&gt; to CloseableIterator&lt;Row&gt;.
*/
private class RowIteratorWrapper implements Iterator<Row>, AutoCloseable {
private class RowIteratorWrapper implements CloseableIterator<Row> {
private final CollectResultIterator<T> iterator;

public RowIteratorWrapper(CollectResultIterator<T> iterator) {
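The wrapper's body is elided in the diff above; the following is a sketch of what such a delegating wrapper can look like. The Function-based converter is a hypothetical stand-in for however the sink actually turns T into Row:

// Sketch: adapts a CloseableIterator<T> to CloseableIterator<Row> by delegation.
private static class RowIteratorWrapper<T> implements CloseableIterator<Row> {
    private final CloseableIterator<T> iterator;
    private final java.util.function.Function<T, Row> converter; // hypothetical converter

    RowIteratorWrapper(CloseableIterator<T> iterator, java.util.function.Function<T, Row> converter) {
        this.iterator = iterator;
        this.converter = converter;
    }

    @Override
    public boolean hasNext() {
        return iterator.hasNext();
    }

    @Override
    public Row next() {
        return converter.apply(iterator.next());
    }

    @Override
    public void close() throws Exception {
        iterator.close(); // closing the wrapper also closes the underlying job iterator
    }
}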
@@ -18,6 +18,7 @@

package org.apache.flink.table.api

import org.apache.flink.api.common.JobStatus
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.table.api.internal.TableEnvironmentImpl
@@ -93,7 +94,10 @@ class TableITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger
Row.of(Integer.valueOf(4), "Peter Smith"),
Row.of(Integer.valueOf(6), "Sally Miller"),
Row.of(Integer.valueOf(8), "Kelly Williams"))
val actual = Lists.newArrayList(tableResult.collect())
val it = tableResult.collect()
val actual = Lists.newArrayList(it)
// actively close the job even if it is finished
it.close()
actual.sort(new util.Comparator[Row]() {
override def compare(o1: Row, o2: Row): Int = {
o1.getField(0).asInstanceOf[Int].compareTo(o2.getField(0).asInstanceOf[Int])
@@ -102,6 +106,22 @@ class TableITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger
assertEquals(expected, actual)
}

@Test
def testCollectWithClose(): Unit = {
val query =
"""
|select id, concat(concat(`first`, ' '), `last`) as `full name`
|from MyTable where mod(id, 2) = 0
""".stripMargin
val table = tEnv.sqlQuery(query)
val tableResult = table.execute()
assertTrue(tableResult.getJobClient.isPresent)
assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
val it = tableResult.collect()
it.close()
assertEquals(JobStatus.CANCELED, tableResult.getJobClient.get().getJobStatus().get())
}

@Test
def testExecuteWithUpdateChanges(): Unit = {
val tableResult = tEnv.sqlQuery("select count(*) as c from MyTable").execute()
@@ -33,11 +33,11 @@
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;

@@ -89,14 +89,14 @@ public void setJobClient(JobClient jobClient) {
}

@Override
public Iterator<Row> getResultIterator() {
public CloseableIterator<Row> getResultIterator() {
Preconditions.checkNotNull(jobClient, "jobClient is null, please call setJobClient first.");
return collectResult(jobClient);
}
};
}

private Iterator<Row> collectResult(JobClient jobClient) {
private CloseableIterator<Row> collectResult(JobClient jobClient) {
JobExecutionResult jobExecutionResult;
try {
jobExecutionResult = jobClient.getJobExecutionResult(
@@ -115,7 +115,7 @@ private Iterator<Row> collectResult(JobClient jobClient) {
} catch (IOException | ClassNotFoundException e) {
throw new TableException("Failed to deserialize the result.", e);
}
return rowList.iterator();
return CloseableIterator.adapterForIterator(rowList.iterator());
}

}