Skip to content

Commit

Permalink
[fix][connector] KCA connectors: fix offset mapping when sanitizeTopi…
Browse files Browse the repository at this point in the history
…cName=true (apache#15950)
  • Loading branch information
nicoloboschi authored Jun 6, 2022
1 parent 722c56d commit 49ee8a6
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ private void ackUntil(Record<GenericObject> lastNotFlushed, java.util.function.C
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition = sourceRecord.getPartitionIndex().orElse(0);
final String topic = sourceRecord.getTopicName().orElse(topicName);
final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
final Object key;
final Object value;
final Schema keySchema;
Expand Down Expand Up @@ -300,7 +300,7 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
// keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
timestamp = sourceRecord.getMessage().get().getPublishTime();
}
return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
return new SinkRecord(topic,
partition,
keySchema,
key,
Expand All @@ -313,7 +313,7 @@ protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {

@VisibleForTesting
protected long currentOffset(String topic, int partition) {
return taskContext.currentOffset(topic, partition);
return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition);
}

// Replace all non-letter, non-digit characters with underscore.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

@SuppressWarnings({"unchecked", "rawtypes"})
Expand Down Expand Up @@ -201,6 +202,8 @@ public void sanitizeTest() throws Exception {
sink.write(record);
sink.flush();

assertTrue(sink.currentOffset("persistent://a-b/c-d/fake-topic.a", 0) > 0L);

assertEquals(status.get(), 1);
assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a");

Expand Down

0 comments on commit 49ee8a6

Please sign in to comment.