Skip to content

Commit

Permalink
Flink: Backport apache#9078 to v1.16 and v1.15 (apache#9151)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgpoh authored Nov 27, 2023
1 parent 1a073dd commit f246614
Show file tree
Hide file tree
Showing 6 changed files with 388 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

public class MiniFlinkClusterExtension {

private static final int DEFAULT_TM_NUM = 1;
private static final int DEFAULT_PARALLELISM = 4;

public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
new Configuration()
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

private MiniFlinkClusterExtension() {}

/**
* It will start a mini cluster with classloader.check-leaked-classloader=false, so that we won't
* break the unit tests because of the class loader leak issue. In our iceberg integration tests,
* there're some that will assert the results after finished the flink jobs, so actually we may
* access the class loader that has been closed by the flink task managers if we enable the switch
* classloader.check-leaked-classloader by default.
*/
public static MiniClusterExtension createWithClassloaderCheckDisabled() {
return new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(DEFAULT_TM_NUM)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
.build());
}
}
130 changes: 130 additions & 0 deletions flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/TestBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import java.nio.file.Path;
import java.util.List;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.hive.TestHiveMetastore;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

public abstract class TestBase extends TestBaseUtils {

@RegisterExtension
public static MiniClusterExtension miniClusterResource =
MiniFlinkClusterExtension.createWithClassloaderCheckDisabled();

@TempDir Path temporaryDirectory;

private static TestHiveMetastore metastore = null;
protected static HiveConf hiveConf = null;
protected static HiveCatalog catalog = null;

private volatile TableEnvironment tEnv = null;

@BeforeAll
public static void startMetastore() {
TestBase.metastore = new TestHiveMetastore();
metastore.start();
TestBase.hiveConf = metastore.hiveConf();
TestBase.catalog =
(HiveCatalog)
CatalogUtil.loadCatalog(
HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);
}

@AfterAll
public static void stopMetastore() throws Exception {
metastore.stop();
TestBase.catalog = null;
}

protected TableEnvironment getTableEnv() {
if (tEnv == null) {
synchronized (this) {
if (tEnv == null) {
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();

TableEnvironment env = TableEnvironment.create(settings);
env.getConfig()
.getConfiguration()
.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, false);
tEnv = env;
}
}
}
return tEnv;
}

protected static TableResult exec(TableEnvironment env, String query, Object... args) {
return env.executeSql(String.format(query, args));
}

protected TableResult exec(String query, Object... args) {
return exec(getTableEnv(), query, args);
}

protected List<Row> sql(String query, Object... args) {
TableResult tableResult = exec(query, args);
try (CloseableIterator<Row> iter = tableResult.collect()) {
return Lists.newArrayList(iter);
} catch (Exception e) {
throw new RuntimeException("Failed to collect table result", e);
}
}

protected void assertSameElements(Iterable<Row> expected, Iterable<Row> actual) {
Assertions.assertThat(actual).isNotNull().containsExactlyInAnyOrderElementsOf(expected);
}

protected void assertSameElements(String message, Iterable<Row> expected, Iterable<Row> actual) {
Assertions.assertThat(actual)
.isNotNull()
.as(message)
.containsExactlyInAnyOrderElementsOf(expected);
}

/**
* We can not drop currently used catalog after FLINK-29677, so we have make sure that we do not
* use the current catalog before dropping it. This method switches to the 'default_catalog' and
* drops the one requested.
*
* @param catalogName The catalog to drop
* @param ifExists If we should use the 'IF EXISTS' when dropping the catalog
*/
protected void dropCatalog(String catalogName, boolean ifExists) {
sql("USE CATALOG default_catalog");
sql("DROP CATALOG %s %s", ifExists ? "IF EXISTS" : "", catalogName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,33 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/** Test for {@link TableLoader}. */
public class TestCatalogTableLoader extends FlinkTestBase {
public class TestCatalogTableLoader extends TestBase {

private static File warehouse = null;
private static final TableIdentifier IDENTIFIER = TableIdentifier.of("default", "my_table");
private static final Schema SCHEMA =
new Schema(Types.NestedField.required(1, "f1", Types.StringType.get()));

@BeforeClass
@BeforeAll
public static void createWarehouse() throws IOException {
warehouse = File.createTempFile("warehouse", null);
Assert.assertTrue(warehouse.delete());
Assertions.assertThat(warehouse.delete()).isTrue();
hiveConf.set("my_key", "my_value");
}

@AfterClass
@AfterAll
public static void dropWarehouse() throws IOException {
if (warehouse != null && warehouse.exists()) {
Path warehousePath = new Path(warehouse.getAbsolutePath());
FileSystem fs = warehousePath.getFileSystem(hiveConf);
Assert.assertTrue("Failed to delete " + warehousePath, fs.delete(warehousePath, true));
Assertions.assertThat(fs.delete(warehousePath, true))
.as("Failed to delete " + warehousePath)
.isTrue();
}
}

Expand Down Expand Up @@ -97,7 +98,7 @@ private static void validateHadoopConf(Table table) {
.as("FileIO should be a HadoopFileIO")
.isInstanceOf(HadoopFileIO.class);
HadoopFileIO hadoopIO = (HadoopFileIO) io;
Assert.assertEquals("my_value", hadoopIO.conf().get("my_key"));
Assertions.assertThat(hadoopIO.conf().get("my_key")).isEqualTo("my_value");
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;

public class MiniFlinkClusterExtension {

private static final int DEFAULT_TM_NUM = 1;
private static final int DEFAULT_PARALLELISM = 4;

public static final Configuration DISABLE_CLASSLOADER_CHECK_CONFIG =
new Configuration()
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

private MiniFlinkClusterExtension() {}

/**
* It will start a mini cluster with classloader.check-leaked-classloader=false, so that we won't
* break the unit tests because of the class loader leak issue. In our iceberg integration tests,
* there're some that will assert the results after finished the flink jobs, so actually we may
* access the class loader that has been closed by the flink task managers if we enable the switch
* classloader.check-leaked-classloader by default.
*/
public static MiniClusterExtension createWithClassloaderCheckDisabled() {
return new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(DEFAULT_TM_NUM)
.setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
.build());
}
}
Loading

0 comments on commit f246614

Please sign in to comment.