Skip to content

Commit

Permalink
[Hotfix][Connector-V2] Fix the batch write with paimon (apache#6865)
Browse files Browse the repository at this point in the history
  • Loading branch information
dailai authored May 20, 2024
1 parent 8f2049b commit 9ec971d
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public PaimonSinkWriter(
this.table = table;
this.tableWriteBuilder =
JobContextUtil.isBatchJob(jobContext)
? this.table.newBatchWriteBuilder().withOverwrite()
? this.table.newBatchWriteBuilder()
: this.table.newStreamWriteBuilder();
this.tableWrite = tableWriteBuilder.newWrite();
this.seaTunnelRowType = seaTunnelRowType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,40 @@ public void startUp() throws Exception {}
@Override
public void tearDown() throws Exception {}

@TestTemplate
public void testSinkWithMultipleInBatchMode(TestContainer container) throws Exception {
Container.ExecResult execOneResult =
container.executeJob("/fake_cdc_sink_paimon_case9.conf");
Assertions.assertEquals(0, execOneResult.getExitCode());

Container.ExecResult execTwoResult =
container.executeJob("/fake_cdc_sink_paimon_case10.conf");
Assertions.assertEquals(0, execTwoResult.getExitCode());

given().ignoreExceptions()
.await()
.atLeast(100L, TimeUnit.MILLISECONDS)
.atMost(30L, TimeUnit.SECONDS)
.untilAsserted(
() -> {
// copy paimon to local
container.executeExtraCommands(containerExtendedFactory);
List<PaimonRecord> paimonRecords =
loadPaimonData("seatunnel_namespace9", TARGET_TABLE);
Assertions.assertEquals(3, paimonRecords.size());
paimonRecords.forEach(
paimonRecord -> {
if (paimonRecord.getPkId() == 1) {
Assertions.assertEquals("A", paimonRecord.getName());
}
if (paimonRecord.getPkId() == 2
|| paimonRecord.getPkId() == 3) {
Assertions.assertEquals("CCC", paimonRecord.getName());
}
});
});
}

@TestTemplate
public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/fake_cdc_sink_paimon_case1.conf");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [2, "CCC", 100]
},
{
kind = INSERT
fields = [3, "CCC", 100]
}
]
}
}

sink {
Paimon {
warehouse = "file:///tmp/paimon"
database = "seatunnel_namespace9"
table = "st_test"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 2
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
}
]
}
}

sink {
Paimon {
warehouse = "file:///tmp/paimon"
database = "seatunnel_namespace9"
table = "st_test"
}
}

0 comments on commit 9ec971d

Please sign in to comment.