Skip to content

Commit

Permalink
[FLINK-12676][table][sql-client] Add descriptor, validator, and facto…
Browse files Browse the repository at this point in the history
…ry of GenericInMemoryCatalog for table discovery service

This closes apache#8567.
  • Loading branch information
bowenli86 committed May 31, 2019
1 parent ba648a5 commit c691c13
Show file tree
Hide file tree
Showing 13 changed files with 386 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void open() throws CatalogException {

if (!databaseExists(getDefaultDatabase())) {
throw new CatalogException(String.format("Configured default database %s doesn't exist in catalog %s.",
getDefaultDatabase(), getCatalogName()));
getDefaultDatabase(), getName()));
}
}

Expand Down Expand Up @@ -192,7 +192,7 @@ public void createDatabase(String databaseName, CatalogDatabase database, boolea
client.createDatabase(hiveDatabase);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(getCatalogName(), hiveDatabase.getName());
throw new DatabaseAlreadyExistException(getName(), hiveDatabase.getName());
}
} catch (TException e) {
throw new CatalogException(String.format("Failed to create database %s", hiveDatabase.getName()), e);
Expand Down Expand Up @@ -259,7 +259,7 @@ public List<String> listDatabases() throws CatalogException {
return client.getAllDatabases();
} catch (TException e) {
throw new CatalogException(
String.format("Failed to list all databases in %s", getCatalogName()), e);
String.format("Failed to list all databases in %s", getName()), e);
}
}

Expand All @@ -282,10 +282,10 @@ public void dropDatabase(String name, boolean ignoreIfNotExists) throws Database
client.dropDatabase(name, true, ignoreIfNotExists);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(getCatalogName(), name);
throw new DatabaseNotExistException(getName(), name);
}
} catch (InvalidOperationException e) {
throw new DatabaseNotEmptyException(getCatalogName(), name);
throw new DatabaseNotEmptyException(getName(), name);
} catch (TException e) {
throw new CatalogException(String.format("Failed to drop database %s", name), e);
}
Expand All @@ -295,10 +295,10 @@ private Database getHiveDatabase(String databaseName) throws DatabaseNotExistExc
try {
return client.getDatabase(databaseName);
} catch (NoSuchObjectException e) {
throw new DatabaseNotExistException(getCatalogName(), databaseName);
throw new DatabaseNotExistException(getName(), databaseName);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to get database %s from %s", databaseName, getCatalogName()), e);
String.format("Failed to get database %s from %s", databaseName, getName()), e);
}
}

Expand All @@ -319,7 +319,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
checkNotNull(table, "table cannot be null");

if (!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(getCatalogName(), tablePath.getDatabaseName());
throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
}

Table hiveTable = instantiateHiveTable(tablePath, table);
Expand All @@ -328,7 +328,7 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
client.createTable(hiveTable);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(getCatalogName(), tablePath);
throw new TableAlreadyExistException(getName(), tablePath);
}
} catch (TException e) {
throw new CatalogException(String.format("Failed to create table %s", tablePath.getFullName()), e);
Expand All @@ -349,14 +349,14 @@ public void renameTable(ObjectPath tablePath, String newTableName, boolean ignor
// alter_table() doesn't throw a clear exception when new table already exists.
// Thus, check the table existence explicitly
if (tableExists(newPath)) {
throw new TableAlreadyExistException(getCatalogName(), newPath);
throw new TableAlreadyExistException(getName(), newPath);
} else {
Table table = getHiveTable(tablePath);
table.setTableName(newTableName);
client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), table);
}
} else if (!ignoreIfNotExists) {
throw new TableNotExistException(getCatalogName(), tablePath);
throw new TableNotExistException(getName(), tablePath);
}
} catch (TException e) {
throw new CatalogException(
Expand Down Expand Up @@ -417,7 +417,7 @@ public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws Ta
ignoreIfNotExists);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(getCatalogName(), tablePath);
throw new TableNotExistException(getName(), tablePath);
}
} catch (TException e) {
throw new CatalogException(
Expand All @@ -432,7 +432,7 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
try {
return client.getAllTables(databaseName);
} catch (UnknownDBException e) {
throw new DatabaseNotExistException(getCatalogName(), databaseName);
throw new DatabaseNotExistException(getName(), databaseName);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to list tables in database %s", databaseName), e);
Expand All @@ -449,7 +449,7 @@ public List<String> listViews(String databaseName) throws DatabaseNotExistExcept
null, // table pattern
TableType.VIRTUAL_VIEW);
} catch (UnknownDBException e) {
throw new DatabaseNotExistException(getCatalogName(), databaseName);
throw new DatabaseNotExistException(getName(), databaseName);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to list views in database %s", databaseName), e);
Expand All @@ -475,7 +475,7 @@ Table getHiveTable(ObjectPath tablePath) throws TableNotExistException {
try {
return client.getTable(tablePath.getDatabaseName(), tablePath.getObjectName());
} catch (NoSuchObjectException e) {
throw new TableNotExistException(getCatalogName(), tablePath);
throw new TableNotExistException(getName(), tablePath);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to get table %s from Hive metastore", tablePath.getFullName()), e);
Expand Down Expand Up @@ -652,7 +652,7 @@ public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partition
client.add_partition(instantiateHivePartition(hiveTable, partitionSpec, partition));
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new PartitionAlreadyExistsException(getCatalogName(), tablePath, partitionSpec);
throw new PartitionAlreadyExistsException(getName(), tablePath, partitionSpec);
}
} catch (TException e) {
throw new CatalogException(
Expand All @@ -672,10 +672,10 @@ public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSp
getOrderedFullPartitionValues(partitionSpec, getFieldNames(hiveTable.getPartitionKeys()), tablePath), true);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
}
} catch (MetaException | TableNotExistException | PartitionSpecInvalidException e) {
throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to drop partition %s of table %s", partitionSpec, tablePath));
Expand Down Expand Up @@ -732,7 +732,7 @@ public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec
Partition hivePartition = getHivePartition(tablePath, partitionSpec);
return instantiateCatalogPartition(hivePartition);
} catch (NoSuchObjectException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to get partition %s of table %s", partitionSpec, tablePath), e);
Expand Down Expand Up @@ -760,7 +760,7 @@ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionS
if (ignoreIfNotExists) {
return;
}
throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec);
throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
}
Partition newHivePartition = instantiateHivePartition(hiveTable, partitionSpec, newPartition);
if (newHivePartition.getSd().getLocation() == null) {
Expand All @@ -773,10 +773,10 @@ public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionS
);
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
}
} catch (InvalidOperationException | MetaException | TableNotExistException | PartitionSpecInvalidException e) {
throw new PartitionNotExistException(getCatalogName(), tablePath, partitionSpec, e);
throw new PartitionNotExistException(getName(), tablePath, partitionSpec, e);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to alter existing partition with new partition %s of table %s",
Expand All @@ -803,7 +803,7 @@ private Partition instantiateHivePartition(Table hiveTable, CatalogPartitionSpec
// validate partition values
for (int i = 0; i < partCols.size(); i++) {
if (StringUtils.isNullOrWhitespaceOnly(partValues.get(i))) {
throw new PartitionSpecInvalidException(getCatalogName(), partCols,
throw new PartitionSpecInvalidException(getName(), partCols,
new ObjectPath(hiveTable.getDbName(), hiveTable.getTableName()), partitionSpec);
}
}
Expand All @@ -827,7 +827,7 @@ private static CatalogPartition instantiateCatalogPartition(Partition hivePartit

private void ensurePartitionedTable(ObjectPath tablePath, Table hiveTable) throws TableNotPartitionedException {
if (hiveTable.getPartitionKeysSize() == 0) {
throw new TableNotPartitionedException(getCatalogName(), tablePath);
throw new TableNotPartitionedException(getName(), tablePath);
}
}

Expand Down Expand Up @@ -870,13 +870,13 @@ private List<String> getOrderedFullPartitionValues(CatalogPartitionSpec partitio
throws PartitionSpecInvalidException {
Map<String, String> spec = partitionSpec.getPartitionSpec();
if (spec.size() != partitionKeys.size()) {
throw new PartitionSpecInvalidException(getCatalogName(), partitionKeys, tablePath, partitionSpec);
throw new PartitionSpecInvalidException(getName(), partitionKeys, tablePath, partitionSpec);
}

List<String> values = new ArrayList<>(spec.size());
for (String key : partitionKeys) {
if (!spec.containsKey(key)) {
throw new PartitionSpecInvalidException(getCatalogName(), partitionKeys, tablePath, partitionSpec);
throw new PartitionSpecInvalidException(getName(), partitionKeys, tablePath, partitionSpec);
} else {
values.add(spec.get(key));
}
Expand Down Expand Up @@ -918,10 +918,10 @@ public void createFunction(ObjectPath functionPath, CatalogFunction function, bo
try {
client.createFunction(hiveFunction);
} catch (NoSuchObjectException e) {
throw new DatabaseNotExistException(getCatalogName(), functionPath.getDatabaseName(), e);
throw new DatabaseNotExistException(getName(), functionPath.getDatabaseName(), e);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new FunctionAlreadyExistException(getCatalogName(), functionPath, e);
throw new FunctionAlreadyExistException(getName(), functionPath, e);
}
} catch (TException e) {
throw new CatalogException(
Expand Down Expand Up @@ -977,7 +977,7 @@ public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
client.dropFunction(functionPath.getDatabaseName(), functionPath.getObjectName());
} catch (NoSuchObjectException e) {
if (!ignoreIfNotExists) {
throw new FunctionNotExistException(getCatalogName(), functionPath, e);
throw new FunctionNotExistException(getName(), functionPath, e);
}
} catch (TException e) {
throw new CatalogException(
Expand All @@ -992,7 +992,7 @@ public List<String> listFunctions(String databaseName) throws DatabaseNotExistEx
// client.getFunctions() returns empty list when the database doesn't exist
// thus we need to explicitly check whether the database exists or not
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(getCatalogName(), databaseName);
throw new DatabaseNotExistException(getName(), databaseName);
}

try {
Expand Down Expand Up @@ -1024,7 +1024,7 @@ public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotEx
return new HiveCatalogFunction(function.getClassName());
}
} catch (NoSuchObjectException e) {
throw new FunctionNotExistException(getCatalogName(), functionPath, e);
throw new FunctionNotExistException(getName(), functionPath, e);
} catch (TException e) {
throw new CatalogException(
String.format("Failed to get function %s", functionPath.getFullName()), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,23 +138,14 @@ public Catalog createCatalog(String name, Map<String, String> properties) {

final Optional<String> defaultDatabase = params.getOptionalString(CATALOG_DEFAULT_DATABASE);

if (defaultDatabase.isPresent()) {
return new TestCatalog(name, defaultDatabase.get());
} else {
return new TestCatalog(name);
}
return new TestCatalog(name, defaultDatabase.orElse(GenericInMemoryCatalog.DEFAULT_DB));
}
}

/**
* Test catalog.
*/
public static class TestCatalog extends GenericInMemoryCatalog {

public TestCatalog(String name) {
super(name);
}

public TestCatalog(String name, String defaultDatabase) {
super(name, defaultDatabase);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ public void testExecutionConfig() throws Exception {

@Test
public void testCatalogs() throws Exception {
final String catalogName = "catalog2";
final String catalogName = "inmemorycatalog";
final ExecutionContext<?> context = createCatalogExecutionContext();
final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();

assertEquals(tableEnv.getCurrentCatalog(), catalogName);
assertEquals(tableEnv.getCurrentDatabase(), "test-default-database");
assertEquals(tableEnv.getCurrentDatabase(), "mydatabase");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ execution:
max-failures-per-interval: 10
failure-rate-interval: 99000
delay: 1000
current-catalog: catalog2
current-database: test-default-database
current-catalog: inmemorycatalog
current-database: mydatabase

deployment:
response-timeout: 5000
Expand All @@ -123,4 +123,7 @@ catalogs:
type: DependencyTest
- name: catalog2
type: DependencyTest
default-database: test-default-database
default-database: mydatabase
- name: inmemorycatalog
type: generic_in_memory
default-database: mydatabase
Loading

0 comments on commit c691c13

Please sign in to comment.