From 7e0c20a488a86e6a0108d05aa8f5688c7aa0ca30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Wed, 18 Sep 2024 19:20:01 +0800 Subject: [PATCH] [Feature][Connector-V2] jdbc saphana source tablepath support view and synonym (#7670) --- .../jdbc/catalog/AbstractJdbcCatalog.java | 36 ++++++ .../jdbc/catalog/saphana/SapHanaCatalog.java | 115 ++++++++++++++++++ .../connectors/seatunnel/jdbc/JdbcHanaIT.java | 46 ++++++- .../jdbc_sap_hana_test_view_and_synonym.conf | 59 +++++++++ 4 files changed, 255 insertions(+), 1 deletion(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java index e971c138930..260be79042c 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java @@ -260,6 +260,14 @@ protected String getListDatabaseSql() { throw new UnsupportedOperationException(); } + protected String getListViewSql(String databaseName) { + throw new UnsupportedOperationException(); + } + + protected String getListSynonymSql(String databaseName) { + throw new UnsupportedOperationException(); + } + protected String getDatabaseWithConditionSql(String databaseName) { throw CommonError.unsupportedMethod(this.catalogName, "getDatabaseWithConditionSql"); } @@ -331,6 +339,34 @@ public List listTables(String databaseName) } } + public List listViews(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + String dbUrl = getUrlFromDatabaseName(databaseName); + try { + return queryString(dbUrl, getListViewSql(databaseName), this::getTableName); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + + public List listSynonym(String databaseName) + throws CatalogException, DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(this.catalogName, databaseName); + } + String dbUrl = getUrlFromDatabaseName(databaseName); + try { + return queryString(dbUrl, getListSynonymSql(databaseName), this::getTableName); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + @Override public boolean tableExists(TablePath tablePath) throws CatalogException { String databaseName = tablePath.getDatabaseName(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java index 70b01b397e2..fce7e78eebb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/saphana/SapHanaCatalog.java @@ -21,22 +21,35 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper; +import org.apache.commons.lang3.StringUtils; + import lombok.extern.slf4j.Slf4j; import java.sql.Connection; import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Optional; import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter.appendColumnSizeIfNeed; @@ -123,6 +136,18 @@ protected String getListTableSql(String databaseName) { "SELECT TABLE_NAME FROM TABLES WHERE SCHEMA_NAME = '%s'", databaseName); } + @Override + public String getListViewSql(String databaseName) { + return String.format( + "SELECT VIEW_NAME FROM SYS.VIEWS WHERE SCHEMA_NAME = '%s'", databaseName); + } + + @Override + public String getListSynonymSql(String databaseName) { + return String.format( + "SELECT SYNONYM_NAME FROM SYNONYMS WHERE SCHEMA_NAME = '%s'", databaseName); + } + @Override protected String getTableName(ResultSet rs) throws SQLException { return rs.getString(1); @@ -134,6 +159,96 @@ protected String getSelectColumnsSql(TablePath tablePath) { SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), tablePath.getTableName()); } + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + try { + if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { + return querySQLResultExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + getTableWithConditionSql(tablePath)) + || querySQLResultExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + String.format( + getListViewSql(tablePath.getDatabaseName()) + + " AND VIEW_NAME = '%s'", + tablePath.getTableName())) + || querySQLResultExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + String.format( + getListSynonymSql(tablePath.getDatabaseName()) + + " AND SYNONYM_NAME = '%s'", + tablePath.getSchemaAndTableName())); + } + return querySQLResultExists( + this.getUrlFromDatabaseName(tablePath.getDatabaseName()), + getTableWithConditionSql(tablePath)); + } catch (DatabaseNotExistException e) { + return false; + } catch (SQLException e) { + throw new SeaTunnelException("Failed to querySQLResult", e); + } + } + + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + String dbUrl; + if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { + dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + } else { + dbUrl = getUrlFromDatabaseName(defaultDatabase); + } + Connection conn = getConnection(dbUrl); + TablePath originalTablePath = tablePath; + if (listSynonym(tablePath.getDatabaseName()).contains(tablePath.getTableName())) { + String sql = + String.format( + "SELECT SYNONYM_NAME, SCHEMA_NAME, OBJECT_NAME, OBJECT_SCHEMA FROM SYNONYMS WHERE SCHEMA_NAME = '%s' AND SYNONYM_NAME = '%s' ", + tablePath.getDatabaseName(), tablePath.getTableName()); + try (PreparedStatement statement = conn.prepareStatement(sql); + final ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + final String refDatabaseName = resultSet.getString("OBJECT_SCHEMA"); + final String refTableName = resultSet.getString("OBJECT_NAME"); + tablePath = TablePath.of(refDatabaseName, refTableName); + } + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting SYNONYM %s", tablePath.getFullName()), e); + } + } + try { + DatabaseMetaData metaData = conn.getMetaData(); + Optional primaryKey = getPrimaryKey(metaData, tablePath); + List constraintKeys = getConstraintKeys(metaData, tablePath); + try (PreparedStatement ps = conn.prepareStatement(getSelectColumnsSql(tablePath)); + ResultSet resultSet = ps.executeQuery()) { + + TableSchema.Builder builder = TableSchema.builder(); + buildColumnsWithErrorCheck(tablePath, resultSet, builder); + // add primary key + primaryKey.ifPresent(builder::primaryKey); + // add constraint key + constraintKeys.forEach(builder::constraintKey); + TableIdentifier tableIdentifier = getTableIdentifier(originalTablePath); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + "", + catalogName); + } + } catch (SeaTunnelRuntimeException e) { + throw e; + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + @Override protected Column buildColumn(ResultSet resultSet) throws SQLException { String columnName = resultSet.getString("COLUMN_NAME"); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java index d2b0667795f..49b4cb17635 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcHanaIT.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper; @@ -38,6 +39,7 @@ import lombok.SneakyThrows; import java.sql.Date; +import java.sql.Statement; import java.time.Duration; import java.time.LocalDate; import java.time.temporal.ChronoUnit; @@ -56,7 +58,9 @@ public class JdbcHanaIT extends AbstractJdbcIT { private static final String SOURCE_TABLE = "ALLDATATYPES"; private static final List CONFIG_FILE = - Lists.newArrayList("/jdbc_sap_hana_source_and_sink.conf"); + Lists.newArrayList( + "/jdbc_sap_hana_source_and_sink.conf", + "/jdbc_sap_hana_test_view_and_synonym.conf"); // TODO The current Docker image cannot handle the annotated type normally, // but the corresponding type can be handled normally on the standard HANA service @@ -214,6 +218,46 @@ protected void createSchemaIfNeeded() { } } + protected void createNeededTables() { + try (Statement statement = connection.createStatement()) { + String createTemplate = jdbcCase.getCreateSql(); + + String createSource = + String.format( + createTemplate, + buildTableInfoWithSchema( + jdbcCase.getDatabase(), + jdbcCase.getSchema(), + jdbcCase.getSourceTable())); + statement.execute(createSource); + + if (!jdbcCase.isUseSaveModeCreateTable()) { + if (jdbcCase.getSinkCreateSql() != null) { + createTemplate = jdbcCase.getSinkCreateSql(); + } + String createSink = + String.format( + createTemplate, + buildTableInfoWithSchema( + jdbcCase.getDatabase(), + jdbcCase.getSchema(), + jdbcCase.getSinkTable())); + statement.execute(createSink); + } + // create view and synonym + String createViewSql = + "CREATE VIEW TEST.ALLDATATYPES_VIEW AS SELECT * FROM TEST.ALLDATATYPES;"; + String createSynonymSql = + "CREATE SYNONYM TEST.ALLDATATYPES_SYNONYM FOR TEST.ALLDATATYPES;"; + statement.execute(createViewSql); + statement.execute(createSynonymSql); + connection.commit(); + } catch (Exception exception) { + log.error(ExceptionUtils.getMessage(exception)); + throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception); + } + } + @Override GenericContainer initContainer() { GenericContainer container = diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf new file mode 100644 index 00000000000..aec552c208c --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-6/src/test/resources/jdbc_sap_hana_test_view_and_synonym.conf @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + Jdbc { + url = "jdbc:sap://e2e_saphana:39017" + driver = "com.sap.db.jdbc.Driver" + connection_check_timeout_sec = 1000 + user = "SYSTEM" + password = "testPassword123" + "table_list"=[ + { + "table_path"="TEST.ALLDATATYPES_VIEW" + }, + { + "table_path"="TEST.ALLDATATYPES_SYNONYM" + } + ] + } + +} + +transform { +} + +sink { + Jdbc { + url = "jdbc:sap://e2e_saphana:39017" + driver = "com.sap.db.jdbc.Driver" + connection_check_timeout_sec = 1000 + user = "SYSTEM" + password = "testPassword123" + database = "TEST" + table = "${table_name}_sink" + generate_sink_sql = true + schema_save_mode = RECREATE_SCHEMA + data_save_mode = DROP_DATA + } +} +