Skip to content

Commit

Permalink
Flink: remove the creation of default database in FlinkCatalog open m…
Browse files Browse the repository at this point in the history
…ethod (apache#7795)
  • Loading branch information
Fokko authored Jun 26, 2023
1 parent fbed421 commit f5bb0c0
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,7 @@ public FlinkCatalog(
}

@Override
public void open() throws CatalogException {
// Create the default database if it does not exist.
try {
createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
} catch (DatabaseAlreadyExistException e) {
// Ignore the exception if it's already exist.
}
}
public void open() throws CatalogException {}

@Override
public void close() throws CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
import java.util.Objects;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.types.Row;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -74,19 +73,6 @@ public void testCreateNamespace() {
"Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace));
}

@Test
public void testDefaultDatabase() {
sql("USE CATALOG %s", catalogName);
sql("SHOW TABLES");

Assert.assertEquals(
"Should use the current catalog", getTableEnv().getCurrentCatalog(), catalogName);
Assert.assertEquals(
"Should use the configured default namespace",
getTableEnv().getCurrentDatabase(),
"default");
}

@Test
public void testDropEmptyDatabase() {
Assert.assertFalse(
Expand Down Expand Up @@ -126,11 +112,11 @@ public void testDropNonEmptyNamespace() {
"Table should exist",
validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl")));

AssertHelpers.assertThrowsCause(
"Should fail if trying to delete a non-empty database",
DatabaseNotEmptyException.class,
String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName),
() -> sql("DROP DATABASE %s", flinkDatabase));
Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
.cause()
.isInstanceOf(DatabaseNotEmptyException.class)
.hasMessage(
String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName));

sql("DROP TABLE %s.tl", flinkDatabase);
}
Expand Down Expand Up @@ -174,22 +160,17 @@ public void testListNamespace() {
List<Row> databases = sql("SHOW DATABASES");

if (isHadoopCatalog) {
Assert.assertEquals("Should have 2 database", 2, databases.size());
Assert.assertEquals(
"Should have db and default database",
Sets.newHashSet("default", "db"),
Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0)));
Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals("Should have db database", "db", databases.get(0).getField(0));

if (!baseNamespace.isEmpty()) {
// test namespace not belongs to this catalog
validationNamespaceCatalog.createNamespace(
Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
databases = sql("SHOW DATABASES");
Assert.assertEquals("Should have 2 database", 2, databases.size());
Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals(
"Should have db and default database",
Sets.newHashSet("default", "db"),
Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0)));
"Should have db and default database", "db", databases.get(0).getField(0));
}
} else {
// If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the
Expand Down Expand Up @@ -301,10 +282,12 @@ public void testHadoopNotSupportMeta() {
"Namespace should not already exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));

AssertHelpers.assertThrowsCause(
"Should fail if trying to create database with location in hadoop catalog.",
UnsupportedOperationException.class,
String.format("Cannot create namespace %s: metadata is not supported", icebergNamespace),
() -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase));
Assertions.assertThatThrownBy(
() -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase))
.cause()
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(
String.format(
"Cannot create namespace %s: metadata is not supported", icebergNamespace));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,7 @@ public FlinkCatalog(
}

@Override
public void open() throws CatalogException {
// Create the default database if it does not exist.
try {
createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
} catch (DatabaseAlreadyExistException e) {
// Ignore the exception if it's already exist.
}
}
public void open() throws CatalogException {}

@Override
public void close() throws CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
import java.util.Objects;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.types.Row;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
Expand Down Expand Up @@ -74,19 +73,6 @@ public void testCreateNamespace() {
"Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace));
}

@Test
public void testDefaultDatabase() {
sql("USE CATALOG %s", catalogName);
sql("SHOW TABLES");

Assert.assertEquals(
"Should use the current catalog", getTableEnv().getCurrentCatalog(), catalogName);
Assert.assertEquals(
"Should use the configured default namespace",
getTableEnv().getCurrentDatabase(),
"default");
}

@Test
public void testDropEmptyDatabase() {
Assert.assertFalse(
Expand Down Expand Up @@ -126,11 +112,11 @@ public void testDropNonEmptyNamespace() {
"Table should exist",
validationCatalog.tableExists(TableIdentifier.of(icebergNamespace, "tl")));

AssertHelpers.assertThrowsCause(
"Should fail if trying to delete a non-empty database",
DatabaseNotEmptyException.class,
String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName),
() -> sql("DROP DATABASE %s", flinkDatabase));
Assertions.assertThatThrownBy(() -> sql("DROP DATABASE %s", flinkDatabase))
.cause()
.isInstanceOf(DatabaseNotEmptyException.class)
.hasMessage(
String.format("Database %s in catalog %s is not empty.", DATABASE, catalogName));

sql("DROP TABLE %s.tl", flinkDatabase);
}
Expand Down Expand Up @@ -174,22 +160,17 @@ public void testListNamespace() {
List<Row> databases = sql("SHOW DATABASES");

if (isHadoopCatalog) {
Assert.assertEquals("Should have 2 database", 2, databases.size());
Assert.assertEquals(
"Should have db and default database",
Sets.newHashSet("default", "db"),
Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0)));
Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals("Should have db database", "db", databases.get(0).getField(0));

if (!baseNamespace.isEmpty()) {
// test namespace not belongs to this catalog
validationNamespaceCatalog.createNamespace(
Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
databases = sql("SHOW DATABASES");
Assert.assertEquals("Should have 2 database", 2, databases.size());
Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals(
"Should have db and default database",
Sets.newHashSet("default", "db"),
Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0)));
"Should have db and default database", "db", databases.get(0).getField(0));
}
} else {
// If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the
Expand Down Expand Up @@ -301,10 +282,12 @@ public void testHadoopNotSupportMeta() {
"Namespace should not already exist",
validationNamespaceCatalog.namespaceExists(icebergNamespace));

AssertHelpers.assertThrowsCause(
"Should fail if trying to create database with location in hadoop catalog.",
UnsupportedOperationException.class,
String.format("Cannot create namespace %s: metadata is not supported", icebergNamespace),
() -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase));
Assertions.assertThatThrownBy(
() -> sql("CREATE DATABASE %s WITH ('prop'='value')", flinkDatabase))
.cause()
.isInstanceOf(UnsupportedOperationException.class)
.hasMessage(
String.format(
"Cannot create namespace %s: metadata is not supported", icebergNamespace));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,7 @@ public FlinkCatalog(
}

@Override
public void open() throws CatalogException {
// Create the default database if it does not exist.
try {
createDatabase(getDefaultDatabase(), ImmutableMap.of(), true);
} catch (DatabaseAlreadyExistException e) {
// Ignore the exception if it's already exist.
}
}
public void open() throws CatalogException {}

@Override
public void close() throws CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
import org.junit.After;
Expand Down Expand Up @@ -74,19 +73,6 @@ public void testCreateNamespace() {
"Database should be created", validationNamespaceCatalog.namespaceExists(icebergNamespace));
}

@Test
public void testDefaultDatabase() {
sql("USE CATALOG %s", catalogName);
sql("SHOW TABLES");

Assert.assertEquals(
"Should use the current catalog", getTableEnv().getCurrentCatalog(), catalogName);
Assert.assertEquals(
"Should use the configured default namespace",
getTableEnv().getCurrentDatabase(),
"default");
}

@Test
public void testDropEmptyDatabase() {
Assert.assertFalse(
Expand Down Expand Up @@ -174,22 +160,17 @@ public void testListNamespace() {
List<Row> databases = sql("SHOW DATABASES");

if (isHadoopCatalog) {
Assert.assertEquals("Should have 2 database", 2, databases.size());
Assert.assertEquals(
"Should have db and default database",
Sets.newHashSet("default", "db"),
Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0)));
Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals("Should have db database", "db", databases.get(0).getField(0));

if (!baseNamespace.isEmpty()) {
// test namespace not belongs to this catalog
validationNamespaceCatalog.createNamespace(
Namespace.of(baseNamespace.level(0), "UNKNOWN_NAMESPACE"));
databases = sql("SHOW DATABASES");
Assert.assertEquals("Should have 2 database", 2, databases.size());
Assert.assertEquals("Should have 1 database", 1, databases.size());
Assert.assertEquals(
"Should have db and default database",
Sets.newHashSet("default", "db"),
Sets.newHashSet(databases.get(0).getField(0), databases.get(1).getField(0)));
"Should have db and default database", "db", databases.get(0).getField(0));
}
} else {
// If there are multiple classes extends FlinkTestBase, TestHiveMetastore may loose the
Expand Down

0 comments on commit f5bb0c0

Please sign in to comment.