Skip to content

Commit

Permalink
[FLINK-33726][sql-client] print time cost for streaming queries
Browse files Browse the repository at this point in the history
  • Loading branch information
JingGe committed Dec 7, 2023
1 parent b6380d5 commit 4eb5b58
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,7 @@ public void displayResults() throws SqlExecutionException {
resultFuture.get();
cleanUpQuery = false; // job finished successfully
} catch (CancellationException e) {
terminal.writer()
.println(
"Query terminated, received a total of "
+ receivedRowCount.get()
+ " "
+ getRowTerm(receivedRowCount));
terminal.flush();
printTerminatedFooter(receivedRowCount);
} catch (ExecutionException e) {
if (e.getCause() instanceof SqlExecutionException) {
throw (SqlExecutionException) e.getCause();
Expand All @@ -114,6 +108,27 @@ public void displayResults() throws SqlExecutionException {
}
}

private void printTerminatedFooter(AtomicInteger receivedRowCount) {
if (!resultDescriptor.isPrintQueryTimeCost()) {
terminal.writer()
.println(
"Query terminated, received a total of "
+ receivedRowCount.get()
+ " "
+ getRowTerm(receivedRowCount));
} else {
terminal.writer()
.println(
"Query terminated, received a total of "
+ receivedRowCount.get()
+ " "
+ getRowTerm(receivedRowCount)
+ calculateTimeCostInPrintFormat(
queryBeginTime, System.currentTimeMillis()));
}
terminal.flush();
}

@Override
public void close() {
this.displayResultExecutorService.shutdown();
Expand Down Expand Up @@ -188,7 +203,7 @@ private void printBatchFooter(long numRows) {
calculateTimeCostInPrintFormat(queryBeginTime, System.currentTimeMillis());
terminal.writer().println(numRows + " " + rowTerm + " in set" + timeCost);
}
terminal.writer().flush();
terminal.flush();
}

private void printStreamingResults(AtomicInteger receivedRowCount) {
Expand All @@ -200,11 +215,7 @@ private void printStreamingResults(AtomicInteger receivedRowCount) {
false,
true);

// print filed names
style.printBorderLine(terminal.writer());
style.printColumnNamesTableauRow(terminal.writer());
style.printBorderLine(terminal.writer());
terminal.flush();
printStreamingTableHeader(style);

while (true) {
final TypedResult<List<RowData>> result = collectResult.retrieveChanges();
Expand All @@ -223,14 +234,7 @@ private void printStreamingResults(AtomicInteger receivedRowCount) {
if (receivedRowCount.get() > 0) {
style.printBorderLine(terminal.writer());
}
String rowTerm = getRowTerm(receivedRowCount);
terminal.writer()
.println(
"Received a total of "
+ receivedRowCount.get()
+ " "
+ rowTerm);
terminal.flush();
printStreamingFooter(receivedRowCount);
return;
case PAYLOAD:
List<RowData> changes = result.getPayload();
Expand All @@ -248,6 +252,33 @@ private void printStreamingResults(AtomicInteger receivedRowCount) {
}
}

private void printStreamingTableHeader(TableauStyle style) {
// print filed names
style.printBorderLine(terminal.writer());
style.printColumnNamesTableauRow(terminal.writer());
style.printBorderLine(terminal.writer());
terminal.flush();
}

private void printStreamingFooter(AtomicInteger receivedRowCount) {
String rowTerm = getRowTerm(receivedRowCount);
if (!resultDescriptor.isPrintQueryTimeCost()) {
terminal.writer()
.println("Received a total of " + receivedRowCount.get() + " " + rowTerm);
} else {
String timeCost =
calculateTimeCostInPrintFormat(queryBeginTime, System.currentTimeMillis());
terminal.writer()
.println(
"Received a total of "
+ receivedRowCount.get()
+ " "
+ rowTerm
+ timeCost);
}
terminal.flush();
}

private List<RowData> waitBatchResults() {
List<RowData> resultRows = new ArrayList<>();
do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,62 @@ void testStreamingResult() {
assertThat(collectResult.closed).isFalse();
}

@Test
void testStreamingResultWithDisplayingTimeCost() {
final Configuration testConfig = new Configuration();
testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);

ResultDescriptor resultDescriptor =
new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
TestChangelogResult collectResult = createNewTestChangelogResult();
CliTableauResultView view =
new CliTableauResultView(
terminal, resultDescriptor, collectResult, System.currentTimeMillis());
view.displayResults();

view.close();
// note: the expected result may look irregular because every CJK(Chinese/Japanese/Korean)
// character's
// width < 2 in IDE by default, every CJK character usually's width is 2, you can open this
// source file
// by vim or just cat the file to check the regular result.
assertThat(terminalOutput.toString())
.contains(
"+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ System.lineSeparator()
+ "| op | boolean | int | bigint | varchar | decimal(10, 5) | timestamp | binary |"
+ System.lineSeparator()
+ "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ System.lineSeparator()
+ "| +I | <NULL> | 1 | 2 | abc | 1.23000 | 2020-03-01 18:39:14.000000 | x'32333485365d737e' |"
+ System.lineSeparator()
+ "| -U | FALSE | <NULL> | 0 | | 1.00000 | 2020-03-01 18:39:14.100000 | x'649e207983' |"
+ System.lineSeparator()
+ "| +U | TRUE | 2147483647 | <NULL> | abcdefg | 12345.00000 | 2020-03-01 18:39:14.120000 | x'92e90102' |"
+ System.lineSeparator()
+ "| -D | FALSE | -2147483648 | 9223372036854775807 | <NULL> | 12345.06789 | 2020-03-01 18:39:14.123000 | x'32333485365d737e' |"
+ System.lineSeparator()
+ "| +I | TRUE | 100 | -9223372036854775808 | abcdefg111 | <NULL> | 2020-03-01 18:39:14.123456 | x'6e17fffe' |"
+ System.lineSeparator()
+ "| -D | <NULL> | -1 | -1 | abcdefghijklmnopqrstuvwxyz1... | -12345.06789 | <NULL> | <NULL> |"
+ System.lineSeparator()
+ "| +I | <NULL> | -1 | -1 | 这是一段中文 | -12345.06789 | 2020-03-04 18:39:14.000000 | x'fdfeff00010203' |"
+ System.lineSeparator()
+ "| -D | <NULL> | -1 | -1 | これは日本語をテストするた... | -12345.06789 | 2020-03-04 18:39:14.000000 | x'fdfeff00010203' |"
+ System.lineSeparator()
+ "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ System.lineSeparator()
+ "Received a total of 8 rows");

String[] outputLines = terminalOutput.toString().split("\\r?\\n");
assertThat(outputLines[outputLines.length - 1])
.matches("Received a total of 8 rows \\(\\d+\\.\\d{2} seconds\\)");

// Job is finished. Don't need to close the job manually.
assertThat(collectResult.closed).isFalse();
}

@Test
void testEmptyStreamingResult() {
final Configuration testConfig = new Configuration();
Expand Down Expand Up @@ -501,6 +557,41 @@ void testEmptyStreamingResult() {
assertThat(collectResult.closed).isFalse();
}

@Test
void testEmptyStreamingResultWithDisplayingTimeCost() {
final Configuration testConfig = new Configuration();
testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);

ResultDescriptor resultDescriptor =
new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
TestChangelogResult collectResult = new TestChangelogResult(TypedResult::endOfStream);

CliTableauResultView view =
new CliTableauResultView(
terminal, resultDescriptor, collectResult, System.currentTimeMillis());

view.displayResults();
view.close();

assertThat(terminalOutput.toString())
.contains(
"+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ System.lineSeparator()
+ "| op | boolean | int | bigint | varchar | decimal(10, 5) | timestamp | binary |"
+ System.lineSeparator()
+ "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ System.lineSeparator()
+ "Received a total of 0 row");

String[] outputLines = terminalOutput.toString().split("\\r?\\n");
assertThat(outputLines[outputLines.length - 1])
.matches("Received a total of 0 row \\(\\d+\\.\\d{2} seconds\\)");

// Job is finished. Don't need to close the job manually.
assertThat(collectResult.closed).isFalse();
}

@Test
void testCancelStreamingResult() throws Exception {
final Configuration testConfig = new Configuration();
Expand Down Expand Up @@ -554,6 +645,62 @@ void testCancelStreamingResult() throws Exception {
assertThat(collectResult.closed).isTrue();
}

@Test
void testCancelStreamingResultWithDisplayingTimeCost() throws Exception {
final Configuration testConfig = new Configuration();
testConfig.set(EXECUTION_RESULT_MODE, ResultMode.TABLEAU);
testConfig.set(RUNTIME_MODE, RuntimeExecutionMode.STREAMING);

TestChangelogResult collectResult =
new TestChangelogResult(
() ->
TypedResult.payload(
streamingData.subList(0, streamingData.size() / 2)),
TypedResult::empty);
ResultDescriptor resultDescriptor =
new ResultDescriptor(CliClientTestUtils.createTestClient(schema), testConfig);
CliTableauResultView view =
new CliTableauResultView(
terminal, resultDescriptor, collectResult, System.currentTimeMillis());

// submit result display in another thread
Future<?> furture = EXECUTOR_RESOURCE.getExecutor().submit(view::displayResults);

// wait until we processed first result
CommonTestUtils.waitUntilCondition(() -> collectResult.fetchCount.get() > 1, 50L);

// send signal to cancel
terminal.raise(Terminal.Signal.INT);
furture.get(10, TimeUnit.SECONDS);
view.close();

assertThat(terminalOutput.toString())
.contains(
"+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ System.lineSeparator()
+ "| op | boolean | int | bigint | varchar | decimal(10, 5) | timestamp | binary |"
+ System.lineSeparator()
+ "+----+---------+-------------+----------------------+--------------------------------+----------------+----------------------------+--------------------------------+"
+ System.lineSeparator()
+ "| +I | <NULL> | 1 | 2 | abc | 1.23000 | 2020-03-01 18:39:14.000000 | x'32333485365d737e' |"
+ System.lineSeparator()
+ "| -U | FALSE | <NULL> | 0 | | 1.00000 | 2020-03-01 18:39:14.100000 | x'649e207983' |"
+ System.lineSeparator()
+ "| +U | TRUE | 2147483647 | <NULL> | abcdefg | 12345.00000 | 2020-03-01 18:39:14.120000 | x'92e90102' |"
+ System.lineSeparator()
+ "| -D | FALSE | -2147483648 | 9223372036854775807 | <NULL> | 12345.06789 | 2020-03-01 18:39:14.123000 | x'32333485365d737e' |"
+ System.lineSeparator()
+ "Query terminated, received a total of 4 rows");

String[] outputLines = terminalOutput.toString().split("\\r?\\n");
assertThat(outputLines[outputLines.length - 1])
.matches(
"Query terminated, received a total of 4 rows \\(\\d+\\.\\d{2} seconds\\)");

// close job manually
assertThat(collectResult.closed).isTrue();
}

@Test
void testFailedStreamingResult() {
final Configuration testConfig = new Configuration();
Expand Down
4 changes: 4 additions & 0 deletions flink-table/flink-sql-client/src/test/resources/sql/set.q
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,10 @@ SET 'sql-client.execution.result-mode' = 'tableau';
[INFO] Execute statement succeed.
!info
SET 'sql-client.display.print-time-cost' = 'false';
[INFO] Execute statement succeed.
!info
create function func1 as 'LowerUDF' LANGUAGE JAVA;
[INFO] Execute statement succeed.
!info
Expand Down

0 comments on commit 4eb5b58

Please sign in to comment.