Skip to content

Commit

Permalink
[FLINK-17774][table] Supports all kinds of changes for select result
Browse files Browse the repository at this point in the history
This closes apache#12199
  • Loading branch information
godfreyhe authored and wuchong committed Jun 3, 2020
1 parent 9dbd658 commit 127bb48
Show file tree
Hide file tree
Showing 27 changed files with 557 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,18 @@
package org.apache.flink.table.api.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

import java.util.Iterator;

/**
* An internal special {@link TableSink} to collect the select query result to local client.
* An internal class which helps the client to get the execute result from a specific sink.
*
* <p>This class is generated by specific sink and brings the result info to a TableResult.
*/
@Internal
public interface SelectTableSink extends TableSink<Row> {

default TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
// disable this to make sure there is only one SelectTableSink instance
// so that the instance can be shared within optimizing and executing in client
throw new UnsupportedOperationException();
}

public interface SelectResultProvider {
/**
* Set the job client associated with the select job to retrieve the result.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.SelectSinkOperation;
import org.apache.flink.table.operations.ShowCatalogsOperation;
import org.apache.flink.table.operations.ShowDatabasesOperation;
import org.apache.flink.table.operations.ShowFunctionsOperation;
import org.apache.flink.table.operations.ShowTablesOperation;
import org.apache.flink.table.operations.ShowViewsOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.UnregisteredSinkModifyOperation;
import org.apache.flink.table.operations.UseCatalogOperation;
import org.apache.flink.table.operations.UseDatabaseOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
Expand Down Expand Up @@ -689,24 +689,20 @@ public TableResult executeInternal(List<ModifyOperation> operations) {

@Override
public TableResult executeInternal(QueryOperation operation) {
TableSchema tableSchema = operation.getTableSchema();
SelectTableSink tableSink = planner.createSelectTableSink(tableSchema);
UnregisteredSinkModifyOperation<Row> sinkOperation = new UnregisteredSinkModifyOperation<>(
tableSink,
operation
);
SelectSinkOperation sinkOperation = new SelectSinkOperation(operation);
List<Transformation<?>> transformations = translate(Collections.singletonList(sinkOperation));
Pipeline pipeline = execEnv.createPipeline(transformations, tableConfig, "collect");
try {
JobClient jobClient = execEnv.executeAsync(pipeline);
tableSink.setJobClient(jobClient);
SelectResultProvider resultProvider = sinkOperation.getSelectResultProvider();
resultProvider.setJobClient(jobClient);
return TableResultImpl.builder()
.jobClient(jobClient)
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.tableSchema(tableSchema)
.data(tableSink.getResultIterator())
.tableSchema(operation.getTableSchema())
.data(resultProvider.getResultIterator())
.setPrintStyle(TableResultImpl.PrintStyle.tableau(
PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN))
PrintUtils.MAX_COLUMN_WIDTH, PrintUtils.NULL_COLUMN, isStreamingMode))
.build();
} catch (Exception e) {
throw new TableException("Failed to execute sql", e);
Expand Down Expand Up @@ -1092,7 +1088,7 @@ private TableResult buildResult(String[] headers, DataType[] types, Object[][] r
headers,
types).build())
.data(Arrays.stream(rows).map(Row::of).collect(Collectors.toList()))
.setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, ""))
.setPrintStyle(TableResultImpl.PrintStyle.tableau(Integer.MAX_VALUE, "", false))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,9 @@ public void print() {
if (printStyle instanceof TableauStyle) {
int maxColumnWidth = ((TableauStyle) printStyle).getMaxColumnWidth();
String nullColumn = ((TableauStyle) printStyle).getNullColumn();
boolean printRowKind = ((TableauStyle) printStyle).isPrintRowKind();
PrintUtils.printAsTableauForm(
getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn);
getTableSchema(), it, new PrintWriter(System.out), maxColumnWidth, nullColumn, printRowKind);
} else if (printStyle instanceof RawContentStyle) {
while (it.hasNext()) {
System.out.println(String.join(",", PrintUtils.rowToString(it.next())));
Expand All @@ -116,7 +117,7 @@ public static class Builder {
private TableSchema tableSchema = null;
private ResultKind resultKind = null;
private Iterator<Row> data = null;
private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN);
private PrintStyle printStyle = PrintStyle.tableau(Integer.MAX_VALUE, PrintUtils.NULL_COLUMN, false);

private Builder() {
}
Expand Down Expand Up @@ -197,13 +198,13 @@ public TableResult build() {
*/
public interface PrintStyle {
/**
* Create a tableau print style with given max column width and null column,
* Create a tableau print style with given max column width, null column and change mode indicator,
* which prints the result schema and content as tableau form.
*/
static PrintStyle tableau(int maxColumnWidth, String nullColumn) {
static PrintStyle tableau(int maxColumnWidth, String nullColumn, boolean printRowKind) {
Preconditions.checkArgument(maxColumnWidth > 0, "maxColumnWidth should be greater than 0");
Preconditions.checkNotNull(nullColumn, "nullColumn should not be null");
return new TableauStyle(maxColumnWidth, nullColumn);
return new TableauStyle(maxColumnWidth, nullColumn, printRowKind);
}

/**
Expand All @@ -223,10 +224,12 @@ private static final class TableauStyle implements PrintStyle {

private final int maxColumnWidth;
private final String nullColumn;
private final boolean printRowKind;

private TableauStyle(int maxColumnWidth, String nullColumn) {
private TableauStyle(int maxColumnWidth, String nullColumn, boolean printRowKind) {
this.maxColumnWidth = maxColumnWidth;
this.nullColumn = nullColumn;
this.printRowKind = printRowKind;
}

int getMaxColumnWidth() {
Expand All @@ -236,6 +239,10 @@ int getMaxColumnWidth() {
String getNullColumn() {
return nullColumn;
}

public boolean isPrintRowKind() {
return printRowKind;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.SelectTableSink;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
Expand Down Expand Up @@ -76,14 +74,6 @@ public interface Planner {
*/
List<Transformation<?>> translate(List<ModifyOperation> modifyOperations);

/**
* Creates a {@link SelectTableSink} for a select query.
*
* @param tableSchema the table schema of select result.
* @return The {@link SelectTableSink} for the select query.
*/
SelectTableSink createSelectTableSink(TableSchema tableSchema);

/**
* Returns the AST of the specified Table API and SQL queries and the execution plan
* to compute the result of the given collection of {@link QueryOperation}s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ public interface ModifyOperationVisitor<T> {
T visit(OutputConversionModifyOperation outputConversion);

<U> T visit(UnregisteredSinkModifyOperation<U> unregisteredSink);

T visit(SelectSinkOperation selectOperation);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.SelectResultProvider;

import java.util.Collections;
import java.util.HashMap;

/**
* Special, internal kind of {@link ModifyOperation} that
* collects the content of {@link QueryOperation} to local.
*/
@Internal
public class SelectSinkOperation implements ModifyOperation {

private final QueryOperation child;
// help the client to get the execute result from a specific sink.
private SelectResultProvider resultProvider;

public SelectSinkOperation(QueryOperation child) {
this.child = child;
}

public void setSelectResultProvider(SelectResultProvider resultProvider) {
this.resultProvider = resultProvider;
}

public SelectResultProvider getSelectResultProvider() {
return resultProvider;
}

@Override
public QueryOperation getChild() {
return child;
}

@Override
public <T> T accept(ModifyOperationVisitor<T> visitor) {
return visitor.visit(this);
}

@Override
public String asSummaryString() {
return OperationUtils.formatWithChildren(
"SelectSink",
new HashMap<>(),
Collections.singletonList(child),
Operation::asSummaryString);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.internal.SelectTableSink;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.operations.ModifyOperation;
Expand All @@ -44,11 +42,6 @@ public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations)
return null;
}

@Override
public SelectTableSink createSelectTableSink(TableSchema tableSchema) {
return null;
}

@Override
public String explain(List<Operation> operations, ExplainDetail... extraDetails) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.utils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
Expand All @@ -31,6 +32,8 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Utilities for print formatting.
Expand Down Expand Up @@ -58,44 +61,51 @@ private PrintUtils() {
* | (NULL) | (NULL) | (NULL) |
* +-------------+---------+-------------+
* 3 rows in result
*
* <p>Changelog is not supported until FLINK-16998 is finished.
*/
public static void printAsTableauForm(
TableSchema tableSchema,
Iterator<Row> it,
PrintWriter printWriter) {
printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN);
printAsTableauForm(tableSchema, it, printWriter, MAX_COLUMN_WIDTH, NULL_COLUMN, false);
}

/**
* Displays the result in a tableau form.
*
* <p>For example:
* +-------------+---------+-------------+
* | boolean_col | int_col | varchar_col |
* +-------------+---------+-------------+
* | true | 1 | abc |
* | false | 2 | def |
* | (NULL) | (NULL) | (NULL) |
* +-------------+---------+-------------+
* 3 rows in result
*
* <p>Changelog is not supported until FLINK-16998 is finished.
* <p>For example: (printRowKind is true)
* +-------------+-------------+---------+-------------+
* | row_kind | boolean_col | int_col | varchar_col |
* +-------------+-------------+---------+-------------+
* | +I | true | 1 | abc |
* | -U | false | 2 | def |
* | +U | false | 3 | def |
* | -D | (NULL) | (NULL) | (NULL) |
* +-------------+-------------+---------+-------------+
* 4 rows in result
*/
public static void printAsTableauForm(
TableSchema tableSchema,
Iterator<Row> it,
PrintWriter printWriter,
int maxColumnWidth,
String nullColumn) {
String nullColumn,
boolean printRowKind) {
List<String[]> rows = new ArrayList<>();

// fill field names first
List<TableColumn> columns = tableSchema.getTableColumns();
final List<TableColumn> columns;
if (printRowKind) {
columns = Stream.concat(
Stream.of(TableColumn.of("row_kind", DataTypes.STRING())),
tableSchema.getTableColumns().stream()
).collect(Collectors.toList());
} else {
columns = tableSchema.getTableColumns();
}

rows.add(columns.stream().map(TableColumn::getName).toArray(String[]::new));
while (it.hasNext()) {
rows.add(rowToString(it.next(), nullColumn));
rows.add(rowToString(it.next(), nullColumn, printRowKind));
}

int[] colWidths = columnWidthsByContent(columns, rows, maxColumnWidth);
Expand Down Expand Up @@ -124,20 +134,24 @@ public static void printAsTableauForm(
}

public static String[] rowToString(Row row) {
return rowToString(row, NULL_COLUMN);
return rowToString(row, NULL_COLUMN, false);
}

public static String[] rowToString(Row row, String nullColumn) {
final String[] fields = new String[row.getArity()];
public static String[] rowToString(Row row, String nullColumn, boolean printRowKind) {
final int len = printRowKind ? row.getArity() + 1 : row.getArity();
final List<String> fields = new ArrayList<>(len);
if (printRowKind) {
fields.add(row.getKind().shortString());
}
for (int i = 0; i < row.getArity(); i++) {
final Object field = row.getField(i);
if (field == null) {
fields[i] = nullColumn;
fields.add(nullColumn);
} else {
fields[i] = StringUtils.arrayAwareToString(field);
fields.add(StringUtils.arrayAwareToString(field));
}
}
return fields;
return fields.toArray(new String[0]);
}

private static int[] columnWidthsByContent(
Expand Down
Loading

0 comments on commit 127bb48

Please sign in to comment.