Skip to content

Commit

Permalink
[Fix][Connector-V2] Fix known directory create and delete ignore issu…
Browse files Browse the repository at this point in the history
…es (apache#7700)
  • Loading branch information
corgy-w authored Sep 23, 2024
1 parent 867f126 commit e2fb679
Show file tree
Hide file tree
Showing 13 changed files with 374 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,10 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
// Create the index
checkNotNull(tablePath, "tablePath cannot be null");
if (tableExists(tablePath)) {
if (ignoreIfExists) {
return;
} else {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName, tablePath, null);
}
return;
}
ezsClient.createIndex(tablePath.getTableName());
}
Expand All @@ -188,8 +187,11 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
checkNotNull(tablePath);
if (!tableExists(tablePath) && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
if (!tableExists(tablePath)) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
return;
}
try {
ezsClient.dropIndex(tablePath.getTableName());
Expand All @@ -205,13 +207,21 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
createTable(tablePath, null, ignoreIfExists);
try {
createTable(tablePath, null, ignoreIfExists);
} catch (TableAlreadyExistException ex) {
throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
}
}

@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
dropTable(tablePath, ignoreIfNotExists);
try {
dropTable(tablePath, ignoreIfNotExists);
} catch (TableNotExistException ex) {
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
}
}

private Map<String, String> buildTableOptions(TablePath tablePath) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,24 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
// Create the index
checkNotNull(tablePath, "tablePath cannot be null");
if (tableExists(tablePath)) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName, tablePath);
}
return;
}
esRestClient.createIndex(tablePath.getTableName());
}

@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
checkNotNull(tablePath);
if (!tableExists(tablePath) && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
if (!tableExists(tablePath)) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
return;
}
try {
esRestClient.dropIndex(tablePath.getTableName());
Expand All @@ -205,13 +214,21 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
createTable(tablePath, null, ignoreIfExists);
try {
createTable(tablePath, null, ignoreIfExists);
} catch (TableAlreadyExistException ex) {
throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
}
}

@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
dropTable(tablePath, ignoreIfNotExists);
try {
dropTable(tablePath, ignoreIfNotExists);
} catch (TableNotExistException ex) {
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ public CatalogTable getTable(TablePath tablePath)
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
checkNotNull(tablePath, "tablePath cannot be null");
if (tableExists(tablePath)) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName, tablePath);
}
return;
}
hbaseClient.createTable(
tablePath.getDatabaseName(),
tablePath.getTableName(),
Expand All @@ -130,34 +136,46 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
checkNotNull(tablePath);
if (!tableExists(tablePath) && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
if (!tableExists(tablePath)) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
return;
}
hbaseClient.dropTable(tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
if (databaseExists(tablePath.getDatabaseName()) && !ignoreIfExists) {
throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
if (databaseExists(tablePath.getDatabaseName())) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
}
return;
}
hbaseClient.createNamespace(tablePath.getDatabaseName());
}

@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
if (!databaseExists(tablePath.getDatabaseName()) && !ignoreIfNotExists) {
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
if (!databaseExists(tablePath.getDatabaseName())) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
}
return;
}
hbaseClient.deleteNamespace(tablePath.getDatabaseName());
}

@Override
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
if (!tableExists(tablePath) && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
if (!tableExists(tablePath)) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
return;
}
hbaseClient.truncateTable(tablePath.getDatabaseName(), tablePath.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hbase.source.HbaseSourceSplit;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void createTable(
List<String> columnFamilies,
boolean ignoreIfExists) {
try {
if (!databaseExists(databaseName)) {
if (!databaseExists(databaseName) && !StringUtils.isBlank(databaseName)) {
admin.createNamespace(NamespaceDescriptor.create(databaseName).build());
}
TableName table = TableName.valueOf(databaseName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,15 @@ private FieldType convertToFieldType(Column column, PrimaryKey primaryKey) {
@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
if (!databaseExists(tablePath.getDatabaseName())) {
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
}
if (!tableExists(tablePath)) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
return;
}
this.client.dropCollection(
DropCollectionParam.newBuilder()
.withDatabaseName(tablePath.getDatabaseName())
Expand All @@ -358,6 +367,12 @@ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
@Override
public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
throws DatabaseAlreadyExistException, CatalogException {
if (databaseExists(tablePath.getDatabaseName())) {
if (!ignoreIfExists) {
throw new DatabaseAlreadyExistException(catalogName, tablePath.getDatabaseName());
}
return;
}
R<RpcStatus> response =
this.client.createDatabase(
CreateDatabaseParam.newBuilder()
Expand All @@ -372,6 +387,12 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
if (!databaseExists(tablePath.getDatabaseName())) {
if (!ignoreIfNotExists) {
throw new DatabaseNotExistException(catalogName, tablePath.getDatabaseName());
}
return;
}
this.client.dropDatabase(
DropDatabaseParam.newBuilder()
.withDatabaseName(tablePath.getDatabaseName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,15 +150,24 @@ private Map<String, String> buildTableOptions(TablePath tablePath) {
public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
checkNotNull(tablePath, "tablePath cannot be null");
if (tableExists(tablePath)) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(catalogName, tablePath);
}
return;
}
typesenseClient.createCollection(tablePath.getTableName());
}

@Override
public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
checkNotNull(tablePath);
if (!tableExists(tablePath) && !ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
if (!tableExists(tablePath)) {
if (!ignoreIfNotExists) {
throw new TableNotExistException(catalogName, tablePath);
}
return;
}
try {
typesenseClient.dropCollection(tablePath.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class TypesenseClient {

public static TypesenseClient createInstance(ReadonlyConfig config) {
List<String> hosts = config.get(TypesenseConnectionConfig.HOSTS);
String protocol = config.get(TypesenseConnectionConfig.protocol);
String protocol = config.get(TypesenseConnectionConfig.PROTOCOL);
String apiKey = config.get(TypesenseConnectionConfig.APIKEY);
return createInstance(hosts, apiKey, protocol);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class TypesenseConnectionConfig {
.noDefaultValue()
.withDescription("Typesense api key");

public static final Option<String> protocol =
public static final Option<String> PROTOCOL =
Options.key("protocol")
.stringType()
.defaultValue("http")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.easysearch.catalog.EasysearchCatalog;
import org.apache.seatunnel.connectors.seatunnel.easysearch.client.EasysearchClient;
import org.apache.seatunnel.connectors.seatunnel.easysearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
Expand All @@ -34,6 +42,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
Expand All @@ -57,7 +66,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -76,6 +84,10 @@ public class EasysearchIT extends TestSuiteBase implements TestResource {

private EasysearchClient easysearchClient;

private Config easysearchConfig;

private Catalog catalog;

@BeforeEach
@Override
public void startUp() throws Exception {
Expand Down Expand Up @@ -107,17 +119,18 @@ public void startUp() throws Exception {
private void initConnection() {
String host = easysearchServer.getContainerIpAddress();
String endpoint = String.format("https://%s:%d", host, PORT);
easysearchClient =
EasysearchClient.createInstance(
Lists.newArrayList(endpoint),
Optional.of("admin"),
Optional.of("admin"),
false,
false,
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
Map<String, Object> config = new HashMap<>();
config.put("username", "admin");
config.put("password", "admin");
config.put("hosts", Lists.newArrayList(endpoint));
config.put("tls_verify_certificate", false);
config.put("tls_verify_hostname", false);

easysearchConfig = ConfigFactory.parseMap(config);

easysearchClient = EasysearchClient.createInstance(easysearchConfig);
catalog = new EasysearchCatalog("easysearch", "default", easysearchConfig);
catalog.open();
createIndexDocs();
}

Expand Down Expand Up @@ -148,6 +161,35 @@ public void testEasysearch(TestContainer container) throws IOException, Interrup
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}

@TestTemplate
@Disabled("Easysearch catalog not yet realized, see EasysearchCatalogFactory.class")
public void testCatalog(TestContainer container) {
// always exist
Exception exception =
Assertions.assertThrows(
Exception.class,
() -> catalog.createDatabase(TablePath.of("", "st_index"), false));
Assertions.assertTrue(
exception instanceof DatabaseAlreadyExistException
|| exception instanceof CatalogException);

Assertions.assertDoesNotThrow(
() -> catalog.createDatabase(TablePath.of("", "st_index"), true));

// create
Assertions.assertDoesNotThrow(
() -> catalog.createTable(TablePath.of("", "tmp_index"), null, false));
Assertions.assertDoesNotThrow(
() -> catalog.dropDatabase(TablePath.of("", "tmp_index"), false));
Exception tmpIndex =
Assertions.assertThrows(
Exception.class,
() -> catalog.dropDatabase(TablePath.of("", "tmp_index"), false));
Assertions.assertTrue(
tmpIndex instanceof DatabaseNotExistException
|| tmpIndex instanceof CatalogException);
}

private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
Expand Down Expand Up @@ -275,6 +317,9 @@ public void tearDown() {
if (Objects.nonNull(easysearchClient)) {
easysearchClient.close();
}
if (Objects.nonNull(catalog)) {
catalog.close();
}
easysearchServer.close();
}
}
Loading

0 comments on commit e2fb679

Please sign in to comment.