Skip to content

Commit

Permalink
[FLINK-32749][gateway] Support config default catalog for catalog sto…
Browse files Browse the repository at this point in the history
…re in gateway (apache#23317)
  • Loading branch information
FangYongs authored Sep 1, 2023
1 parent 9c6b34d commit 1797a70
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.CatalogStoreHolder;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
import org.apache.flink.table.gateway.api.endpoint.EndpointVersion;
import org.apache.flink.table.gateway.api.session.SessionEnvironment;
Expand Down Expand Up @@ -331,7 +331,13 @@ private static CatalogManager buildCatalogManager(
CatalogStoreFactory.Context catalogStoreFactoryContext =
TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader);
catalogStoreFactory.open(catalogStoreFactoryContext);
CatalogStore catalogStore = catalogStoreFactory.createCatalogStore();
CatalogStoreHolder catalogStore =
CatalogStoreHolder.newBuilder()
.catalogStore(catalogStoreFactory.createCatalogStore())
.classloader(userClassLoader)
.config(configuration)
.factory(catalogStoreFactory)
.build();

CatalogManager.Builder builder =
CatalogManager.newBuilder()
Expand All @@ -341,13 +347,7 @@ private static CatalogManager buildCatalogManager(
.catalogModificationListeners(
TableFactoryUtil.findCatalogModificationListenerList(
configuration, userClassLoader))
.catalogStoreHolder(
CatalogStoreHolder.newBuilder()
.catalogStore(catalogStore)
.classloader(userClassLoader)
.config(configuration)
.factory(catalogStoreFactory)
.build());
.catalogStoreHolder(catalogStore);

// init default catalog
String defaultCatalogName;
Expand All @@ -372,8 +372,19 @@ private static CatalogManager buildCatalogManager(
}

defaultCatalog =
new GenericInMemoryCatalog(
defaultCatalogName, settings.getBuiltInDatabaseName());
catalogStore
.catalogStore()
.getCatalog(defaultCatalogName)
.map(
catalogDescriptor ->
FactoryUtil.createCatalog(
defaultCatalogName,
catalogDescriptor.getConfiguration().toMap(),
catalogStore.config(),
catalogStore.classLoader()))
.orElse(
new GenericInMemoryCatalog(
defaultCatalogName, settings.getBuiltInDatabaseName()));
}
defaultCatalog.open();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.FileCatalogStore;
import org.apache.flink.table.catalog.listener.CatalogFactory1;
import org.apache.flink.table.catalog.listener.CatalogFactory2;
import org.apache.flink.table.catalog.listener.CatalogListener1;
Expand All @@ -35,7 +38,9 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
Expand All @@ -45,7 +50,9 @@
import static org.apache.flink.configuration.PipelineOptions.NAME;
import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_CATALOG_MODIFICATION_LISTENERS;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_CATALOG_NAME;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND;
import static org.assertj.core.api.Assertions.assertThat;

/** Test {@link SessionContext}. */
Expand Down Expand Up @@ -200,12 +207,41 @@ void testCreateContextWithListeners() {
CatalogListener2.class.getName()));
}

// --------------------------------------------------------------------------------------------
@Test
void testCreateDefaultCatalogFromStore(@TempDir Path catalogFilePath) {
CatalogStore catalogStore = new FileCatalogStore(catalogFilePath.toString());
catalogStore.open();
catalogStore.storeCatalog(
"cat1",
CatalogDescriptor.of(
"cat1",
Configuration.fromMap(
Collections.singletonMap("type", "generic_in_memory"))));
catalogStore.storeCatalog(
"cat2",
CatalogDescriptor.of(
"cat2",
Configuration.fromMap(
Collections.singletonMap("type", "generic_in_memory"))));
catalogStore.close();

private SessionContext createSessionContext() {
Configuration flinkConfig = new Configuration();
flinkConfig.set(OBJECT_REUSE, true);
flinkConfig.set(MAX_PARALLELISM, 16);
flinkConfig.set(TABLE_CATALOG_STORE_KIND, "file");
flinkConfig.setString("table.catalog-store.file.path", catalogFilePath.toString());
SessionContext context1 = createSessionContext(flinkConfig);
assertThat(context1.getSessionState().catalogManager.getCurrentCatalog())
.isEqualTo("default_catalog");
context1.close();

flinkConfig.set(TABLE_CATALOG_NAME, "cat1");
SessionContext context2 = createSessionContext(flinkConfig);
assertThat(context2.getSessionState().catalogManager.getCurrentCatalog()).isEqualTo("cat1");
context2.close();
}

// --------------------------------------------------------------------------------------------

private SessionContext createSessionContext(Configuration flinkConfig) {
DefaultContext defaultContext = new DefaultContext(flinkConfig, Collections.emptyList());
SessionEnvironment environment =
SessionEnvironment.newBuilder()
Expand All @@ -215,4 +251,11 @@ private SessionContext createSessionContext() {
return SessionContext.create(
defaultContext, SessionHandle.create(), environment, EXECUTOR_SERVICE);
}

private SessionContext createSessionContext() {
Configuration flinkConfig = new Configuration();
flinkConfig.set(OBJECT_REUSE, true);
flinkConfig.set(MAX_PARALLELISM, 16);
return createSessionContext(flinkConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,18 +201,15 @@ public CatalogManager build() {
checkNotNull(classLoader, "Class loader cannot be null");
checkNotNull(config, "Config cannot be null");
checkNotNull(catalogStoreHolder, "CatalogStoreHolder cannot be null");
catalogStoreHolder.open();
CatalogManager catalogManager =
new CatalogManager(
defaultCatalogName,
defaultCatalog,
dataTypeFactory != null
? dataTypeFactory
: new DataTypeFactoryImpl(classLoader, config, executionConfig),
new ManagedTableListener(classLoader, config),
catalogModificationListeners,
catalogStoreHolder);
return catalogManager;
return new CatalogManager(
defaultCatalogName,
defaultCatalog,
dataTypeFactory != null
? dataTypeFactory
: new DataTypeFactoryImpl(classLoader, config, executionConfig),
new ManagedTableListener(classLoader, config),
catalogModificationListeners,
catalogStoreHolder);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ public CatalogStoreHolder build() {
checkNotNull(catalogStore, "CatalogStore cannot be null");
checkNotNull(config, "Config cannot be null");
checkNotNull(classLoader, "Class loader cannot be null");
return new CatalogStoreHolder(catalogStore, factory, config, classLoader);
CatalogStoreHolder catalogStoreHolder =
new CatalogStoreHolder(catalogStore, factory, config, classLoader);
catalogStoreHolder.open();
return catalogStoreHolder;
}
}

Expand Down

0 comments on commit 1797a70

Please sign in to comment.