diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectTableSink.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java similarity index 69% rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectTableSink.java rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java index bd724006bf079..46bf9e1ce0e95 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectTableSink.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/SelectResultProvider.java @@ -19,25 +19,18 @@ package org.apache.flink.table.api.internal; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.table.sinks.TableSink; import org.apache.flink.types.Row; import java.util.Iterator; /** - * An internal special {@link TableSink} to collect the select query result to local client. + * An internal class which helps the client to get the execute result from a specific sink. + * + *
This class is generated by specific sink and brings the result info to a TableResult. */ @Internal -public interface SelectTableSink extends TableSink { - - default TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { - // disable this to make sure there is only one SelectTableSink instance - // so that the instance can be shared within optimizing and executing in client - throw new UnsupportedOperationException(); - } - +public interface SelectResultProvider { /** * Set the job client associated with the select job to retrieve the result. */ diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 82f83f1112f10..3fbeff2aad54e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -84,13 +84,13 @@ import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; +import org.apache.flink.table.operations.SelectSinkOperation; import org.apache.flink.table.operations.ShowCatalogsOperation; import org.apache.flink.table.operations.ShowDatabasesOperation; import org.apache.flink.table.operations.ShowFunctionsOperation; import org.apache.flink.table.operations.ShowTablesOperation; import org.apache.flink.table.operations.ShowViewsOperation; import org.apache.flink.table.operations.TableSourceQueryOperation; -import org.apache.flink.table.operations.UnregisteredSinkModifyOperation; import org.apache.flink.table.operations.UseCatalogOperation; import org.apache.flink.table.operations.UseDatabaseOperation; import org.apache.flink.table.operations.ddl.AddPartitionsOperation; @@ -689,24 +689,20 @@ public TableResult executeInternal(List operations) { @Override public TableResult executeInternal(QueryOperation operation) { - TableSchema tableSchema = operation.getTableSchema(); - SelectTableSink tableSink = planner.createSelectTableSink(tableSchema); - UnregisteredSinkModifyOperation sinkOperation = new UnregisteredSinkModifyOperation<>( - tableSink, - operation - ); + SelectSinkOperation sinkOperation = new SelectSinkOperation(operation); List> transformations = translate(Collections.singletonList(sinkOperation)); Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, "collect"); try { JobClient jobClient = execEnv.executeAsync(pipeline); - tableSink.setJobClient(jobClient); + SelectResultProvider resultProvider = sinkOperation.getSelectResultProvider(); + resultProvider.setJobClient(jobClient); return TableResultImpl.builder() .jobClient(jobClient) .resultKind(ResultKind.SUCCESS_WITH_CONTENT) - .tableSchema(tableSchema) - .data(tableSink.getResultIterator()) + .tableSchema(operation.getTableSchema()) + .data(resultProvider.getResultIterator()) .setPrintStyle(TableResultImpl.PrintStyle.tableau( - PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN)) + PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, isStreamingMode)) .build(); } catch (Exception e) { throw new TableException("Failed to execute sql", e); @@ -1092,7 +1088,7 @@ private TableResult buildResult(String[] headers, DataType[] types, Object[][] r headers, types).build()) .data(Arrays.stream(rows).map(Row::of).collect(Collectors.toList())) - 
.setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, "")) + .setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, "", false)) .build(); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java index 5c82f5eb83539..c3dc1de6e2bac 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableResultImpl.java @@ -93,8 +93,9 @@ public void print() { if (printStyle instanceof TableauStyle) { int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth(); String nullColumn = ((TableauStyle) printStyle).getNullColumn(); + boolean printRowKind = ((TableauStyle) printStyle).isPrintRowKind(); PrintUtils.printAsTableauForm( - getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn); + getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn, printRowKind); } else if (printStyle instanceof RawContentStyle) { while (it.hasNext()) { System.out.println(String.join(",", PrintUtils.rowToString(it.next()))); @@ -116,7 +117,7 @@ public static class Builder { private TableSchema tableSchema = null; private ResultKind resultKind = null; private Iterator data = null; - private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN); + private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false); private Builder() { } @@ -197,13 +198,13 @@ public TableResult build() { */ public interface PrintStyle { /** - * Create a tableau print style with given max column width and null column, + * Create a tableau print style with given max column width, null column and change mode indicator, * which prints the result schema and content as tableau form. 
*/ - static PrintStyle tableau(int maxColumnWidth, String nullColumn) { + static PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean printRowKind) { Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0"); Preconditions.checkNotNull(nullColumn, "nullColumn should not be null"); - return new TableauStyle(maxColumnWidth, nullColumn); + return new TableauStyle(maxColumnWidth, nullColumn, printRowKind); } /** @@ -223,10 +224,12 @@ private static final class TableauStyle implements PrintStyle { private final int maxColumnWidth; private final String nullColumn; + private final boolean printRowKind; - private TableauStyle(int maxColumnWidth, String nullColumn) { + private TableauStyle(int maxColumnWidth, String nullColumn, boolean printRowKind) { this.maxColumnWidth = maxColumnWidth; this.nullColumn = nullColumn; + this.printRowKind = printRowKind; } int getMaxColumnWidth() { @@ -236,6 +239,10 @@ int getMaxColumnWidth() { String getNullColumn() { return nullColumn; } + + public boolean isPrintRowKind() { + return printRowKind; + } } /** diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java index 06f35d1d085d0..fb3b940b72b19 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java @@ -21,8 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.dag.Transformation; import org.apache.flink.table.api.ExplainDetail; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectTableSink; import org.apache.flink.table.operations.ModifyOperation; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.QueryOperation; @@ -76,14 +74,6 @@ public interface Planner { */ List> translate(List modifyOperations); - /** - * Creates a {@link SelectTableSink} for a select query. - * - * @param tableSchema the table schema of select result. - * @return The {@link SelectTableSink} for the select query. - */ - SelectTableSink createSelectTableSink(TableSchema tableSchema); - /** * Returns the AST of the specified Table API and SQL queries and the execution plan * to compute the result of the given collection of {@link QueryOperation}s. 
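Note (editorial, not part of the patch): the hunks above remove result collection from the Planner interface, and the hunks below add the SelectSinkOperation that carries a SelectResultProvider instead. A minimal sketch of how the pieces are meant to be used together, pieced together from TableEnvironmentImpl#executeInternal above; the planner, execEnv and tableConfig fields, and the queryOperation variable, are assumed from the surrounding TableEnvironmentImpl:

    // Wrap the query in the new internal sink operation. While translating it,
    // the planner attaches a SelectResultProvider to the operation.
    SelectSinkOperation sinkOperation = new SelectSinkOperation(queryOperation);
    List<Transformation<?>> transformations =
            planner.translate(Collections.singletonList(sinkOperation));
    Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, "collect");
    JobClient jobClient = execEnv.executeAsync(pipeline);

    // Hand the JobClient to the provider so the client-side iterator can fetch rows,
    // then expose the iterator through the returned TableResult.
    SelectResultProvider resultProvider = sinkOperation.getSelectResultProvider();
    resultProvider.setJobClient(jobClient);
    Iterator<Row> rows = resultProvider.getResultIterator();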
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java index 63dd1564f6d15..9f34f02fa1492 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java @@ -31,4 +31,6 @@ public interface ModifyOperationVisitor { T visit(OutputConversionModifyOperation outputConversion); T visit(UnregisteredSinkModifyOperation unregisteredSink); + + T visit(SelectSinkOperation selectOperation); } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java new file mode 100644 index 0000000000000..a50462cf53a2f --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/SelectSinkOperation.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.SelectResultProvider; + +import java.util.Collections; +import java.util.HashMap; + +/** + * Special, internal kind of {@link ModifyOperation} that + * collects the content of {@link QueryOperation} to local. + */ +@Internal +public class SelectSinkOperation implements ModifyOperation { + + private final QueryOperation child; + // help the client to get the execute result from a specific sink. 
+ private SelectResultProvider resultProvider; + + public SelectSinkOperation(QueryOperation child) { + this.child = child; + } + + public void setSelectResultProvider(SelectResultProvider resultProvider) { + this.resultProvider = resultProvider; + } + + public SelectResultProvider getSelectResultProvider() { + return resultProvider; + } + + @Override + public QueryOperation getChild() { + return child; + } + + @Override + public T accept(ModifyOperationVisitor visitor) { + return visitor.visit(this); + } + + @Override + public String asSummaryString() { + return OperationUtils.formatWithChildren( + "SelectSink", + new HashMap<>(), + Collections.singletonList(child), + Operation::asSummaryString); + } + +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java index 82b3e949a13dc..42b5403c06da2 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java @@ -20,8 +20,6 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.table.api.ExplainDetail; -import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectTableSink; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.operations.ModifyOperation; @@ -44,11 +42,6 @@ public List> translate(List modifyOperations) return null; } - @Override - public SelectTableSink createSelectTableSink(TableSchema tableSchema) { - return null; - } - @Override public String explain(List operations, ExplainDetail... extraDetails) { return null; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java index 7916fa14d87c7..261ac1289907a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/PrintUtils.java @@ -19,6 +19,7 @@ package org.apache.flink.table.utils; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; @@ -31,6 +32,8 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; /** * Utilities for print formatting. @@ -58,44 +61,51 @@ private PrintUtils() { * | (NULL) | (NULL) | (NULL) | * +-------------+---------+-------------+ * 3 rows in result - * - *
<p>Changelog is not supported until FLINK-16998 is finished. */ public static void printAsTableauForm( TableSchema tableSchema, Iterator<Row> it, PrintWriter printWriter) { - printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN); + printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN, false); } /** * Displays the result in a tableau form. * - *
For example: - * +-------------+---------+-------------+ - * | boolean_col | int_col | varchar_col | - * +-------------+---------+-------------+ - * | true | 1 | abc | - * | false | 2 | def | - * | (NULL) | (NULL) | (NULL) | - * +-------------+---------+-------------+ - * 3 rows in result - * - *
<p>Changelog is not supported until FLINK-16998 is finished. + *
For example: (printRowKind is true) + * +-------------+-------------+---------+-------------+ + * | row_kind | boolean_col | int_col | varchar_col | + * +-------------+-------------+---------+-------------+ + * | +I | true | 1 | abc | + * | -U | false | 2 | def | + * | +U | false | 3 | def | + * | -D | (NULL) | (NULL) | (NULL) | + * +-------------+-------------+---------+-------------+ + * 4 rows in result */ public static void printAsTableauForm( TableSchema tableSchema, Iterator it, PrintWriter printWriter, int maxColumnWidth, - String nullColumn) { + String nullColumn, + boolean printRowKind) { List rows = new ArrayList<>(); // fill field names first - List columns = tableSchema.getTableColumns(); + final List columns; + if (printRowKind) { + columns = Stream.concat( + Stream.of(TableColumn.of("row_kind", DataTypes.STRING())), + tableSchema.getTableColumns().stream() + ).collect(Collectors.toList()); + } else { + columns = tableSchema.getTableColumns(); + } + rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new)); while (it.hasNext()) { - rows.add(rowToString(it.next(), nullColumn)); + rows.add(rowToString(it.next(), nullColumn, printRowKind)); } int[] colWidths = columnWidthsByContent(columns, rows, maxColumnWidth); @@ -124,20 +134,24 @@ public static void printAsTableauForm( } public static String[] rowToString(Row row) { - return rowToString(row, NULL_COLUMN); + return rowToString(row, NULL_COLUMN, false); } - public static String[] rowToString(Row row, String nullColumn) { - final String[] fields = new String[row.getArity()]; + public static String[] rowToString(Row row, String nullColumn, boolean printRowKind) { + final int len = printRowKind ? row.getArity() + 1 : row.getArity(); + final List fields = new ArrayList<>(len); + if (printRowKind) { + fields.add(row.getKind().shortString()); + } for (int i = 0; i < row.getArity(); i++) { final Object field = row.getField(i); if (field == null) { - fields[i] = nullColumn; + fields.add(nullColumn); } else { - fields[i] = StringUtils.arrayAwareToString(field); + fields.add(StringUtils.arrayAwareToString(field)); } } - return fields; + return fields.toArray(new String[0]); } private static int[] columnWidthsByContent( diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java index 47fd62a00941e..83ef01aa690d1 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/utils/PrintUtilsTest.java @@ -21,6 +21,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import org.junit.Test; @@ -91,6 +92,24 @@ public void testPrintWithEmptyResult() { outContent.toString()); } + @Test + public void testPrintWithEmptyResultAndRowKind() { + PrintUtils.printAsTableauForm( + getSchema(), + Collections.emptyList().iterator(), + new PrintWriter(outContent), + PrintUtils.MAX_COLUMN_WIDTH, + "", + true); + + assertEquals( + "+----------+---------+-----+--------+---------+----------------+-----------+\n" + + "| row_kind | boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" + + "+----------+---------+-----+--------+---------+----------------+-----------+\n" + + "0 row in set" + System.lineSeparator(), + outContent.toString()); + } + @Test public 
void testPrintWithMultipleRows() { PrintUtils.printAsTableauForm( @@ -118,6 +137,36 @@ public void testPrintWithMultipleRows() { outContent.toString()); } + @Test + public void testPrintWithMultipleRowsAndRowKind() { + PrintUtils.printAsTableauForm( + getSchema(), + getData().iterator(), + new PrintWriter(outContent), + PrintUtils.MAX_COLUMN_WIDTH, + "", + true); + + // note: the expected result may look irregular because every CJK(Chinese/Japanese/Korean) character's + // width < 2 in IDE by default, every CJK character usually's width is 2, you can open this source file + // by vim or just cat the file to check the regular result. + assertEquals( + "+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" + + "| row_kind | boolean | int | bigint | varchar | decimal(10, 5) | timestamp |\n" + + "+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" + + "| +I | | 1 | 2 | abc | 1.23 | 2020-03-01 18:39:14.0 |\n" + + "| +I | false | | 0 | | 1 | 2020-03-01 18:39:14.1 |\n" + + "| -D | true | 2147483647 | | abcdefg | 1234567890 | 2020-03-01 18:39:14.12 |\n" + + "| +I | false | -2147483648 | 9223372036854775807 | | 12345.06789 | 2020-03-01 18:39:14.123 |\n" + + "| +I | true | 100 | -9223372036854775808 | abcdefg111 | | 2020-03-01 18:39:14.123456 |\n" + + "| -U | | -1 | -1 | abcdefghijklmnopqrstuvwxyz | -12345.06789 | |\n" + + "| +U | | -1 | -1 | 这是一段中文 | -12345.06789 | 2020-03-04 18:39:14.0 |\n" + + "| -D | | -1 | -1 | これは日本語をテストするた... | -12345.06789 | 2020-03-04 18:39:14.0 |\n" + + "+----------+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+\n" + + "8 rows in set" + System.lineSeparator(), + outContent.toString()); + } + private TableSchema getSchema() { return TableSchema.builder() .field("boolean", DataTypes.BOOLEAN()) @@ -131,7 +180,8 @@ private TableSchema getSchema() { private List getData() { List data = new ArrayList<>(); - data.add(Row.of( + data.add(Row.ofKind( + RowKind.INSERT, null, 1, 2, @@ -139,7 +189,8 @@ private List getData() { BigDecimal.valueOf(1.23), Timestamp.valueOf("2020-03-01 18:39:14")) ); - data.add(Row.of( + data.add(Row.ofKind( + RowKind.INSERT, false, null, 0, @@ -147,7 +198,8 @@ private List getData() { BigDecimal.valueOf(1), Timestamp.valueOf("2020-03-01 18:39:14.1")) ); - data.add(Row.of( + data.add(Row.ofKind( + RowKind.DELETE, true, Integer.MAX_VALUE, null, @@ -155,7 +207,8 @@ private List getData() { BigDecimal.valueOf(1234567890), Timestamp.valueOf("2020-03-01 18:39:14.12")) ); - data.add(Row.of( + data.add(Row.ofKind( + RowKind.INSERT, false, Integer.MIN_VALUE, Long.MAX_VALUE, @@ -163,7 +216,8 @@ private List getData() { BigDecimal.valueOf(12345.06789), Timestamp.valueOf("2020-03-01 18:39:14.123")) ); - data.add(Row.of( + data.add(Row.ofKind( + RowKind.INSERT, true, 100, Long.MIN_VALUE, @@ -171,7 +225,8 @@ private List getData() { null, Timestamp.valueOf("2020-03-01 18:39:14.123456")) ); - data.add(Row.of( + data.add(Row.ofKind( + RowKind.UPDATE_BEFORE, null, -1, -1, @@ -179,7 +234,8 @@ private List getData() { BigDecimal.valueOf(-12345.06789), null) ); - data.add(Row.of( + data.add(Row.ofKind( + RowKind.UPDATE_AFTER, null, -1, -1, @@ -187,7 +243,8 @@ private List getData() { BigDecimal.valueOf(-12345.06789), Timestamp.valueOf("2020-03-04 18:39:14")) ); - data.add(Row.of( + data.add(Row.ofKind( + RowKind.DELETE, 
null, -1, -1, diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/BatchSelectTableSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/BatchSelectTableSink.java index 4ff80d459b1ae..d7020cd0cb524 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/BatchSelectTableSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/BatchSelectTableSink.java @@ -18,25 +18,31 @@ package org.apache.flink.table.planner.sinks; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectTableSink; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.sinks.StreamTableSink; import org.apache.flink.types.Row; /** - * A {@link SelectTableSink} for batch select job. + * A {@link StreamTableSink} for batch select job to collect the result to local. */ -public class BatchSelectTableSink extends SelectTableSinkBase implements StreamTableSink { +public class BatchSelectTableSink extends SelectTableSinkBase implements StreamTableSink { public BatchSelectTableSink(TableSchema tableSchema) { - super(tableSchema); + super(tableSchema, createRowDataTypeInfo(tableSchema).createSerializer(new ExecutionConfig())); } @Override - public DataStreamSink consumeDataStream(DataStream dataStream) { - return super.consumeDataStream(dataStream); + public TypeInformation getOutputType() { + return createRowDataTypeInfo(getTableSchema()); + } + + @Override + protected Row convertToRow(RowData element) { + // convert RowData to Row + return converter.toExternal(element); } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java index e7664dae7e3e0..707f2ed14763a 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkBase.java @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.sinks; -import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.datastream.DataStream; @@ -28,61 +28,106 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectTableSink; -import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter; +import org.apache.flink.table.api.internal.SelectResultProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; +import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo; +import org.apache.flink.table.sinks.StreamTableSink; +import org.apache.flink.table.sinks.TableSink; import 
org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.types.Row; import java.util.Iterator; import java.util.UUID; +import java.util.stream.Stream; /** - * Basic implementation of {@link SelectTableSink}. + * Basic implementation of {@link StreamTableSink} for select job to collect the result to local. */ -public class SelectTableSinkBase implements SelectTableSink { +public abstract class SelectTableSinkBase implements StreamTableSink { private final TableSchema tableSchema; - private final CollectSinkOperatorFactory factory; - private final CollectResultIterator iterator; + protected final DataFormatConverters.DataFormatConverter converter; + + private final CollectSinkOperatorFactory factory; + private final CollectResultIterator iterator; @SuppressWarnings("unchecked") - public SelectTableSinkBase(TableSchema tableSchema) { - this.tableSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp( - SelectTableSinkSchemaConverter.changeDefaultConversionClass(tableSchema)); + public SelectTableSinkBase(TableSchema schema, TypeSerializer typeSerializer) { + this.tableSchema = schema; + this.converter = DataFormatConverters.getConverterForDataType(this.tableSchema.toPhysicalRowDataType()); - TypeSerializer typeSerializer = (TypeSerializer) TypeInfoDataTypeConverter - .fromDataTypeToTypeInfo(this.tableSchema.toRowDataType()) - .createSerializer(new ExecutionConfig()); String accumulatorName = "tableResultCollect_" + UUID.randomUUID(); - this.factory = new CollectSinkOperatorFactory<>(typeSerializer, accumulatorName); CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); this.iterator = new CollectResultIterator<>(operator.getOperatorIdFuture(), typeSerializer, accumulatorName); } @Override - public DataType getConsumedDataType() { - return tableSchema.toRowDataType(); + public TableSchema getTableSchema() { + return tableSchema; } @Override - public TableSchema getTableSchema() { - return tableSchema; + public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + throw new UnsupportedOperationException(); } - protected DataStreamSink consumeDataStream(DataStream dataStream) { - CollectStreamSink sink = new CollectStreamSink<>(dataStream, factory); + @Override + public DataStreamSink consumeDataStream(DataStream dataStream) { + CollectStreamSink sink = new CollectStreamSink<>(dataStream, factory); dataStream.getExecutionEnvironment().addOperator(sink.getTransformation()); return sink.name("Select table sink"); } - @Override - public void setJobClient(JobClient jobClient) { - iterator.setJobClient(jobClient); + public SelectResultProvider getSelectResultProvider() { + return new SelectResultProvider() { + @Override + public void setJobClient(JobClient jobClient) { + iterator.setJobClient(jobClient); + } + + @Override + public Iterator getResultIterator() { + return new RowIteratorWrapper(iterator); + } + }; } - @Override - public Iterator getResultIterator() { - return iterator; + /** + * An Iterator wrapper class that converts Iterator<T> to Iterator<Row>. 
+ */ + private class RowIteratorWrapper implements Iterator, AutoCloseable { + private final CollectResultIterator iterator; + + public RowIteratorWrapper(CollectResultIterator iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Row next() { + return convertToRow(iterator.next()); + } + + @Override + public void close() throws Exception { + iterator.close(); + } + } + + protected abstract Row convertToRow(T element); + + /** + * Create RowDataTypeInfo based on given table schema. + */ + protected static RowDataTypeInfo createRowDataTypeInfo(TableSchema tableSchema) { + return new RowDataTypeInfo( + Stream.of(tableSchema.getFieldDataTypes()).map(DataType::getLogicalType).toArray(LogicalType[]::new)); } } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java index 6615b5766e3e7..97db01720ff12 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/SelectTableSinkSchemaConverter.java @@ -50,7 +50,7 @@ public static TableSchema changeDefaultConversionClass(TableSchema tableSchema) * Convert time attributes (proc time / event time) to regular timestamp * and build a new {@link TableSchema}. */ - static TableSchema convertTimeAttributeToRegularTimestamp(TableSchema tableSchema) { + public static TableSchema convertTimeAttributeToRegularTimestamp(TableSchema tableSchema) { DataType[] dataTypes = tableSchema.getFieldDataTypes(); String[] oldNames = tableSchema.getFieldNames(); diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/StreamSelectTableSink.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/StreamSelectTableSink.java index 833ef406ee466..be2b09d30727a 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/StreamSelectTableSink.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/sinks/StreamSelectTableSink.java @@ -18,28 +18,42 @@ package org.apache.flink.table.planner.sinks; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectTableSink; -import org.apache.flink.table.sinks.AppendStreamTableSink; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.sinks.RetractStreamTableSink; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; /** - * A {@link SelectTableSink} for streaming select job. + * A {@link RetractStreamTableSink} for streaming select job to collect the result to local. * - *
<p>NOTE: - * Currently, only insert changes (AppendStreamTableSink) is supported. - * Once FLINK-16998 is finished, all kinds of changes will be supported. + *
{@link RowData} contains {@link RowKind} attribute which + * can represents all kind of changes. The boolean flag is useless here, + * only because {@link RetractStreamTableSink} requires Tuple2<Boolean, T> type. */ -public class StreamSelectTableSink extends SelectTableSinkBase implements AppendStreamTableSink { +public class StreamSelectTableSink + extends SelectTableSinkBase> + implements RetractStreamTableSink { public StreamSelectTableSink(TableSchema tableSchema) { - super(tableSchema); + super(tableSchema, new TupleTypeInfo>( + Types.BOOLEAN, + createRowDataTypeInfo(tableSchema)).createSerializer(new ExecutionConfig())); } @Override - public DataStreamSink consumeDataStream(DataStream dataStream) { - return super.consumeDataStream(dataStream); + public TypeInformation getRecordType() { + return createRowDataTypeInfo(getTableSchema()); + } + + @Override + protected Row convertToRow(Tuple2 tuple2) { + // convert Tuple2 to Row + return converter.toExternal(tuple2.f1); } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala index ff38b37afc731..5b7bb17e58cd6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/BatchPlanner.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.delegation import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.api.internal.SelectTableSink import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException, TableSchema} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier} import org.apache.flink.table.delegation.Executor @@ -31,7 +30,7 @@ import org.apache.flink.table.planner.plan.nodes.process.DAGProcessContext import org.apache.flink.table.planner.plan.optimize.{BatchCommonSubGraphBasedOptimizer, Optimizer} import org.apache.flink.table.planner.plan.reuse.DeadlockBreakupProcessor import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil} -import org.apache.flink.table.planner.sinks.BatchSelectTableSink +import org.apache.flink.table.planner.sinks.{BatchSelectTableSink, SelectTableSinkBase} import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil} import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef} @@ -80,7 +79,7 @@ class BatchPlanner( } } - override def createSelectTableSink(tableSchema: TableSchema): SelectTableSink = { + override protected def createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_] = { new BatchSelectTableSink(tableSchema) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala index 2fe000127413f..0dc62a6d029ba 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala @@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting import org.apache.flink.api.dag.Transformation import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api.config.ExecutionConfigOptions -import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException} +import org.apache.flink.table.api.{TableConfig, TableEnvironment, TableException, TableSchema} import org.apache.flink.table.catalog._ import org.apache.flink.table.connector.sink.DynamicTableSink import org.apache.flink.table.delegation.{Executor, Parser, Planner} @@ -41,8 +41,8 @@ import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel import org.apache.flink.table.planner.plan.optimize.Optimizer import org.apache.flink.table.planner.plan.reuse.SubplanReuser import org.apache.flink.table.planner.plan.utils.SameRelObjectShuttle -import org.apache.flink.table.planner.sinks.DataStreamTableSink import org.apache.flink.table.planner.sinks.TableSinkUtils.{inferSinkPhysicalSchema, validateLogicalPhysicalTypesCompatible, validateSchemaAndApplyImplicitCast, validateTableSink} +import org.apache.flink.table.planner.sinks.{DataStreamTableSink, SelectTableSinkBase, SelectTableSinkSchemaConverter} import org.apache.flink.table.planner.utils.JavaScalaConversionUtil import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter @@ -193,6 +193,21 @@ abstract class PlannerBase( "UnregisteredSink", ConnectorCatalogTable.sink(s.getSink, !isStreamingMode)) + case s: SelectSinkOperation => + val input = getRelBuilder.queryOperation(s.getChild).build() + // convert query schema to sink schema + val sinkSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp( + SelectTableSinkSchemaConverter.changeDefaultConversionClass(s.getChild.getTableSchema)) + // validate query schema and sink schema, and apply cast if possible + val query = validateSchemaAndApplyImplicitCast(input, sinkSchema, getTypeFactory) + val sink = createSelectTableSink(sinkSchema) + s.setSelectResultProvider(sink.getSelectResultProvider) + LogicalLegacySink.create( + query, + sink, + "collect", + ConnectorCatalogTable.sink(sink, !isStreamingMode)) + case catalogSink: CatalogSinkModifyOperation => val input = getRelBuilder.queryOperation(modifyOperation.getChild).build() val identifier = catalogSink.getTableIdentifier @@ -309,6 +324,14 @@ abstract class PlannerBase( */ protected def translateToPlan(execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] + /** + * Creates a [[SelectTableSinkBase]] for a select query. + * + * @param tableSchema the table schema of select result. + * @return The sink to fetch the select result. 
+ */ + protected def createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_] + private def getTableSink( objectIdentifier: ObjectIdentifier, dynamicOptions: JMap[String, String]) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala index 784f012ed835c..993be3e4e4fae 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/delegation/StreamPlanner.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.delegation import org.apache.flink.api.dag.Transformation -import org.apache.flink.table.api.internal.SelectTableSink import org.apache.flink.table.api.{ExplainDetail, TableConfig, TableException, TableSchema} import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, ObjectIdentifier} import org.apache.flink.table.delegation.Executor @@ -29,7 +28,7 @@ import org.apache.flink.table.planner.plan.`trait`._ import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, StreamExecNode} import org.apache.flink.table.planner.plan.optimize.{Optimizer, StreamCommonSubGraphBasedOptimizer} import org.apache.flink.table.planner.plan.utils.{ExecNodePlanDumper, FlinkRelOptUtil} -import org.apache.flink.table.planner.sinks.StreamSelectTableSink +import org.apache.flink.table.planner.sinks.{SelectTableSinkBase, StreamSelectTableSink} import org.apache.flink.table.planner.utils.{DummyStreamExecutionEnvironment, ExecutorUtils, PlanUtil} import org.apache.calcite.plan.{ConventionTraitDef, RelTrait, RelTraitDef} @@ -71,7 +70,7 @@ class StreamPlanner( } } - override def createSelectTableSink(tableSchema: TableSchema): SelectTableSink = { + override protected def createSelectTableSink(tableSchema: TableSchema): SelectTableSinkBase[_] = { new StreamSelectTableSink(tableSchema) } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala index bc6b46344f484..d14a6808cb752 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala @@ -22,22 +22,24 @@ import org.apache.flink.api.common.typeinfo.Types.STRING import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecutionEnvironment} -import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableEnvironmentInternal} import org.apache.flink.table.api.bridge.java.StreamTableEnvironment import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment => ScalaStreamTableEnvironment, _} +import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableEnvironmentInternal} import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory import org.apache.flink.table.planner.runtime.utils.{TableEnvUtil, TestingAppendSink} import org.apache.flink.table.planner.utils.TableTestUtil.{readFromResource, replaceStageId} import 
org.apache.flink.table.planner.utils.{TableTestUtil, TestTableSourceSinks, TestTableSourceWithTime} -import org.apache.flink.types.Row +import org.apache.flink.types.{Row, RowKind} import org.apache.flink.util.{FileUtils, TestLogger} + import org.apache.flink.shaded.guava18.com.google.common.collect.Lists -import org.hamcrest.Matchers.containsString + import org.junit.Assert.{assertEquals, assertFalse, assertTrue} import org.junit.rules.{ExpectedException, TemporaryFolder} import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Assert, Before, Rule, Test} + import _root_.java.io.{File, FileFilter} import _root_.java.lang.{Long => JLong} import _root_.java.util @@ -599,14 +601,35 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends @Test def testExecuteSelectWithUpdateChanges(): Unit = { - if (!isStreaming) { - return + val tableResult = tEnv.sqlQuery("select count(*) as c from MyTable").execute() + assertTrue(tableResult.getJobClient.isPresent) + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind) + assertEquals( + TableSchema.builder().field("c", DataTypes.BIGINT().notNull()).build(), + tableResult.getTableSchema) + val expected = if (isStreaming) { + util.Arrays.asList( + Row.ofKind(RowKind.INSERT, JLong.valueOf(1)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(1)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(2)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(2)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(3)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(3)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(4)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(4)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(5)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(5)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(6)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(6)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(7)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(7)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(8)) + ) + } else { + util.Arrays.asList(Row.of(JLong.valueOf(8))) } - // TODO Once FLINK-16998 is finished, all kinds of changes will be supported. 
- thrown.expect(classOf[TableException]) - thrown.expectMessage(containsString( - "AppendStreamTableSink doesn't support consuming update changes")) - tEnv.executeSql("select count(*) from MyTable") + val actual = Lists.newArrayList(tableResult.collect()) + assertEquals(expected, actual) } @Test diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala index 01f400d3743d9..f301529bb6d24 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/api/TableITCase.scala @@ -19,21 +19,21 @@ package org.apache.flink.table.api import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment -import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.api.bridge.java.StreamTableEnvironment +import org.apache.flink.table.api.internal.TableEnvironmentImpl import org.apache.flink.table.planner.utils.TestTableSourceSinks -import org.apache.flink.types.Row +import org.apache.flink.types.{Row, RowKind} import org.apache.flink.util.TestLogger import org.apache.flink.shaded.guava18.com.google.common.collect.Lists -import org.hamcrest.Matchers.containsString import org.junit.Assert.{assertEquals, assertTrue} import org.junit.rules.{ExpectedException, TemporaryFolder} import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.{Before, Rule, Test} +import _root_.java.lang.{Long => JLong} import _root_.java.util @RunWith(classOf[Parameterized]) @@ -104,14 +104,35 @@ class TableITCase(tableEnvName: String, isStreaming: Boolean) extends TestLogger @Test def testExecuteWithUpdateChanges(): Unit = { - if (!isStreaming) { - return + val tableResult = tEnv.sqlQuery("select count(*) as c from MyTable").execute() + assertTrue(tableResult.getJobClient.isPresent) + assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind) + assertEquals( + TableSchema.builder().field("c", DataTypes.BIGINT().notNull()).build(), + tableResult.getTableSchema) + val expected = if (isStreaming) { + util.Arrays.asList( + Row.ofKind(RowKind.INSERT, JLong.valueOf(1)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(1)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(2)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(2)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(3)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(3)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(4)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(4)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(5)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(5)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(6)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(6)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(7)), + Row.ofKind(RowKind.UPDATE_BEFORE, JLong.valueOf(7)), + Row.ofKind(RowKind.UPDATE_AFTER, JLong.valueOf(8)) + ) + } else { + util.Arrays.asList(Row.of(JLong.valueOf(8))) } - // TODO Once FLINK-16998 is finished, all kinds of changes will be supported. 
- thrown.expect(classOf[TableException]) - thrown.expectMessage(containsString( - "AppendStreamTableSink doesn't support consuming update changes")) - tEnv.sqlQuery("select count(*) from MyTable").execute() + val actual = Lists.newArrayList(tableResult.collect()) + assertEquals(expected, actual) } } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala index 73c16aaf39774..8e980b2163dea 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/TableSourceITCase.scala @@ -239,7 +239,7 @@ class TableSourceITCase extends BatchTestBase { row( false, -128, -32768, -2147483648, -9223372036854775808L, "3.4", "3.4", "6.10", 12, 12, "1970-09-30", "01:01:01.123", "1970-09-30T01:01:01.123456", - "1970-09-30T01:01:01.123456Z", "[4, 5]", row(null, "b", "4.56"), "{k4=4, k2=2}"), + "1970-09-30T01:01:01.123456Z", "[4, 5]", row(null, "b", "4.56"), "{k2=2, k4=4}"), row( true, 0, 0, 0, 0, "0.12", "0.12", "7.10", 123, 123, "1990-12-24", "08:10:24.123", "1990-12-24T08:10:24.123", diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala index 54e62d79a7390..ca16518e645fb 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/AggregationITCase.scala @@ -371,7 +371,7 @@ class AggregationITCase extends BatchTestBase { "1,{1=1}\n" + "2,{2=1, 3=1}\n" + "3,{4=1, 5=1, 6=1}\n" + - "4,{8=1, 9=1, 10=1, 7=1}\n" + + "4,{7=1, 8=1, 9=1, 10=1}\n" + "5,{11=1, 12=1, 13=1, 14=1, 15=1}\n" + "6,{16=1, 17=1, 18=1, 19=1, 20=1, 21=1}" val results = executeQuery(t) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala index f948eb38543b8..b4c2fe5c09d4e 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/table/CalcITCase.scala @@ -551,7 +551,7 @@ class CalcITCase extends BatchTestBase { .select(map('a, 'b, 'c, 'd)) val result4 = executeQuery(t4) val expected4 = "{AAA=123.45, BBB=234.56}\n" + - "{DDD=456.78, CCC=345.67}\n" + + "{CCC=345.67, DDD=456.78}\n" + "{EEE=567.89, FFF=678.99}\n" TestBaseUtils.compareResultAsText(result4.asJava, expected4) } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java index 5b360b35ce647..7785a61080861 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java +++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/BatchSelectTableSink.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.accumulators.SerializedListAccumulator; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.Utils; @@ -28,7 +29,7 @@ import org.apache.flink.core.execution.JobClient; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectTableSink; +import org.apache.flink.table.api.internal.SelectResultProvider; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; import org.apache.flink.util.AbstractID; @@ -41,13 +42,12 @@ import java.util.concurrent.ExecutionException; /** - * A {@link SelectTableSink} for batch select job. + * A {@link BatchTableSink} for batch select job to collect the result to local. */ -public class BatchSelectTableSink implements BatchTableSink, SelectTableSink { +public class BatchSelectTableSink implements BatchTableSink { private final TableSchema tableSchema; private final String accumulatorName; private final TypeSerializer typeSerializer; - private JobClient jobClient; public BatchSelectTableSink(TableSchema tableSchema) { this.tableSchema = @@ -66,6 +66,11 @@ public TableSchema getTableSchema() { return tableSchema; } + @Override + public TableSink configure(String[] fieldNames, TypeInformation[] fieldTypes) { + throw new UnsupportedOperationException(); + } + @Override public DataSink consumeDataSet(DataSet dataSet) { return dataSet.output( @@ -74,14 +79,24 @@ public DataSink consumeDataSet(DataSet dataSet) { .setParallelism(1); } - @Override - public void setJobClient(JobClient jobClient) { - this.jobClient = Preconditions.checkNotNull(jobClient, "jobClient should not be null"); + public SelectResultProvider getSelectResultProvider() { + return new SelectResultProvider() { + private JobClient jobClient; + + @Override + public void setJobClient(JobClient jobClient) { + this.jobClient = Preconditions.checkNotNull(jobClient, "jobClient should not be null"); + } + + @Override + public Iterator getResultIterator() { + Preconditions.checkNotNull(jobClient, "jobClient is null, please call setJobClient first."); + return collectResult(jobClient); + } + }; } - @Override - public Iterator getResultIterator() { - Preconditions.checkNotNull(jobClient, "jobClient is null, please call setJobClient first."); + private Iterator collectResult(JobClient jobClient) { JobExecutionResult jobExecutionResult; try { jobExecutionResult = jobClient.getJobExecutionResult( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java index 1734feab2ae8f..b1fb7ed86d055 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sinks/StreamSelectTableSink.java @@ -19,7 +19,11 @@ package org.apache.flink.table.sinks; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.Types; import 
org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; @@ -28,29 +32,28 @@ import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; import org.apache.flink.table.api.TableSchema; -import org.apache.flink.table.api.internal.SelectTableSink; -import org.apache.flink.table.types.DataType; +import org.apache.flink.table.api.internal.SelectResultProvider; import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; import java.util.Iterator; import java.util.UUID; /** - * A {@link SelectTableSink} for streaming select job. - * - *
NOTES: Currently, only insert changes is supported. - * Once FLINK-16998 is finished, all kinds of changes will be supported. + * A {@link RetractStreamTableSink} for streaming select job to collect the result to local. */ -public class StreamSelectTableSink implements AppendStreamTableSink, SelectTableSink { +public class StreamSelectTableSink implements RetractStreamTableSink { private final TableSchema tableSchema; - private final CollectSinkOperatorFactory factory; - private final CollectResultIterator iterator; + private final CollectSinkOperatorFactory> factory; + private final CollectResultIterator> iterator; public StreamSelectTableSink(TableSchema tableSchema) { this.tableSchema = SelectTableSinkSchemaConverter.convertTimeAttributeToRegularTimestamp(tableSchema); - TypeSerializer typeSerializer = this.tableSchema.toRowType().createSerializer(new ExecutionConfig()); + TypeInformation> tupleTypeInfo = + new TupleTypeInfo<>(Types.BOOLEAN, this.tableSchema.toRowType()); + TypeSerializer> typeSerializer = tupleTypeInfo.createSerializer(new ExecutionConfig()); String accumulatorName = "tableResultCollect_" + UUID.randomUUID(); this.factory = new CollectSinkOperatorFactory<>(typeSerializer, accumulatorName); @@ -59,8 +62,8 @@ public StreamSelectTableSink(TableSchema tableSchema) { } @Override - public DataType getConsumedDataType() { - return tableSchema.toRowDataType(); + public TypeInformation getRecordType() { + return tableSchema.toRowType(); } @Override @@ -69,19 +72,59 @@ public TableSchema getTableSchema() { } @Override - public DataStreamSink consumeDataStream(DataStream dataStream) { - CollectStreamSink sink = new CollectStreamSink<>(dataStream, factory); + public TableSink> configure(String[] fieldNames, TypeInformation[] fieldTypes) { + throw new UnsupportedOperationException(); + } + + @Override + public DataStreamSink consumeDataStream(DataStream> dataStream) { + CollectStreamSink sink = new CollectStreamSink<>(dataStream, factory); dataStream.getExecutionEnvironment().addOperator(sink.getTransformation()); return sink.name("Streaming select table sink"); } - @Override - public void setJobClient(JobClient jobClient) { - iterator.setJobClient(jobClient); + public SelectResultProvider getSelectResultProvider() { + return new SelectResultProvider() { + + @Override + public void setJobClient(JobClient jobClient) { + iterator.setJobClient(jobClient); + } + + @Override + public Iterator getResultIterator() { + return new RowIteratorWrapper(iterator); + } + }; } - @Override - public Iterator getResultIterator() { - return iterator; + /** + * An Iterator wrapper class that converts Iterator<Tuple2<Boolean, Row>> to Iterator<Row>. + */ + private static class RowIteratorWrapper implements Iterator, AutoCloseable { + private final CollectResultIterator> iterator; + public RowIteratorWrapper(CollectResultIterator> iterator) { + this.iterator = iterator; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public Row next() { + // convert Tuple2 to Row + Tuple2 tuple2 = iterator.next(); + RowKind rowKind = tuple2.f0 ? 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 5d5fc12a1c5e5..9dd09a0a3f0fe 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -588,13 +588,15 @@ abstract class TableEnvImpl(
     val dataSink = writeToSinkAndTranslate(operation, tableSink)
     try {
       val jobClient = execute(JCollections.singletonList(dataSink), "collect")
-      tableSink.setJobClient(jobClient)
+      val selectResultProvider = tableSink.getSelectResultProvider
+      selectResultProvider.setJobClient(jobClient)
       TableResultImpl.builder
         .jobClient(jobClient)
         .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .tableSchema(tableSchema)
-        .data(tableSink.getResultIterator)
-        .setPrintStyle(PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
+        .data(selectResultProvider.getResultIterator)
+        .setPrintStyle(
+          PrintStyle.tableau(PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, false))
         .build
     } catch {
       case e: Exception =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index 57b80440d0da5..eb9fd8a4b3e5a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -23,7 +23,6 @@ import org.apache.flink.api.dag.Transformation
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.SelectTableSink
 import org.apache.flink.table.calcite._
 import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, _}
 import org.apache.flink.table.delegation.{Executor, Parser, Planner}
@@ -122,10 +121,6 @@ class StreamPlanner(
     }.filter(Objects.nonNull).asJava
   }

-  override def createSelectTableSink(tableSchema: TableSchema): SelectTableSink = {
-    new StreamSelectTableSink(tableSchema)
-  }
-
   override def explain(operations: util.List[Operation], extraDetails: ExplainDetail*): String = {
     require(operations.asScala.nonEmpty, "operations should not be empty")
     val astWithUpdatesAsRetractionTuples = operations.asScala.map {
@@ -193,6 +188,11 @@ class StreamPlanner(
       case s: UnregisteredSinkModifyOperation[_] =>
         writeToSink(s.getChild, s.getSink, "UnregisteredSink")

+      case s: SelectSinkOperation =>
+        val sink = new StreamSelectTableSink(s.getChild.getTableSchema)
+        s.setSelectResultProvider(sink.getSelectResultProvider)
+        writeToSink(s.getChild, sink, "collect")
+
       case catalogSink: CatalogSinkModifyOperation =>
         getTableSink(catalogSink.getTableIdentifier)
           .map(sink => {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index a55927fe843f2..e9c5fe6c90f80 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -33,12 +33,11 @@ import org.apache.flink.table.sinks.CsvTableSink
 import org.apache.flink.table.sources.CsvTableSource
 import org.apache.flink.table.utils.TableTestUtil.{readFromResource, replaceStageId}
 import org.apache.flink.table.utils.{TestTableSourceWithTime, TestingOverwritableTableSink}
-import org.apache.flink.types.Row
+import org.apache.flink.types.{Row, RowKind}
 import org.apache.flink.util.FileUtils

 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists

-import org.hamcrest.Matchers.containsString
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.rules.{ExpectedException, TemporaryFolder}
 import org.junit.runner.RunWith
@@ -519,11 +518,31 @@ class TableEnvironmentITCase(tableEnvName: String) {

   @Test
   def testExecuteSelectWithUpdateChanges(): Unit = {
-    // TODO Once FLINK-16998 is finished, all kinds of changes will be supported.
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage(containsString(
-      "AppendStreamTableSink requires that Table has only insert changes."))
-    tEnv.executeSql("select count(*) from MyTable")
+    val tableResult = tEnv.sqlQuery("select count(*) as c from MyTable").execute()
+    assertTrue(tableResult.getJobClient.isPresent)
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    assertEquals(
+      TableSchema.builder().field("c", DataTypes.BIGINT().notNull()).build(),
+      tableResult.getTableSchema)
+    val expected = util.Arrays.asList(
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(1)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(1)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(2)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(2)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(3)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(3)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(4)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(4)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(5)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(5)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(6)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(6)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(7)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(7)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(8))
+    )
+    val actual = Lists.newArrayList(tableResult.collect())
+    assertEquals(expected, actual)
   }

   @Test
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
index 6849ac08c81d2..0736ac2f984e6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableITCase.scala
@@ -22,17 +22,17 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api.TableEnvironmentITCase.getPersonCsvTableSource
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
 import org.apache.flink.table.api.internal.{TableEnvironmentImpl, TableEnvironmentInternal}
-import org.apache.flink.types.Row
+import org.apache.flink.types.{Row, RowKind}

 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists

-import org.hamcrest.Matchers.containsString
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.rules.{ExpectedException, TemporaryFolder}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{Before, Rule, Test}

+import java.lang.{Long => JLong}
 import java.util

 @RunWith(classOf[Parameterized])
@@ -101,11 +101,31 @@ class TableITCase(tableEnvName: String) {

   @Test
   def testExecuteWithUpdateChanges(): Unit = {
-    // TODO Once FLINK-16998 is finished, all kinds of changes will be supported.
-    thrown.expect(classOf[TableException])
-    thrown.expectMessage(containsString(
-      "AppendStreamTableSink requires that Table has only insert changes."))
-    tEnv.executeSql("select count(*) from MyTable")
+    val tableResult = tEnv.sqlQuery("select count(*) as c from MyTable").execute()
+    assertTrue(tableResult.getJobClient.isPresent)
+    assertEquals(ResultKind.SUCCESS_WITH_CONTENT, tableResult.getResultKind)
+    assertEquals(
+      TableSchema.builder().field("c", DataTypes.BIGINT().notNull()).build(),
+      tableResult.getTableSchema)
+    val expected = util.Arrays.asList(
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(1)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(1)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(2)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(2)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(3)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(3)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(4)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(4)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(5)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(5)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(6)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(6)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(7)),
+      Row.ofKind(RowKind.DELETE, JLong.valueOf(7)),
+      Row.ofKind(RowKind.INSERT, JLong.valueOf(8))
+    )
+    val actual = Lists.newArrayList(tableResult.collect())
+    assertEquals(expected, actual)
   }

 }
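
For the eight-row person source these tests read, count(*) retracts the previous count and emits the new one for every additional input row, which is why the expected changelog alternates INSERT and DELETE and finishes with INSERT 8. A hedged sketch of collapsing such a changelog back to its final value follows; the ChangelogFolder class and its foldChangelog helper are assumptions made for illustration and do not appear in this patch:

    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    import java.util.List;

    // Illustrative only: replays a single-column INSERT/DELETE changelog and
    // returns the value left standing at the end (8L for these tests).
    public final class ChangelogFolder {

        public static Long foldChangelog(List<Row> changelog) {
            Long current = null;
            for (Row row : changelog) {
                if (row.getKind() == RowKind.INSERT) {
                    current = (Long) row.getField(0);  // new count becomes visible
                } else if (row.getKind() == RowKind.DELETE) {
                    current = null;                    // previous count is retracted
                }
            }
            return current;
        }
    }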