Skip to content

Commit

Permalink
[Feature][CONNECTORS-V2-Paimon] Paimon Sink supported truncate table (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
hawk9821 authored Sep 4, 2024
1 parent adf26c2 commit 4f3df22
Show file tree
Hide file tree
Showing 10 changed files with 626 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

import java.io.Closeable;
Expand Down Expand Up @@ -183,6 +184,35 @@ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
}
}

@Override
public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
try {
Identifier identifier = toIdentifier(tablePath);
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
Schema schema = buildPaimonSchema(table.schema());
dropTable(tablePath, ignoreIfNotExists);
catalog.createTable(identifier, schema, ignoreIfNotExists);
} catch (org.apache.paimon.catalog.Catalog.TableNotExistException e) {
throw new TableNotExistException(this.catalogName, tablePath);
} catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
throw new DatabaseAlreadyExistException(this.catalogName, tablePath.getDatabaseName());
} catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException e) {
throw new DatabaseNotExistException(this.catalogName, tablePath.getDatabaseName());
}
}

private Schema buildPaimonSchema(@NonNull org.apache.paimon.schema.TableSchema schema) {
Schema.Builder builder = Schema.newBuilder();
schema.fields()
.forEach(field -> builder.column(field.name(), field.type(), field.description()));
builder.options(schema.options());
builder.primaryKey(schema.primaryKeys());
builder.partitionKeys(schema.partitionKeys());
builder.comment(schema.comment());
return builder.build();
}

@Override
public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
throws DatabaseNotExistException, CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class PaimonRecord {
public Long pkId;
public String name;
public Integer score;
public String dt;
public Timestamp oneTime;
public Timestamp twoTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,43 @@ public void testFakeSinkPaimonWithFullTypeAndReadWithFilter(TestContainer contai
Assertions.assertEquals(0, readResult4.getExitCode());
}

@TestTemplate
public void testSinkPaimonTruncateTable(TestContainer container) throws Exception {
Container.ExecResult writeResult =
container.executeJob("/fake_sink_paimon_truncate_with_local_case1.conf");
Assertions.assertEquals(0, writeResult.getExitCode());
Container.ExecResult readResult =
container.executeJob("/fake_sink_paimon_truncate_with_local_case2.conf");
Assertions.assertEquals(0, readResult.getExitCode());
given().ignoreExceptions()
.await()
.atLeast(100L, TimeUnit.MILLISECONDS)
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// copy paimon to local
container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecord> paimonRecords =
loadPaimonData("seatunnel_namespace10", TARGET_TABLE);
Assertions.assertEquals(2, paimonRecords.size());
paimonRecords.forEach(
paimonRecord -> {
if (paimonRecord.getPkId() == 1) {
Assertions.assertEquals("Aa", paimonRecord.getName());
}
if (paimonRecord.getPkId() == 2) {
Assertions.assertEquals("Bb", paimonRecord.getName());
}
Assertions.assertEquals(200, paimonRecord.getScore());
});
List<Long> ids =
paimonRecords.stream()
.map(PaimonRecord::getPkId)
.collect(Collectors.toList());
Assertions.assertFalse(ids.contains(3L));
});
}

protected final ContainerExtendedFactory containerExtendedFactory =
container -> {
if (isWindows) {
Expand Down Expand Up @@ -568,7 +605,7 @@ private void extractFilesWin() {
}

private List<PaimonRecord> loadPaimonData(String dbName, String tbName) throws Exception {
Table table = getTable(dbName, tbName);
FileStoreTable table = (FileStoreTable) getTable(dbName, tbName);
ReadBuilder readBuilder = table.newReadBuilder();
TableScan.Plan plan = readBuilder.newScan().plan();
TableRead tableRead = readBuilder.newRead();
Expand All @@ -582,7 +619,12 @@ private List<PaimonRecord> loadPaimonData(String dbName, String tbName) throws E
try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
reader.forEachRemaining(
row -> {
result.add(new PaimonRecord(row.getLong(0), row.getString(1).toString()));
PaimonRecord paimonRecord =
new PaimonRecord(row.getLong(0), row.getString(1).toString());
if (table.schema().fieldNames().contains("score")) {
paimonRecord.setScore(row.getInt(2));
}
result.add(paimonRecord);
log.info("key_id:" + row.getLong(0) + ", name:" + row.getString(1));
});
}
Expand Down Expand Up @@ -611,7 +653,7 @@ private Identifier getIdentifier(String dbName, String tbName) {
private Catalog getCatalog() {
Options options = new Options();
if (isWindows) {
options.set("warehouse", "file://" + CATALOG_DIR_WIN);
options.set("warehouse", CATALOG_DIR_WIN);
} else {
options.set("warehouse", "file://" + CATALOG_DIR);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
Expand All @@ -50,6 +51,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.awaitility.Awaitility.given;

Expand Down Expand Up @@ -204,4 +206,131 @@ public void testFakeCDCSinkPaimonWithHiveCatalogAndRead(TestContainer container)
container.executeJob("/paimon_to_assert_with_hivecatalog.conf");
Assertions.assertEquals(0, readResult.getExitCode());
}

@TestTemplate
public void testSinkPaimonHdfsTruncateTable(TestContainer container) throws Exception {
Container.ExecResult writeResult =
container.executeJob("/fake_sink_paimon_truncate_with_hdfs_case1.conf");
Assertions.assertEquals(0, writeResult.getExitCode());
Container.ExecResult readResult =
container.executeJob("/fake_sink_paimon_truncate_with_hdfs_case2.conf");
Assertions.assertEquals(0, readResult.getExitCode());
given().ignoreExceptions()
.await()
.atLeast(100L, TimeUnit.MILLISECONDS)
.atMost(180L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
PaimonSinkConfig paimonSinkConfig =
new PaimonSinkConfig(
ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
PaimonCatalogLoader paimonCatalogLoader =
new PaimonCatalogLoader(paimonSinkConfig);
Catalog catalog = paimonCatalogLoader.loadCatalog();
List<PaimonRecord> paimonRecords =
loadPaimonData(catalog, "seatunnel_namespace11", "st_test");
Assertions.assertEquals(2, paimonRecords.size());
paimonRecords.forEach(
paimonRecord -> {
if (paimonRecord.getPkId() == 1) {
Assertions.assertEquals("Aa", paimonRecord.getName());
}
if (paimonRecord.getPkId() == 2) {
Assertions.assertEquals("Bb", paimonRecord.getName());
}
Assertions.assertEquals(200, paimonRecord.getScore());
});
List<Long> ids =
paimonRecords.stream()
.map(PaimonRecord::getPkId)
.collect(Collectors.toList());
Assertions.assertFalse(ids.contains(3L));
});
}

@TestTemplate
public void testSinkPaimonHiveTruncateTable(TestContainer container) throws Exception {
Container.ExecResult writeResult =
container.executeJob("/fake_sink_paimon_truncate_with_hive_case1.conf");
Assertions.assertEquals(0, writeResult.getExitCode());
Container.ExecResult readResult =
container.executeJob("/fake_sink_paimon_truncate_with_hive_case2.conf");
Assertions.assertEquals(0, readResult.getExitCode());
given().ignoreExceptions()
.await()
.atLeast(100L, TimeUnit.MILLISECONDS)
.atMost(180L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
PaimonSinkConfig paimonSinkConfig =
new PaimonSinkConfig(
ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
PaimonCatalogLoader paimonCatalogLoader =
new PaimonCatalogLoader(paimonSinkConfig);
Catalog catalog = paimonCatalogLoader.loadCatalog();
List<PaimonRecord> paimonRecords =
loadPaimonData(catalog, "seatunnel_namespace12", "st_test");
Assertions.assertEquals(2, paimonRecords.size());
paimonRecords.forEach(
paimonRecord -> {
if (paimonRecord.getPkId() == 1) {
Assertions.assertEquals("Aa", paimonRecord.getName());
}
if (paimonRecord.getPkId() == 2) {
Assertions.assertEquals("Bb", paimonRecord.getName());
}
Assertions.assertEquals(200, paimonRecord.getScore());
});
List<Long> ids =
paimonRecords.stream()
.map(PaimonRecord::getPkId)
.collect(Collectors.toList());
Assertions.assertFalse(ids.contains(3L));
});
}

@TestTemplate
public void testSinkPaimonHiveTruncateTable1(TestContainer container) throws Exception {
PaimonSinkConfig paimonSinkConfig =
new PaimonSinkConfig(ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
PaimonCatalogLoader paimonCatalogLoader = new PaimonCatalogLoader(paimonSinkConfig);
Catalog catalog = paimonCatalogLoader.loadCatalog();
List<PaimonRecord> paimonRecords =
loadPaimonData(catalog, "seatunnel_namespace11", "st_test");
Assertions.assertEquals(2, paimonRecords.size());
paimonRecords.forEach(
paimonRecord -> {
if (paimonRecord.getPkId() == 1) {
Assertions.assertEquals("Aa", paimonRecord.getName());
}
if (paimonRecord.getPkId() == 2) {
Assertions.assertEquals("Bb", paimonRecord.getName());
}
Assertions.assertEquals(200, paimonRecord.getScore());
});
List<Long> ids =
paimonRecords.stream().map(PaimonRecord::getPkId).collect(Collectors.toList());
Assertions.assertFalse(ids.contains(3L));
}

private List<PaimonRecord> loadPaimonData(Catalog catalog, String dbName, String tbName)
throws Exception {
FileStoreTable table = (FileStoreTable) catalog.getTable(Identifier.create(dbName, tbName));
ReadBuilder readBuilder = table.newReadBuilder();
TableScan.Plan plan = readBuilder.newScan().plan();
TableRead tableRead = readBuilder.newRead();
List<PaimonRecord> result = new ArrayList<>();
try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
reader.forEachRemaining(
row -> {
PaimonRecord paimonRecord =
new PaimonRecord(row.getLong(0), row.getString(1).toString());
if (table.schema().fieldNames().contains("score")) {
paimonRecord.setScore(row.getInt(2));
}
result.add(paimonRecord);
});
}
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "A_1", 100]
},
{
kind = DELETE
fields = [2, "B", 100]
}
]
}
}

sink {
Paimon {
warehouse = "hdfs:///tmp/paimon"
database = "seatunnel_namespace11"
table = "st_test"
paimon.hadoop.conf = {
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
dfs.client.use.datanode.hostname = "true"
}
}
}
Loading

0 comments on commit 4f3df22

Please sign in to comment.