Skip to content

Commit 7d8028a

Browse files
authored
[Improve][JdbcSource] Optimize catalog-table metadata merge logic (apache#5828)
1 parent f69f773 commit 7d8028a

File tree

2 files changed

+359
-49
lines changed

2 files changed

+359
-49
lines changed

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java

+64 -49
Original file line number | Diff line number | Diff line change
@@ -173,64 +173,58 @@ private static CatalogTable getCatalogTable(
173173
return jdbcCatalog.getTable(tableConfig.getQuery());
174174
}
175175

176-
private static CatalogTable mergeCatalogTable(
177-
CatalogTable tableOfPath, CatalogTable tableOfQuery) {
178-
String catalogName =
179-
tableOfQuery.getTableId() == null
180-
? DEFAULT_CATALOG_NAME
181-
: tableOfQuery.getTableId().getCatalogName();
182-
TableIdentifier tableIdentifier =
183-
TableIdentifier.of(
184-
catalogName,
185-
tableOfPath.getTableId().getDatabaseName(),
186-
tableOfPath.getTableId().getSchemaName(),
187-
tableOfPath.getTableId().getTableName());
188-
176+
static CatalogTable mergeCatalogTable(CatalogTable tableOfPath, CatalogTable tableOfQuery) {
189177
TableSchema tableSchemaOfPath = tableOfPath.getTableSchema();
190178
Map<String, Column> columnsOfPath =
191179
tableSchemaOfPath.getColumns().stream()
192-
.collect(Collectors.toMap(Column::getName, Function.identity()));
193-
Set<String> columnKeysOfPath = columnsOfPath.keySet();
180+
.collect(
181+
Collectors.toMap(
182+
Column::getName,
183+
Function.identity(),
184+
(o1, o2) -> o1,
185+
LinkedHashMap::new));
194186
TableSchema tableSchemaOfQuery = tableOfQuery.getTableSchema();
195187
Map<String, Column> columnsOfQuery =
196188
tableSchemaOfQuery.getColumns().stream()
197-
.collect(Collectors.toMap(Column::getName, Function.identity()));
189+
.collect(
190+
Collectors.toMap(
191+
Column::getName,
192+
Function.identity(),
193+
(o1, o2) -> o1,
194+
LinkedHashMap::new));
198195
Set<String> columnKeysOfQuery = columnsOfQuery.keySet();
199196

200-
if (columnKeysOfPath.equals(columnKeysOfQuery)) {
201-
boolean schemaEquals =
202-
columnKeysOfPath.stream()
203-
.allMatch(
204-
key ->
205-
columnsOfPath
206-
.get(key)
207-
.getDataType()
208-
.equals(columnsOfQuery.get(key).getDataType()));
209-
if (schemaEquals) {
210-
return CatalogTable.of(
211-
tableIdentifier,
212-
TableSchema.builder()
213-
.primaryKey(tableSchemaOfPath.getPrimaryKey())
214-
.constraintKey(tableSchemaOfPath.getConstraintKeys())
215-
.columns(tableSchemaOfQuery.getColumns())
216-
.build(),
217-
tableOfPath.getOptions(),
218-
tableOfPath.getPartitionKeys(),
219-
tableOfPath.getComment(),
220-
tableIdentifier.getCatalogName());
221-
}
197+
List<Column> columnsOfMerge =
198+
tableSchemaOfQuery.getColumns().stream()
199+
.filter(
200+
column ->
201+
columnsOfPath.containsKey(column.getName())
202+
&& columnsOfPath
203+
.get(column.getName())
204+
.getDataType()
205+
.equals(
206+
columnsOfQuery
207+
.get(column.getName())
208+
.getDataType()))
209+
.map(column -> columnsOfPath.get(column.getName()))
210+
.collect(Collectors.toList());
211+
boolean schemaIncludeAllColumns = columnsOfMerge.size() == columnKeysOfQuery.size();
212+
boolean schemaEquals =
213+
schemaIncludeAllColumns && columnsOfMerge.size() == columnsOfPath.size();
214+
if (schemaEquals) {
215+
return tableOfPath;
222216
}
223217

224218
PrimaryKey primaryKeyOfPath = tableSchemaOfPath.getPrimaryKey();
225219
List<ConstraintKey> constraintKeysOfPath = tableSchemaOfPath.getConstraintKeys();
226220
List<String> partitionKeysOfPath = tableOfPath.getPartitionKeys();
227-
PrimaryKey primaryKeyOfQuery = null;
228-
List<ConstraintKey> constraintKeysOfQuery = new ArrayList<>();
229-
List<String> partitionKeysOfQuery = new ArrayList<>();
221+
PrimaryKey primaryKeyOfMerge = null;
222+
List<ConstraintKey> constraintKeysOfMerge = new ArrayList<>();
223+
List<String> partitionKeysOfMerge = new ArrayList<>();
230224

231225
if (primaryKeyOfPath != null
232226
&& columnKeysOfQuery.containsAll(primaryKeyOfPath.getColumnNames())) {
233-
primaryKeyOfQuery = primaryKeyOfPath;
227+
primaryKeyOfMerge = primaryKeyOfPath;
234228
}
235229
if (constraintKeysOfPath != null) {
236230
for (ConstraintKey constraintKey : constraintKeysOfPath) {
@@ -239,26 +233,47 @@ private static CatalogTable mergeCatalogTable(
239233
.map(e -> e.getColumnName())
240234
.collect(Collectors.toSet());
241235
if (columnKeysOfQuery.containsAll(constraintKeyFields)) {
242-
constraintKeysOfQuery.add(constraintKey);
236+
constraintKeysOfMerge.add(constraintKey);
243237
}
244238
}
245239
}
246240
if (partitionKeysOfPath != null && columnKeysOfQuery.containsAll(partitionKeysOfPath)) {
247-
partitionKeysOfQuery = partitionKeysOfPath;
241+
partitionKeysOfMerge = partitionKeysOfPath;
242+
}
243+
if (schemaIncludeAllColumns) {
244+
return CatalogTable.of(
245+
tableOfPath.getTableId(),
246+
TableSchema.builder()
247+
.primaryKey(primaryKeyOfMerge)
248+
.constraintKey(constraintKeysOfMerge)
249+
.columns(columnsOfMerge)
250+
.build(),
251+
tableOfPath.getOptions(),
252+
partitionKeysOfMerge,
253+
tableOfPath.getComment());
248254
}
249255

256+
String catalogName =
257+
tableOfQuery.getTableId() == null
258+
? DEFAULT_CATALOG_NAME
259+
: tableOfQuery.getTableId().getCatalogName();
260+
TableIdentifier tableIdentifier =
261+
TableIdentifier.of(
262+
catalogName,
263+
tableOfPath.getTableId().getDatabaseName(),
264+
tableOfPath.getTableId().getSchemaName(),
265+
tableOfPath.getTableId().getTableName());
250266
CatalogTable mergedCatalogTable =
251267
CatalogTable.of(
252268
tableIdentifier,
253269
TableSchema.builder()
254-
.primaryKey(primaryKeyOfQuery)
255-
.constraintKey(constraintKeysOfQuery)
270+
.primaryKey(primaryKeyOfMerge)
271+
.constraintKey(constraintKeysOfMerge)
256272
.columns(tableSchemaOfQuery.getColumns())
257273
.build(),
258274
tableOfPath.getOptions(),
259-
partitionKeysOfQuery,
260-
tableOfPath.getComment(),
261-
tableIdentifier.getCatalogName());
275+
partitionKeysOfMerge,
276+
tableOfPath.getComment());
262277

263278
log.info("Merged catalog table of path {}", tableOfPath.getTableId().toTablePath());
264279
return mergedCatalogTable;

0 commit comments

Comments
 (0)