Skip to content

Commit

Permalink
[FLINK-31223][test] Abstract SqlClientTestBase.
Browse files Browse the repository at this point in the history
  • Loading branch information
reswqa committed Apr 25, 2024
1 parent 63f02a0 commit 6392f81
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,49 +19,30 @@
package org.apache.flink.table.client;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.client.cli.TerminalUtils;
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointExtension;
import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;

import org.jline.terminal.Size;
import org.jline.terminal.Terminal;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;
import static org.apache.flink.configuration.DeploymentOptions.TARGET;
import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link SqlClient}. */
class SqlClientTest {

@TempDir private Path tempFolder;
class SqlClientTest extends SqlClientTestBase {

@RegisterExtension
@Order(1)
Expand All @@ -78,34 +59,6 @@ class SqlClientTest {
private static final SqlGatewayRestEndpointExtension SQL_GATEWAY_REST_ENDPOINT_EXTENSION =
new SqlGatewayRestEndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);

private Map<String, String> originalEnv;

private String historyPath;

@BeforeEach
void before() throws IOException {
originalEnv = System.getenv();

// prepare conf dir
File confFolder = Files.createTempDirectory(tempFolder, "conf").toFile();
File confYaml = new File(confFolder, "config.yaml");
if (!confYaml.createNewFile()) {
throw new IOException("Can't create testing config.yaml file.");
}

// adjust the test environment for the purposes of this test
Map<String, String> map = new HashMap<>(System.getenv());
map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
CommonTestUtils.setEnv(map);

historyPath = Files.createTempFile(tempFolder, "history", "").toFile().getPath();
}

@AfterEach
void after() {
CommonTestUtils.setEnv(originalEnv);
}

@Test
void testEmbeddedWithOptions() throws Exception {
String[] args = new String[] {"embedded", "-hist", historyPath};
Expand Down Expand Up @@ -332,42 +285,4 @@ private void runTestCliHelp(String[] args, String expected) throws Exception {
.toURI())));
assertThat(runSqlClient(args)).isEqualTo(actual);
}

private String runSqlClient(String[] args) throws Exception {
return runSqlClient(args, "QUIT;\n", false);
}

private String runSqlClient(String[] args, String statements, boolean printInput)
throws Exception {
try (OutputStream out = new ByteArrayOutputStream();
Terminal terminal =
TerminalUtils.createDumbTerminal(
new ByteArrayInputStream(
statements.getBytes(StandardCharsets.UTF_8)),
out)) {
if (printInput) {
// The default terminal has an empty size. Here increase the terminal to allow
// the line reader print the input string.
terminal.setSize(new Size(160, 80));
}
SqlClient.startClient(args, () -> terminal);
return out.toString().replace("\r\n", System.lineSeparator());
}
}

private String createSqlFile(List<String> statements, String name) throws IOException {
// create sql file
File sqlFileFolder = Files.createTempDirectory(tempFolder, "sql-file").toFile();
File sqlFile = new File(sqlFileFolder, name);
if (!sqlFile.createNewFile()) {
throw new IOException(String.format("Can't create testing %s.", name));
}
String sqlFilePath = sqlFile.getPath();
Files.write(
Paths.get(sqlFilePath),
statements,
StandardCharsets.UTF_8,
StandardOpenOption.APPEND);
return sqlFilePath;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.client;

import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.client.cli.TerminalUtils;

import org.jline.terminal.Size;
import org.jline.terminal.Terminal;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.flink.configuration.ConfigConstants.ENV_FLINK_CONF_DIR;

/** Base class for test {@link SqlClient}. */
class SqlClientTestBase {
@TempDir private Path tempFolder;

protected String historyPath;

protected Map<String, String> originalEnv;

@BeforeEach
void before() throws IOException {
originalEnv = System.getenv();

// prepare conf dir
File confFolder = Files.createTempDirectory(tempFolder, "conf").toFile();
File confYaml = new File(confFolder, "config.yaml");
if (!confYaml.createNewFile()) {
throw new IOException("Can't create testing config.yaml file.");
}
writeConfigOptionsToConfYaml(confYaml.toPath());
// adjust the test environment for the purposes of this test
Map<String, String> map = new HashMap<>(System.getenv());
map.put(ENV_FLINK_CONF_DIR, confFolder.getAbsolutePath());
CommonTestUtils.setEnv(map);

historyPath = Files.createTempFile(tempFolder, "history", "").toFile().getPath();
}

@AfterEach
void after() {
CommonTestUtils.setEnv(originalEnv);
}

protected String createSqlFile(List<String> statements, String name) throws IOException {
// create sql file
File sqlFileFolder = Files.createTempDirectory(tempFolder, "sql-file").toFile();
File sqlFile = new File(sqlFileFolder, name);
if (!sqlFile.createNewFile()) {
throw new IOException(String.format("Can't create testing %s.", name));
}
String sqlFilePath = sqlFile.getPath();
Files.write(
Paths.get(sqlFilePath),
statements,
StandardCharsets.UTF_8,
StandardOpenOption.APPEND);
return sqlFilePath;
}

public static String runSqlClient(String[] args) throws Exception {
return runSqlClient(args, "QUIT;\n", false);
}

public static String runSqlClient(String[] args, String statements, boolean printInput)
throws Exception {
try (OutputStream out = new ByteArrayOutputStream();
Terminal terminal =
TerminalUtils.createDumbTerminal(
new ByteArrayInputStream(
statements.getBytes(StandardCharsets.UTF_8)),
out)) {
if (printInput) {
// The default terminal has an empty size. Here increase the terminal to allow
// the line reader print the input string.
terminal.setSize(new Size(160, 80));
}
SqlClient.startClient(args, () -> terminal);
return out.toString().replace("\r\n", System.lineSeparator());
}
}

protected void writeConfigOptionsToConfYaml(Path confYamlPath) throws IOException {
// no-op for default.
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.function.Consumer;
import java.util.function.Supplier;

import static org.apache.flink.table.gateway.rest.util.SqlGatewayRestEndpointTestUtils.getBaseConfig;
Expand All @@ -41,6 +42,8 @@ public class SqlGatewayRestEndpointExtension implements BeforeAllCallback, After

private final Supplier<SqlGatewayService> serviceSupplier;

private final Consumer<Configuration> flinkConfConsumer;

private SqlGatewayRestEndpoint sqlGatewayRestEndpoint;
private SqlGatewayService sqlGatewayService;
private String targetAddress;
Expand All @@ -59,13 +62,22 @@ public SqlGatewayService getSqlGatewayService() {
}

public SqlGatewayRestEndpointExtension(Supplier<SqlGatewayService> serviceSupplier) {
this(serviceSupplier, (conf) -> {});
}

public SqlGatewayRestEndpointExtension(
Supplier<SqlGatewayService> serviceSupplier,
Consumer<Configuration> flinkConfConsumer) {
this.serviceSupplier = serviceSupplier;
this.flinkConfConsumer = flinkConfConsumer;
}

@Override
public void beforeAll(ExtensionContext context) {
String address = InetAddress.getLoopbackAddress().getHostAddress();
Configuration config = getBaseConfig(getFlinkConfig(address, address, "0"));
Configuration flinkConfig = getFlinkConfig(address, address, "0");
flinkConfConsumer.accept(flinkConfig);
Configuration config = getBaseConfig(flinkConfig);

try {
sqlGatewayService = serviceSupplier.get();
Expand Down

0 comments on commit 6392f81

Please sign in to comment.