Skip to content

Commit

Permalink
[FLINK-35195][test/test-filesystem] test-filesystem Catalog support c…
Browse files Browse the repository at this point in the history
…reate generic table
  • Loading branch information
lsyldliu committed May 7, 2024
1 parent ea4112a commit 84f0632
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import org.apache.flink.connector.file.table.FileSystemTableFactory;
import org.apache.flink.connector.file.table.TestFileSystemTableSource;
import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;

import java.util.Collections;

/** Test filesystem {@link Factory}. */
@Internal
Expand All @@ -40,9 +44,21 @@ public String factoryIdentifier() {

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final boolean isFileSystemTable =
TestFileSystemCatalog.isFileSystemTable(context.getCatalogTable().getOptions());
if (!isFileSystemTable) {
return FactoryUtil.createDynamicTableSource(
null,
context.getObjectIdentifier(),
context.getCatalogTable(),
Collections.emptyMap(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
}

FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
validate(helper);

return new TestFileSystemTableSource(
context.getObjectIdentifier(),
context.getPhysicalRowDataType(),
Expand All @@ -51,4 +67,21 @@ public DynamicTableSource createDynamicTableSource(Context context) {
discoverDecodingFormat(context, BulkReaderFormatFactory.class),
discoverDecodingFormat(context, DeserializationFormatFactory.class));
}

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
final boolean isFileSystemTable =
TestFileSystemCatalog.isFileSystemTable(context.getCatalogTable().getOptions());
if (!isFileSystemTable) {
return FactoryUtil.createDynamicTableSink(
null,
context.getObjectIdentifier(),
context.getCatalogTable(),
Collections.emptyMap(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
}
return super.createDynamicTableSink(context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.file.testutils.catalog;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
Expand Down Expand Up @@ -78,6 +79,8 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.flink.table.file.testutils.TestFileSystemTableFactory.IDENTIFIER;
import static org.apache.flink.util.Preconditions.checkArgument;

/** A catalog implementation for test {@link FileSystem}. */
Expand Down Expand Up @@ -276,6 +279,13 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
}
}

@Internal
public static boolean isFileSystemTable(Map<String, String> properties) {
String connector = properties.get(CONNECTOR.key());
return StringUtils.isNullOrWhitespaceOnly(connector)
|| IDENTIFIER.equalsIgnoreCase(connector);
}

@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
Path path = inferTablePath(catalogPathStr, tablePath);
Expand Down Expand Up @@ -346,7 +356,9 @@ public void createTable(
if (!fs.exists(path)) {
fs.mkdirs(path);
fs.mkdirs(tableSchemaPath);
fs.mkdirs(tableDataPath);
if (isFileSystemTable(catalogTable.getOptions())) {
fs.mkdirs(tableDataPath);
}
}

// write table schema
Expand Down Expand Up @@ -649,16 +661,20 @@ private CatalogBaseTable deserializeTable(
String tableDataPath) {
if (CatalogBaseTable.TableKind.TABLE == tableKind) {
CatalogTable catalogTable = CatalogPropertiesUtil.deserializeCatalogTable(properties);
// put table data path
Map<String, String> options = new HashMap<>(catalogTable.getOptions());
options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath);
if (isFileSystemTable(catalogTable.getOptions())) {
// put table data path
options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath);
}
return catalogTable.copy(options);
} else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind) {
CatalogMaterializedTable catalogMaterializedTable =
CatalogPropertiesUtil.deserializeCatalogMaterializedTable(properties);
// put table data path
Map<String, String> options = new HashMap<>(catalogMaterializedTable.getOptions());
options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath);
if (isFileSystemTable(catalogMaterializedTable.getOptions())) {
// put table data path
options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath);
}
return catalogMaterializedTable.copy(options);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -44,7 +46,6 @@ public void testReadAndWriteTestFileSystemTable(boolean isStreamingMode) throws
isStreamingMode
? EnvironmentSettings.inStreamingMode()
: EnvironmentSettings.inBatchMode());

tEnv.registerCatalog(TEST_CATALOG, catalog);
tEnv.useCatalog(TEST_CATALOG);

Expand Down Expand Up @@ -84,6 +85,80 @@ public void testReadAndWriteTestFileSystemTable(boolean isStreamingMode) throws
Row.of(1003L, "user3", "ciao", "2021-06-10 10:02:00"),
Row.of(1004L, "user4", "你好", "2021-06-10 10:03:00"));

assertThat(expected).isEqualTo((Lists.newArrayList(result)));
assertThat(Lists.newArrayList(result)).isEqualTo(expected);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testReadDatagenTable(boolean isStreamingMode) {
TableEnvironment tEnv =
TableEnvironment.create(
isStreamingMode
? EnvironmentSettings.inStreamingMode()
: EnvironmentSettings.inBatchMode());
tEnv.registerCatalog(TEST_CATALOG, catalog);
tEnv.useCatalog(TEST_CATALOG);

tEnv.executeSql(
"CREATE TABLE datagenSource (\n"
+ " id BIGINT,\n"
+ " user_name STRING,\n"
+ " message STRING,\n"
+ " log_ts STRING\n"
+ ") WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'number-of-rows' = '10'\n"
+ ")");

tEnv.getConfig().getConfiguration().setString("parallelism.default", "1");
CloseableIterator<Row> result =
tEnv.executeSql(
String.format(
"SELECT * FROM %s.%s.datagenSource",
TEST_CATALOG, TEST_DEFAULT_DATABASE))
.collect();

// assert query result size
assertThat(Lists.newArrayList(result).size()).isEqualTo(10);
}

@Test
public void testWriteTestValuesSinkTable() throws Exception {
TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
tEnv.registerCatalog(TEST_CATALOG, catalog);
tEnv.useCatalog(TEST_CATALOG);

tEnv.executeSql(
"CREATE TABLE valueSink (\n"
+ " id BIGINT,\n"
+ " user_name STRING,\n"
+ " message STRING,\n"
+ " log_ts STRING\n"
+ ") WITH (\n"
+ " 'connector' = 'values'\n"
+ ")");

tEnv.getConfig().getConfiguration().setString("parallelism.default", "1");
tEnv.getConfig().getConfiguration().setString("parallelism.default", "1");
tEnv.executeSql(
String.format(
"INSERT INTO %s.%s.valueSink VALUES\n"
+ "(1001, 'user1', 'hello world', '2021-06-10 10:00:00'),\n"
+ "(1002, 'user2', 'hi', '2021-06-10 10:01:00'),\n"
+ "(1003, 'user3', 'ciao', '2021-06-10 10:02:00'),\n"
+ "(1004, 'user4', '你好', '2021-06-10 10:03:00')",
TEST_CATALOG, TEST_DEFAULT_DATABASE))
.await();

// assert query result size
List<Row> expected =
Arrays.asList(
Row.of(1001L, "user1", "hello world", "2021-06-10 10:00:00"),
Row.of(1002L, "user2", "hi", "2021-06-10 10:01:00"),
Row.of(1003L, "user3", "ciao", "2021-06-10 10:02:00"),
Row.of(1004L, "user4", "你好", "2021-06-10 10:03:00"));

List<Row> actual = TestValuesTableFactory.getRawResults("valueSink");
assertThat(actual).isEqualTo(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,44 @@ public void testCreateAndGetCatalogMaterializedTable() throws Exception {
() -> catalog.createTable(tablePath, EXPECTED_CATALOG_MATERIALIZED_TABLE, false));
}

@Test
public void testCreateAndGetGenericTable() throws Exception {
ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
// test create datagen table
Map<String, String> options = new HashMap<>();
options.put("connector", "datagen");
options.put("number-of-rows", "10");
ResolvedCatalogTable datagenResolvedTable =
new ResolvedCatalogTable(
CatalogTable.newBuilder()
.schema(CREATE_SCHEMA)
.comment("test generic table")
.options(options)
.build(),
CREATE_RESOLVED_SCHEMA);

catalog.createTable(tablePath, datagenResolvedTable, true);

// test table exist
assertThat(catalog.tableExists(tablePath)).isTrue();

// test get table
CatalogBaseTable actualTable = catalog.getTable(tablePath);

// validate table type
assertThat(actualTable.getTableKind()).isEqualTo(CatalogBaseTable.TableKind.TABLE);
// validate schema
assertThat(actualTable.getUnresolvedSchema().resolve(new TestSchemaResolver()))
.isEqualTo(CREATE_RESOLVED_SCHEMA);
// validate options
assertThat(actualTable.getOptions()).isEqualTo(options);

// test create exist table
assertThrows(
TableAlreadyExistException.class,
() -> catalog.createTable(tablePath, datagenResolvedTable, false));
}

@Test
public void testListTable() throws Exception {
ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
Expand Down

0 comments on commit 84f0632

Please sign in to comment.