[FLINK-27606][table] Fix CompileException when using UDAF with merge() method

This closes apache#19715
lsyldliu authored Jun 8, 2022
1 parent f011e52 commit 24cb706
Showing 8 changed files with 492 additions and 15 deletions.
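For context on the fix itself: FLINK-27606 addresses a CompileException in the planner's generated code when an imperative UDAF defines a merge(...) method, the hook used for two-phase (local/global) aggregation. A minimal sketch of such a UDAF follows; it is hypothetical (the class and names are not from this commit, and it is not the toolbox function the new E2E test registers), shown only to illustrate the merge() contract:

// Hypothetical imperative UDAF; only the merge() contract matters here.
import org.apache.flink.table.functions.AggregateFunction;

public class OrderCountUdaf extends AggregateFunction<Long, OrderCountUdaf.Acc> {

    /** Mutable accumulator holding the running count. */
    public static class Acc {
        public long count = 0L;
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    @Override
    public Long getValue(Acc acc) {
        return acc.count;
    }

    /** Invoked once per input row. */
    public void accumulate(Acc acc, String userName) {
        acc.count += 1;
    }

    /** Merges partial accumulators; the generated code for this path is what FLINK-27606 repairs. */
    public void merge(Acc acc, Iterable<Acc> others) {
        for (Acc other : others) {
            acc.count += other.count;
        }
    }
}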
122 changes: 122 additions & 0 deletions flink-end-to-end-tests/flink-end-to-end-tests-sql/pom.xml
@@ -0,0 +1,122 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-end-to-end-tests</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.16-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-end-to-end-tests-sql</artifactId>
<name>Flink : E2E Tests : SQL</name>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-end-to-end-tests-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>compile</scope>
</dependency>

<!-- The following dependencies are for connector/format sql-jars that
we copy using the maven-dependency-plugin. When extending the test
to cover more connectors/formats, add a dependency here and an entry
to the dependency-plugin configuration below.
This ensures that all modules we actually need (as defined by the
dependency-plugin configuration) are built before this module. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client-test</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy</id>
<phase>pre-integration-test</phase>
<goals>
<goal>copy</goal>
</goals>
</execution>
</executions>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-client-test</artifactId>
<version>${project.version}</version>
<destFileName>SqlToolbox.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>
</artifactItems>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<executions>
<execution>
<id>integration-tests</id>
<phase>integration-test</phase>
<goals>
<goal>test</goal>
</goals>
<configuration>
<includes>
<include>**/*ITCase.*</include>
</includes>
<!-- override reuseForks to true to reduce testing time -->
<reuseForks>true</reuseForks>
<systemPropertyVariables>
<rootDir>${project.basedir}/../../</rootDir>
<moduleDir>${project.basedir}</moduleDir>
</systemPropertyVariables>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
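Following up on the comment in the dependencies section above about extending the test to more connectors/formats: a new entry under artifactItems would mirror the existing one, for example (hypothetical artifact, not part of this commit):

<artifactItem>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-kafka</artifactId>
    <version>${project.version}</version>
    <destFileName>sql-kafka.jar</destFileName>
    <type>jar</type>
    <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
</artifactItem>

With the bindings above, running mvn verify on this module should copy SqlToolbox.jar during pre-integration-test and then execute the **/*ITCase.* classes during integration-test (a hedged reading of the configuration, not an invocation documented by this commit).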
207 changes: 207 additions & 0 deletions flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/java/org/apache/flink/table/sql/codegen/PlannerScalaFreeITCase.java
@@ -0,0 +1,207 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.sql.codegen;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.cache.DownloadCache;
import org.apache.flink.tests.util.flink.ClusterController;
import org.apache.flink.tests.util.flink.FlinkResource;
import org.apache.flink.tests.util.flink.FlinkResourceSetup;
import org.apache.flink.tests.util.flink.LocalStandaloneFlinkResourceFactory;
import org.apache.flink.tests.util.flink.SQLJobSubmission;
import org.apache.flink.util.TestLogger;

import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

/**
* End-to-end tests for the scala-free table planner introduced in Flink 1.15. Since the table
* planner became scala-free, its classes are no longer visible to the distribution at runtime;
* if such a class is referenced during execution, a ClassNotFoundException is thrown. ITCases
* inside the table planner cannot cover this, so we add E2E tests for these cases.
*/
@RunWith(Parameterized.class)
public class PlannerScalaFreeITCase extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(PlannerScalaFreeITCase.class);

private static final String SCALA_FREE_E2E_SQL = "scala_free_e2e.sql";

@Parameterized.Parameters(name = "executionMode")
public static Collection<String> data() {
return Arrays.asList("streaming", "batch");
}

private static Configuration getConfiguration() {
// we have to enable checkpointing to trigger flushing of the filesystem sink
final Configuration flinkConfig = new Configuration();
flinkConfig.setString("execution.checkpointing.interval", "5s");
return flinkConfig;
}

@Rule
public final FlinkResource flink =
new LocalStandaloneFlinkResourceFactory()
.create(
FlinkResourceSetup.builder()
.addConfiguration(getConfiguration())
.build());

@Rule public final TemporaryFolder tmp = new TemporaryFolder();

private final String executionMode;
private Path result;

@ClassRule public static final DownloadCache DOWNLOAD_CACHE = DownloadCache.get();

private static final Path sqlToolBoxJar = TestUtils.getResource(".*SqlToolbox.jar");

public PlannerScalaFreeITCase(String executionMode) {
this.executionMode = executionMode;
}

@Before
public void before() throws Exception {
DOWNLOAD_CACHE.before();
Path tmpPath = tmp.getRoot().toPath();
LOG.info("The current temporary path: {}", tmpPath);
this.result = tmpPath.resolve("result");
}

@Test
public void testImperativeUdaf() throws Exception {
try (ClusterController clusterController = flink.startCluster(1)) {
// Initialize the SQL statements from "scala_free_e2e.sql" file
Map<String, String> varsMap = new HashMap<>();
varsMap.put("$RESULT", this.result.toAbsolutePath().toString());
varsMap.put("$MODE", this.executionMode);

List<String> sqlLines = initializeSqlLines(varsMap);

// Execute SQL statements in "scala_free_e2e.sql" file
executeSqlStatements(clusterController, sqlLines);

// Wait until all the results are flushed to the json file.
LOG.info("Verify the json result.");
checkJsonResultFile();
LOG.info("The codegen SQL client test run successfully.");
}
}

private void executeSqlStatements(ClusterController clusterController, List<String> sqlLines)
throws IOException {
LOG.info("Executing end-to-end SQL statements {}.", sqlLines);
clusterController.submitSQLJob(
new SQLJobSubmission.SQLJobSubmissionBuilder(sqlLines)
.addJar(sqlToolBoxJar)
.build(),
Duration.ofMinutes(2L));
}

private List<String> initializeSqlLines(Map<String, String> vars) throws IOException {
URL url = PlannerScalaFreeITCase.class.getClassLoader().getResource(SCALA_FREE_E2E_SQL);
if (url == null) {
throw new FileNotFoundException(SCALA_FREE_E2E_SQL);
}

List<String> lines = Files.readAllLines(new File(url.getFile()).toPath());
List<String> result = new ArrayList<>();
for (String line : lines) {
for (Map.Entry<String, String> var : vars.entrySet()) {
line = line.replace(var.getKey(), var.getValue());
}
result.add(line);
}

return result;
}

private void checkJsonResultFile() throws Exception {
boolean success = false;
final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(20));
List<String> lines = null;
while (deadline.hasTimeLeft()) {
if (Files.exists(result)) {
lines = readJsonResultFiles(result);
if (lines.size() == 2) {
success = true;
assertThat(
lines.toArray(new String[0]),
arrayContainingInAnyOrder(
"{\"before\":null,\"after\":{\"user_name\":\"Alice\",\"order_cnt\":1},\"op\":\"c\"}",
"{\"before\":null,\"after\":{\"user_name\":\"Bob\",\"order_cnt\":2},\"op\":\"c\"}"));
break;
} else {
LOG.info(
"The target Json {} does not contain enough records yet, current {} records, time left: {}s",
result,
lines.size(),
deadline.timeLeft().getSeconds());
}
} else {
LOG.info("The target Json {} does not exist yet", result);
}
Thread.sleep(500);
}
assertTrue(
String.format(
"Did not get expected results before timeout, actual result: %s.", lines),
success);
}

private static List<String> readJsonResultFiles(Path path) throws IOException {
File filePath = path.toFile();
// list all the non-hidden result files
File[] resultFiles = filePath.listFiles((dir, name) -> !name.startsWith("."));
List<String> result = new ArrayList<>();
if (resultFiles != null) {
for (File file : resultFiles) {
result.addAll(Files.readAllLines(file.toPath()));
}
}
return result;
}
}
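The scala_free_e2e.sql script referenced by the test is not expanded in this view. Going only by the test above (the $RESULT/$MODE substitutions, the SqlToolbox.jar UDF jar, and the expected debezium-json records for Alice and Bob), a hedged sketch of its general shape might look like the following; every table, function, class, and connector name below is an assumption, not the committed script:

-- Hypothetical sketch; all names and connectors are assumptions.
SET 'execution.runtime-mode' = '$MODE';

-- UDAF registered from SqlToolbox.jar (class name assumed)
CREATE FUNCTION count_orders AS 'org.apache.flink.table.toolbox.CountAggFunction';

CREATE TABLE result_sink (
    user_name STRING,
    order_cnt BIGINT
) WITH (
    'connector' = 'filesystem',
    'path' = '$RESULT',
    'format' = 'debezium-json'
);

INSERT INTO result_sink
SELECT user_name, count_orders(order_id) AS order_cnt
FROM (VALUES ('Alice', 1), ('Bob', 2), ('Bob', 3)) AS t(user_name, order_id)
GROUP BY user_name;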
34 changes: 34 additions & 0 deletions flink-end-to-end-tests/flink-end-to-end-tests-sql/src/test/resources/log4j2-test.properties
@@ -0,0 +1,34 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger

appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n

# Uncomment to enable codegen logging
#loggers = testlogger
#logger.testlogger.name = org.apache.flink.table.planner.codegen
#logger.testlogger.level = TRACE
#logger.testlogger.appenderRefs = TestLogger
