Skip to content

Commit

Permalink
[Feature][Connector-V2] jdbc saphana source tablepath support view an…
Browse files Browse the repository at this point in the history
…d synonym (apache#7670)
  • Loading branch information
chl-wxp authored Sep 18, 2024
1 parent 4f812e1 commit 7e0c20a
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,14 @@ protected String getListDatabaseSql() {
throw new UnsupportedOperationException();
}

protected String getListViewSql(String databaseName) {
throw new UnsupportedOperationException();
}

protected String getListSynonymSql(String databaseName) {
throw new UnsupportedOperationException();
}

protected String getDatabaseWithConditionSql(String databaseName) {
throw CommonError.unsupportedMethod(this.catalogName, "getDatabaseWithConditionSql");
}
Expand Down Expand Up @@ -331,6 +339,34 @@ public List<String> listTables(String databaseName)
}
}

public List<String> listViews(String databaseName)
throws CatalogException, DatabaseNotExistException {
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(this.catalogName, databaseName);
}
String dbUrl = getUrlFromDatabaseName(databaseName);
try {
return queryString(dbUrl, getListViewSql(databaseName), this::getTableName);
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", catalogName), e);
}
}

public List<String> listSynonym(String databaseName)
throws CatalogException, DatabaseNotExistException {
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(this.catalogName, databaseName);
}
String dbUrl = getUrlFromDatabaseName(databaseName);
try {
return queryString(dbUrl, getListSynonymSql(databaseName), this::getTableName);
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", catalogName), e);
}
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
String databaseName = tablePath.getDatabaseName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,35 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeConverter.appendColumnSizeIfNeed;

Expand Down Expand Up @@ -123,6 +136,18 @@ protected String getListTableSql(String databaseName) {
"SELECT TABLE_NAME FROM TABLES WHERE SCHEMA_NAME = '%s'", databaseName);
}

@Override
public String getListViewSql(String databaseName) {
return String.format(
"SELECT VIEW_NAME FROM SYS.VIEWS WHERE SCHEMA_NAME = '%s'", databaseName);
}

@Override
public String getListSynonymSql(String databaseName) {
return String.format(
"SELECT SYNONYM_NAME FROM SYNONYMS WHERE SCHEMA_NAME = '%s'", databaseName);
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
return rs.getString(1);
Expand All @@ -134,6 +159,96 @@ protected String getSelectColumnsSql(TablePath tablePath) {
SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
return querySQLResultExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
getTableWithConditionSql(tablePath))
|| querySQLResultExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
String.format(
getListViewSql(tablePath.getDatabaseName())
+ " AND VIEW_NAME = '%s'",
tablePath.getTableName()))
|| querySQLResultExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
String.format(
getListSynonymSql(tablePath.getDatabaseName())
+ " AND SYNONYM_NAME = '%s'",
tablePath.getSchemaAndTableName()));
}
return querySQLResultExists(
this.getUrlFromDatabaseName(tablePath.getDatabaseName()),
getTableWithConditionSql(tablePath));
} catch (DatabaseNotExistException e) {
return false;
} catch (SQLException e) {
throw new SeaTunnelException("Failed to querySQLResult", e);
}
}

public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(catalogName, tablePath);
}
String dbUrl;
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
} else {
dbUrl = getUrlFromDatabaseName(defaultDatabase);
}
Connection conn = getConnection(dbUrl);
TablePath originalTablePath = tablePath;
if (listSynonym(tablePath.getDatabaseName()).contains(tablePath.getTableName())) {
String sql =
String.format(
"SELECT SYNONYM_NAME, SCHEMA_NAME, OBJECT_NAME, OBJECT_SCHEMA FROM SYNONYMS WHERE SCHEMA_NAME = '%s' AND SYNONYM_NAME = '%s' ",
tablePath.getDatabaseName(), tablePath.getTableName());
try (PreparedStatement statement = conn.prepareStatement(sql);
final ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
final String refDatabaseName = resultSet.getString("OBJECT_SCHEMA");
final String refTableName = resultSet.getString("OBJECT_NAME");
tablePath = TablePath.of(refDatabaseName, refTableName);
}
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting SYNONYM %s", tablePath.getFullName()), e);
}
}
try {
DatabaseMetaData metaData = conn.getMetaData();
Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, tablePath);
List<ConstraintKey> constraintKeys = getConstraintKeys(metaData, tablePath);
try (PreparedStatement ps = conn.prepareStatement(getSelectColumnsSql(tablePath));
ResultSet resultSet = ps.executeQuery()) {

TableSchema.Builder builder = TableSchema.builder();
buildColumnsWithErrorCheck(tablePath, resultSet, builder);
// add primary key
primaryKey.ifPresent(builder::primaryKey);
// add constraint key
constraintKeys.forEach(builder::constraintKey);
TableIdentifier tableIdentifier = getTableIdentifier(originalTablePath);
return CatalogTable.of(
tableIdentifier,
builder.build(),
buildConnectorOptions(tablePath),
Collections.emptyList(),
"",
catalogName);
}
} catch (SeaTunnelRuntimeException e) {
throw e;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s", tablePath.getFullName()), e);
}
}

@Override
protected Column buildColumn(ResultSet resultSet) throws SQLException {
String columnName = resultSet.getString("COLUMN_NAME");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaTypeMapper;

Expand All @@ -38,6 +39,7 @@
import lombok.SneakyThrows;

import java.sql.Date;
import java.sql.Statement;
import java.time.Duration;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
Expand All @@ -56,7 +58,9 @@ public class JdbcHanaIT extends AbstractJdbcIT {
private static final String SOURCE_TABLE = "ALLDATATYPES";

private static final List<String> CONFIG_FILE =
Lists.newArrayList("/jdbc_sap_hana_source_and_sink.conf");
Lists.newArrayList(
"/jdbc_sap_hana_source_and_sink.conf",
"/jdbc_sap_hana_test_view_and_synonym.conf");

// TODO The current Docker image cannot handle the annotated type normally,
// but the corresponding type can be handled normally on the standard HANA service
Expand Down Expand Up @@ -214,6 +218,46 @@ protected void createSchemaIfNeeded() {
}
}

protected void createNeededTables() {
try (Statement statement = connection.createStatement()) {
String createTemplate = jdbcCase.getCreateSql();

String createSource =
String.format(
createTemplate,
buildTableInfoWithSchema(
jdbcCase.getDatabase(),
jdbcCase.getSchema(),
jdbcCase.getSourceTable()));
statement.execute(createSource);

if (!jdbcCase.isUseSaveModeCreateTable()) {
if (jdbcCase.getSinkCreateSql() != null) {
createTemplate = jdbcCase.getSinkCreateSql();
}
String createSink =
String.format(
createTemplate,
buildTableInfoWithSchema(
jdbcCase.getDatabase(),
jdbcCase.getSchema(),
jdbcCase.getSinkTable()));
statement.execute(createSink);
}
// create view and synonym
String createViewSql =
"CREATE VIEW TEST.ALLDATATYPES_VIEW AS SELECT * FROM TEST.ALLDATATYPES;";
String createSynonymSql =
"CREATE SYNONYM TEST.ALLDATATYPES_SYNONYM FOR TEST.ALLDATATYPES;";
statement.execute(createViewSql);
statement.execute(createSynonymSql);
connection.commit();
} catch (Exception exception) {
log.error(ExceptionUtils.getMessage(exception));
throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception);
}
}

@Override
GenericContainer<?> initContainer() {
GenericContainer<?> container =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Jdbc {
url = "jdbc:sap://e2e_saphana:39017"
driver = "com.sap.db.jdbc.Driver"
connection_check_timeout_sec = 1000
user = "SYSTEM"
password = "testPassword123"
"table_list"=[
{
"table_path"="TEST.ALLDATATYPES_VIEW"
},
{
"table_path"="TEST.ALLDATATYPES_SYNONYM"
}
]
}

}

transform {
}

sink {
Jdbc {
url = "jdbc:sap://e2e_saphana:39017"
driver = "com.sap.db.jdbc.Driver"
connection_check_timeout_sec = 1000
user = "SYSTEM"
password = "testPassword123"
database = "TEST"
table = "${table_name}_sink"
generate_sink_sql = true
schema_save_mode = RECREATE_SCHEMA
data_save_mode = DROP_DATA
}
}

0 comments on commit 7e0c20a

Please sign in to comment.