Skip to content

Commit

Permalink
[FLINK-30819][sql-client] Fix sql client print an empty line between …
Browse files Browse the repository at this point in the history
…the multi-line statements

This closes apache#21786
  • Loading branch information
fsk119 committed Feb 1, 2023
1 parent d657395 commit db82f82
Show file tree
Hide file tree
Showing 19 changed files with 449 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,12 @@
package org.apache.flink.table.client.cli;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.SqlParserEOFException;
import org.apache.flink.table.client.SqlClientException;
import org.apache.flink.table.client.cli.parser.Command;
import org.apache.flink.table.client.cli.parser.SqlCommandParserImpl;
import org.apache.flink.table.client.cli.parser.SqlMultiLineParser;
import org.apache.flink.table.client.config.SqlClientOptions;
import org.apache.flink.table.client.gateway.ClientResult;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
import org.apache.flink.table.client.gateway.SqlExecutionException;
import org.apache.flink.table.utils.print.PrintStyle;

import org.jline.reader.EndOfFileException;
import org.jline.reader.LineReader;
Expand All @@ -41,7 +34,6 @@
import org.jline.terminal.Terminal;
import org.jline.utils.AttributedStringBuilder;
import org.jline.utils.AttributedStyle;
import org.jline.utils.InfoCmp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -57,14 +49,6 @@
import java.nio.file.Path;
import java.util.function.Supplier;

import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_EXECUTE_STATEMENT;
import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_FINISH_STATEMENT;
import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_STATEMENT_SUBMITTED;
import static org.apache.flink.table.client.cli.CliStrings.MESSAGE_SUBMITTING_STATEMENT;
import static org.apache.flink.table.client.config.ResultMode.TABLEAU;
import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;

/** SQL CLI client. */
public class CliClient implements AutoCloseable {

Expand All @@ -91,8 +75,6 @@ public class CliClient implements AutoCloseable {

private boolean isRunning;

private final SqlMultiLineParser parser;

/**
* Creates a CLI instance with a custom terminal. Make sure to close the CLI instance afterwards
* using {@link #close()}.
Expand All @@ -107,7 +89,6 @@ public CliClient(
this.executor = executor;
this.inputTransformer = inputTransformer;
this.historyFilePath = historyFilePath;
this.parser = new SqlMultiLineParser(new SqlCommandParserImpl());
}

/**
Expand Down Expand Up @@ -160,7 +141,8 @@ public boolean executeInitialization(String content) {

// --------------------------------------------------------------------------------------------

enum ExecutionMode {
/** Mode of the execution. */
public enum ExecutionMode {
INTERACTIVE_EXECUTION,

NON_INTERACTIVE_EXECUTION,
Expand All @@ -182,52 +164,46 @@ private void executeInteractive() {
// print welcome
terminal.writer().append(CliStrings.MESSAGE_WELCOME);

LineReader lineReader = createLineReader(terminal, true);
getAndExecuteStatements(lineReader, ExecutionMode.INTERACTIVE_EXECUTION);
LineReader lineReader = createLineReader(terminal, ExecutionMode.INTERACTIVE_EXECUTION);
getAndExecuteStatements(lineReader, false);
}

private boolean getAndExecuteStatements(LineReader lineReader, ExecutionMode mode) {
private boolean getAndExecuteStatements(LineReader lineReader, boolean exitOnFailure) {
// begin reading loop
boolean exitOnFailure = !mode.equals(ExecutionMode.INTERACTIVE_EXECUTION);
isRunning = true;
String buffer = "";
String prompt = NEWLINE_PROMPT;
String line = "";
String line;

// make some space to previous command
terminal.writer().append("\n");
SqlMultiLineParser parser = (SqlMultiLineParser) lineReader.getParser();
while (isRunning) {
// make some space to previous command
terminal.writer().append("\n");
terminal.flush();
try {
// read a statement from terminal and parse it
line = lineReader.readLine(prompt, null, inputTransformer, null);
line = lineReader.readLine(NEWLINE_PROMPT, null, inputTransformer, null);
if (line.trim().isEmpty()) {
continue;
}
if (parser.getStatementType().isPresent()) {
line = buffer + line;
boolean success = executeStatement(line, mode);
if (exitOnFailure && !success) {
return false;
}

Printer printer = parser.getPrinter();
boolean success = print(printer);
if (exitOnFailure && !success) {
return false;
}
} catch (UserInterruptException e) {
// user cancelled line with Ctrl+C
} catch (EndOfFileException | IOError e) {
// user cancelled application with Ctrl+D or kill
break;
} catch (SqlParserEOFException e) {
prompt = null;
buffer = line;
continue;
} catch (SqlExecutionException e) {
// print the detailed information on about the parse errors in the terminal.
printExecutionException(e);
if (exitOnFailure) {
return false;
}
} catch (Throwable t) {
throw new SqlClientException("Could not read from command line.", t);
}
// clear the buffer
buffer = "";
prompt = NEWLINE_PROMPT;
// make some space to previous command
terminal.writer().append("\n");
terminal.flush();
}
// finish all statements.
return true;
Expand All @@ -247,25 +223,23 @@ private boolean executeFile(String content, OutputStream outputStream, Execution
SqlMultiLineParser.formatSqlFile(content).getBytes());
Terminal dumbTerminal =
TerminalUtils.createDumbTerminal(inputStream, outputStream)) {
LineReader lineReader = createLineReader(dumbTerminal, false);
return getAndExecuteStatements(lineReader, mode);
LineReader lineReader = createLineReader(dumbTerminal, mode);
return getAndExecuteStatements(lineReader, true);
} catch (Throwable e) {
printExecutionException(e);
return false;
}
}

private boolean executeStatement(String statement, ExecutionMode executionMode) {
private boolean print(Printer printer) {
try {
final Thread thread = Thread.currentThread();
final Terminal.SignalHandler previousHandler =
terminal.handle(Terminal.Signal.INT, (signal) -> thread.interrupt());
try {
if (executionMode == ExecutionMode.INITIALIZATION) {
executor.configureSession(statement);
printInfo(MESSAGE_EXECUTE_STATEMENT);
} else {
executeInExecutionMode(statement, executionMode);
printer.print(terminal);
if (printer.isQuitCommand()) {
isRunning = false;
}
} finally {
terminal.handle(Terminal.Signal.INT, previousHandler);
Expand All @@ -277,129 +251,6 @@ private boolean executeStatement(String statement, ExecutionMode executionMode)
return true;
}

private void executeInExecutionMode(String statement, ExecutionMode executionMode) {
Command command =
parser.getStatementType()
.orElseThrow(
() -> new SqlExecutionException("The statement should be parsed."));
switch (command) {
case QUIT:
callQuit();
break;
case CLEAR:
callClear();
break;
case HELP:
callHelp();
break;
default:
call(statement, executionMode);
}
}

// --------------------------------------------------------------------------------------------
// Call command
// --------------------------------------------------------------------------------------------

private void callQuit() {
printInfo(CliStrings.MESSAGE_QUIT);
isRunning = false;
}

private void callClear() {
if (TerminalUtils.isPlainTerminal(terminal)) {
for (int i = 0; i < 200; i++) { // large number of empty lines
terminal.writer().println();
}
} else {
terminal.puts(InfoCmp.Capability.clear_screen);
}
}

private void callHelp() {
terminal.writer().println(CliStrings.MESSAGE_HELP);
terminal.flush();
}

private void call(String statement, ExecutionMode executionMode) {
try (ClientResult result = executor.executeStatement(statement)) {
if (result.isQueryResult()) {
printQuery(result, executionMode);
} else if (result.getJobId() != null) {
printJob(result.getJobId());
} else {
print(result);
}
}
}

// --------------------------------------------------------------------------------------------
// Print results
// --------------------------------------------------------------------------------------------

private void printQuery(ClientResult queryResult, ExecutionMode executionMode) {
final ResultDescriptor resultDesc =
new ResultDescriptor(queryResult, executor.getSessionConfig());
if (executionMode.equals(ExecutionMode.NON_INTERACTIVE_EXECUTION)
&& !resultDesc.isTableauMode()) {
throw new SqlExecutionException(
String.format(
"In non-interactive mode, it only supports to use %s as value of %s when execute query. Please add 'SET %s=%s;' in the sql file.",
TABLEAU,
EXECUTION_RESULT_MODE.key(),
EXECUTION_RESULT_MODE.key(),
TABLEAU));
}

if (resultDesc.isTableauMode()) {
try (CliTableauResultView tableauResultView =
new CliTableauResultView(terminal, resultDesc)) {
tableauResultView.displayResults();
}
} else {
final CliResultView<?> view;
if (resultDesc.isMaterialized()) {
view = new CliTableResultView(terminal, resultDesc);
} else {
view = new CliChangelogResultView(terminal, resultDesc);
}

// enter view
view.open();

// view left
printInfo(CliStrings.MESSAGE_RESULT_QUIT);
}
}

private void printJob(JobID jobID) {
if (executor.getSessionConfig().get(TABLE_DML_SYNC)) {
printInfo(MESSAGE_FINISH_STATEMENT);
} else {
terminal.writer()
.println(CliStrings.messageInfo(MESSAGE_SUBMITTING_STATEMENT).toAnsi());
terminal.writer().println(CliStrings.messageInfo(MESSAGE_STATEMENT_SUBMITTED).toAnsi());
terminal.writer().println(String.format("Job ID: %s\n", jobID));
terminal.flush();
}
}

private void print(ClientResult result) {
if (result.getResultKind() == ResultKind.SUCCESS) {
// print more meaningful message than tableau OK result
printInfo(MESSAGE_EXECUTE_STATEMENT);
} else {
// print tableau if result has content
PrintStyle.tableauWithDataInferredColumnWidths(
result.getResultSchema(),
result.getRowDataToStringConverter(),
Integer.MAX_VALUE,
true,
false)
.print(result, terminal.writer());
}
}

// --------------------------------------------------------------------------------------------
// Utils
// --------------------------------------------------------------------------------------------
Expand All @@ -412,11 +263,6 @@ private void printExecutionException(Throwable t) {
terminal.flush();
}

private void printInfo(String message) {
terminal.writer().println(CliStrings.messageInfo(message).toAnsi());
terminal.flush();
}

private void closeTerminal() {
try {
terminal.close();
Expand All @@ -426,15 +272,18 @@ private void closeTerminal() {
}
}

private LineReader createLineReader(Terminal terminal, boolean enableSqlCompleter) {
private LineReader createLineReader(Terminal terminal, ExecutionMode mode) {
SqlMultiLineParser parser =
new SqlMultiLineParser(new SqlCommandParserImpl(), executor, mode);

// initialize line lineReader
LineReaderBuilder builder =
LineReaderBuilder.builder()
.terminal(terminal)
.appName(CliStrings.CLI_NAME)
.parser(parser);

if (enableSqlCompleter) {
if (mode == ExecutionMode.INTERACTIVE_EXECUTION) {
builder.completer(new SqlCompleter(executor));
}
LineReader lineReader = builder.build();
Expand Down
Loading

0 comments on commit db82f82

Please sign in to comment.