Skip to content

Commit

Permalink
[AMORO-3393]: A stateless catalog manager implement (#3394)
Browse files Browse the repository at this point in the history
* stateless catalog manager

* stateless catalog manager

* ci

* ci

* improve db access

* add catalog meta cache

* add catalog meta cache

* ut

* ut

---------

Co-authored-by: zhangyongxiang.alpha <[email protected]>
  • Loading branch information
baiyangtx and zhangyongxiang.alpha authored Jan 10, 2025
1 parent 4be9821 commit 417fed6
Show file tree
Hide file tree
Showing 31 changed files with 450 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public class AmoroManagementConf {
.defaultValue("admin")
.withDescription("The administrator password");

public static final ConfigOption<Duration> CATALOG_META_CACHE_EXPIRATION_INTERVAL =
ConfigOptions.key("catalog-meta-cache.expiration-interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription("TTL for catalog metadata.");

public static final ConfigOption<Integer> TABLE_MANIFEST_IO_THREAD_COUNT =
ConfigOptions.key("table-manifest-io.thread-count")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.amoro.config.ConfigHelpers;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
import org.apache.amoro.server.dashboard.DashboardServer;
import org.apache.amoro.server.dashboard.JavalinJsonMapper;
import org.apache.amoro.server.dashboard.response.ErrorResponse;
Expand Down Expand Up @@ -92,6 +94,7 @@ public class AmoroServiceContainer {

private final HighAvailabilityContainer haContainer;
private DataSource dataSource;
private CatalogManager catalogManager;
private DefaultTableService tableService;
private DefaultOptimizingService optimizingService;
private TerminalManager terminalManager;
Expand Down Expand Up @@ -146,8 +149,9 @@ public void startService() throws Exception {
EventsManager.getInstance();
MetricManager.getInstance();

tableService = new DefaultTableService(serviceConfig);
optimizingService = new DefaultOptimizingService(serviceConfig, tableService);
catalogManager = new DefaultCatalogManager(serviceConfig);
tableService = new DefaultTableService(serviceConfig, catalogManager);
optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, tableService);

LOG.info("Setting up AMS table executors...");
AsyncTableExecutors.getInstance().setup(tableService, serviceConfig);
Expand All @@ -164,7 +168,7 @@ public void startService() throws Exception {
addHandlerChain(AsyncTableExecutors.getInstance().getTagsAutoCreatingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
terminalManager = new TerminalManager(serviceConfig, tableService);
terminalManager = new TerminalManager(serviceConfig, catalogManager, tableService);

initThriftService();
startThriftService();
Expand Down Expand Up @@ -240,8 +244,9 @@ private void startThriftServer(TServer server, String threadName) {

private void initHttpService() {
DashboardServer dashboardServer =
new DashboardServer(serviceConfig, tableService, optimizingService, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(tableService);
new DashboardServer(
serviceConfig, catalogManager, tableService, optimizingService, terminalManager);
RestCatalogService restCatalogService = new RestCatalogService(catalogManager, tableService);

httpServer =
Javalin.create(
Expand Down Expand Up @@ -333,7 +338,7 @@ private void initThriftService() throws TTransportException {
new AmoroTableMetastore.Processor<>(
ThriftServiceProxy.createProxy(
AmoroTableMetastore.Iface.class,
new TableManagementService(tableService),
new TableManagementService(catalogManager, tableService),
AmoroRuntimeException::normalizeCompatibly));
tableManagementServer =
createThriftServer(
Expand Down Expand Up @@ -536,6 +541,11 @@ public TableService getTableService() {
return this.tableService;
}

@VisibleForTesting
public CatalogManager getCatalogManager() {
return this.catalogManager;
}

@VisibleForTesting
public OptimizerManager getOptimizingService() {
return this.optimizingService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.resource.Resource;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.optimizing.OptimizingQueue;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
Expand Down Expand Up @@ -97,17 +98,22 @@ public class DefaultOptimizingService extends StatedPersistentBase
private final Map<String, OptimizingQueue> optimizingQueueByToken = new ConcurrentHashMap<>();
private final Map<String, OptimizerInstance> authOptimizers = new ConcurrentHashMap<>();
private final OptimizerKeeper optimizerKeeper = new OptimizerKeeper();
private final CatalogManager catalogManager;
private final TableService tableService;
private final RuntimeHandlerChain tableHandlerChain;
private final ExecutorService planExecutor;

public DefaultOptimizingService(Configurations serviceConfig, DefaultTableService tableService) {
public DefaultOptimizingService(
Configurations serviceConfig,
CatalogManager catalogManager,
DefaultTableService tableService) {
this.optimizerTouchTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT);
this.taskAckTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_TASK_ACK_TIMEOUT);
this.maxPlanningParallelism =
serviceConfig.getInteger(AmoroManagementConf.OPTIMIZER_MAX_PLANNING_PARALLELISM);
this.pollingTimeout = serviceConfig.getLong(AmoroManagementConf.OPTIMIZER_POLLING_TIMEOUT);
this.tableService = tableService;
this.catalogManager = catalogManager;
this.tableHandlerChain = new TableRuntimeHandlerImpl();
this.planExecutor =
Executors.newCachedThreadPool(
Expand Down Expand Up @@ -391,7 +397,7 @@ public void dispose() {
}

public boolean canDeleteResourceGroup(String name) {
for (CatalogMeta catalogMeta : tableService.listCatalogMetas()) {
for (CatalogMeta catalogMeta : catalogManager.listCatalogMetas()) {
if (catalogMeta.getCatalogProperties() != null
&& catalogMeta
.getCatalogProperties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.amoro.events.IcebergReportEvent;
import org.apache.amoro.exception.ObjectNotExistsException;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.manager.EventsManager;
Expand Down Expand Up @@ -106,9 +107,11 @@ public class RestCatalogService extends PersistentBase {

private final JavalinJackson jsonMapper;

private final CatalogManager catalogManager;
private final TableService tableService;

public RestCatalogService(TableService tableService) {
public RestCatalogService(CatalogManager catalogManager, TableService tableService) {
this.catalogManager = catalogManager;
this.tableService = tableService;
ObjectMapper objectMapper = jsonMapper();
this.jsonMapper = new JavalinJackson(objectMapper);
Expand Down Expand Up @@ -432,7 +435,7 @@ private void handleTable(

private InternalCatalog getCatalog(String catalog) {
Preconditions.checkNotNull(catalog, "lack required path variables: catalog");
ServerCatalog internalCatalog = tableService.getServerCatalog(catalog);
ServerCatalog internalCatalog = catalogManager.getServerCatalog(catalog);
Preconditions.checkArgument(
internalCatalog instanceof InternalCatalog, "The catalog is not an iceberg rest catalog");
Set<TableFormat> tableFormats = CatalogUtil.tableFormats(internalCatalog.getMetadata());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.amoro.api.TableCommitMeta;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.api.TableMeta;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.InternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.table.TableMetadata;
Expand All @@ -42,9 +43,11 @@

public class TableManagementService implements AmoroTableMetastore.Iface {

private final CatalogManager catalogManager;
private final TableService tableService;

public TableManagementService(TableService tableService) {
public TableManagementService(CatalogManager catalogManager, TableService tableService) {
this.catalogManager = catalogManager;
this.tableService = tableService;
}

Expand All @@ -53,29 +56,29 @@ public void ping() {}

@Override
public List<CatalogMeta> getCatalogs() {
return tableService.listCatalogMetas();
return catalogManager.listCatalogMetas();
}

@Override
public CatalogMeta getCatalog(String name) {
return tableService.getCatalogMeta(name);
return catalogManager.getCatalogMeta(name);
}

@Override
public List<String> getDatabases(String catalogName) {
ServerCatalog serverCatalog = tableService.getServerCatalog(catalogName);
ServerCatalog serverCatalog = catalogManager.getServerCatalog(catalogName);
return serverCatalog.listDatabases();
}

@Override
public void createDatabase(String catalogName, String database) {
InternalCatalog serverCatalog = tableService.getInternalCatalog(catalogName);
InternalCatalog serverCatalog = catalogManager.getInternalCatalog(catalogName);
serverCatalog.createDatabase(database);
}

@Override
public void dropDatabase(String catalogName, String database) {
InternalCatalog serverCatalog = tableService.getInternalCatalog(catalogName);
InternalCatalog serverCatalog = catalogManager.getInternalCatalog(catalogName);
serverCatalog.dropDatabase(database);
}

Expand All @@ -87,15 +90,15 @@ public void createTableMeta(TableMeta tableMeta) {
ServerTableIdentifier identifier =
ServerTableIdentifier.of(
tableMeta.getTableIdentifier(), TableFormat.valueOf(tableMeta.getFormat()));
InternalCatalog catalog = tableService.getInternalCatalog(identifier.getCatalog());
InternalCatalog catalog = catalogManager.getInternalCatalog(identifier.getCatalog());
CatalogMeta catalogMeta = catalog.getMetadata();
TableMetadata tableMetadata = new TableMetadata(identifier, tableMeta, catalogMeta);
tableService.createTable(catalog.name(), tableMetadata);
}

@Override
public List<TableMeta> listTables(String catalogName, String database) {
InternalCatalog serverCatalog = tableService.getInternalCatalog(catalogName);
InternalCatalog serverCatalog = catalogManager.getInternalCatalog(catalogName);
List<TableMetadata> tableMetadataList = serverCatalog.listTableMetadataInDatabase(database);
return tableMetadataList.stream()
.map(TableMetadata::buildTableMeta)
Expand All @@ -104,7 +107,7 @@ public List<TableMeta> listTables(String catalogName, String database) {

@Override
public TableMeta getTable(TableIdentifier tableIdentifier) {
InternalCatalog serverCatalog = tableService.getInternalCatalog(tableIdentifier.getCatalog());
InternalCatalog serverCatalog = catalogManager.getInternalCatalog(tableIdentifier.getCatalog());
TableMetadata tableMetadata =
serverCatalog.loadTableMetadata(
tableIdentifier.getDatabase(), tableIdentifier.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import java.util.List;

/** The CatalogService interface defines the operations that can be performed on catalogs. */
public interface CatalogService {
/** The CatalogManager interface defines the operations that can be performed on catalogs. */
public interface CatalogManager {
/**
* Returns a list of CatalogMeta objects.
*
Expand Down Expand Up @@ -62,6 +62,13 @@ public interface CatalogService {
*/
InternalCatalog getInternalCatalog(String catalogName);

/**
* Retrieves all ExternalCatalogs.
*
* @return a list of ExternalCatalogs
*/
List<ExternalCatalog> getExternalCatalogs();

/**
* Creates a catalog based on the provided catalog meta information. The catalog name is obtained
* from the catalog meta.
Expand Down
Loading

0 comments on commit 417fed6

Please sign in to comment.