Commit

[fix][io] KCA: Option to use kafka connector's SourceConnector class to create task and task config (apache#19772)
dlg99 authored Mar 10, 2023
1 parent 9feb85b commit 90b0f0a
Showing 2 changed files with 77 additions and 25 deletions.
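
For orientation before the diff: the commit adds a second way to configure the Kafka Connect adaptor source. Below is a minimal sketch of the two configuration styles, assuming only the constants that appear in the diff; the class name KcaSourceConfigSketch, the topic names, and the file path are illustrative placeholders, not values taken from the commit.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.kafka.connect.AbstractKafkaConnectSource;
import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;

// Hypothetical sketch (not part of the commit): the two config styles side by side.
public class KcaSourceConfigSketch {

    static Map<String, Object> commonConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
                "org.apache.kafka.connect.storage.StringConverter");
        config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
                "org.apache.kafka.connect.storage.StringConverter");
        // Placeholder topics and file path, for illustration only.
        config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
                "persistent://tenant/ns/kafka-connect-source-offset");
        config.put(FileStreamSourceConnector.TOPIC_CONFIG,
                "persistent://tenant/ns/kafka-connect-source");
        config.put(FileStreamSourceConnector.FILE_CONFIG, "/tmp/input.txt");
        return config;
    }

    // Legacy style: name the SourceTask class directly; the adaptor instantiates it by reflection.
    static Map<String, Object> taskClassConfig() {
        Map<String, Object> config = commonConfig();
        config.put(TaskConfig.TASK_CLASS_CONFIG,
                "org.apache.kafka.connect.file.FileStreamSourceTask");
        return config;
    }

    // New style added here: name the SourceConnector class and let it supply the task class
    // and the task config (see the connector branch in open() below).
    static Map<String, Object> connectorClassConfig() {
        Map<String, Object> config = commonConfig();
        config.put(AbstractKafkaConnectSource.CONNECTOR_CLASS,
                "org.apache.kafka.connect.file.FileStreamSourceConnector");
        return config;
    }
}
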
AbstractKafkaConnectSource.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.io.kafka.connect;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import io.confluent.connect.avro.AvroConverter;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
@@ -33,7 +35,9 @@
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
@@ -55,6 +59,7 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {

// kafka connect related variables
private SourceTaskContext sourceTaskContext;
private SourceConnector connector;
@Getter
private SourceTask sourceTask;
public Converter keyConverter;
@@ -71,6 +76,8 @@ public abstract class AbstractKafkaConnectSource<T> implements Source<T> {
// number of outstandingRecords that have been polled but not been acked
private final AtomicInteger outstandingRecords = new AtomicInteger(0);

public static final String CONNECTOR_CLASS = "kafkaConnectorSourceClass";

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
Map<String, String> stringConfig = new HashMap<>();
@@ -80,12 +87,6 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
}
});

// get the source class name from config and create source task from reflection
sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG))
.asSubclass(SourceTask.class)
.getDeclaredConstructor()
.newInstance();

topicNamespace = stringConfig.get(PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG);

// initialize the key and value converter
@@ -129,8 +130,36 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws

sourceTaskContext = new PulsarIOSourceTaskContext(offsetReader, pulsarKafkaWorkerConfig);

final Map<String, String> taskConfig;
if (config.get(CONNECTOR_CLASS) != null) {
String kafkaConnectorFQClassName = config.get(CONNECTOR_CLASS).toString();
Class<?> clazz = Class.forName(kafkaConnectorFQClassName);
connector = (SourceConnector) clazz.getConstructor().newInstance();

Class<? extends Task> taskClass = connector.taskClass();
sourceTask = (SourceTask) taskClass.getConstructor().newInstance();

connector.initialize(new PulsarKafkaSinkContext());
connector.start(stringConfig);

List<Map<String, String>> configs = connector.taskConfigs(1);
checkNotNull(configs);
checkArgument(configs.size() == 1);
taskConfig = configs.get(0);
} else {
// for backward compatibility with old configuration
// that uses the task directly

// get the source class name from config and create source task from reflection
sourceTask = Class.forName(stringConfig.get(TaskConfig.TASK_CLASS_CONFIG))
.asSubclass(SourceTask.class)
.getDeclaredConstructor()
.newInstance();
taskConfig = stringConfig;
}

sourceTask.initialize(sourceTaskContext);
sourceTask.start(stringConfig);
sourceTask.start(taskConfig);
}

@Override
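
The connector branch added to open() above follows the standard Kafka Connect bootstrap handshake. Below is a hedged, standalone sketch of that handshake using only the public Connect API; the helper class and method names are hypothetical, and the adaptor's real code additionally calls connector.initialize(...) and task.initialize(sourceTaskContext) with its Pulsar-backed contexts before starting the task.

import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;

// Hypothetical helper, not the adaptor's exact code: connector-driven task creation.
public final class ConnectorTaskBootstrapSketch {

    public static Map.Entry<SourceTask, Map<String, String>> createSingleTask(
            String connectorClassName, Map<String, String> connectorConfig) throws Exception {
        // 1. Instantiate the connector by reflection, as the adaptor does for CONNECTOR_CLASS.
        SourceConnector connector = (SourceConnector) Class.forName(connectorClassName)
                .getConstructor()
                .newInstance();

        // 2. The connector names the Task implementation it wants to run.
        Class<? extends Task> taskClass = connector.taskClass();
        SourceTask task = (SourceTask) taskClass.getConstructor().newInstance();

        // 3. Starting the connector lets it expand the connector config into per-task configs;
        //    requesting at most one task keeps the source single-instance, matching taskConfigs(1).
        connector.start(connectorConfig);
        List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
        Map<String, String> taskConfig = taskConfigs.get(0);

        // The adaptor then calls task.initialize(sourceTaskContext) followed by
        // task.start(taskConfig), and stops the connector in close().
        return Map.entry(task, taskConfig);
    }
}

Driving task creation through the connector means a connector such as FileStreamSourceConnector can fold connector-level options (for example its task batch size) into the per-task config itself, so users no longer need to know the task's internal configuration keys.
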
@@ -178,6 +207,11 @@ public void close() {
sourceTask = null;
}

if (connector != null) {
connector.stop();
connector = null;
}

if (offsetStore != null) {
offsetStore.stop();
offsetStore = null;
KafkaConnectSourceTest.java
@@ -23,7 +23,6 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;

import java.io.File;
import java.io.OutputStream;
import java.nio.file.Files;
@@ -47,7 +46,6 @@
@Slf4j
public class KafkaConnectSourceTest extends ProducerConsumerBase {

private Map<String, Object> config = new HashMap<>();
private String offsetTopicName;
// The topic to publish data to, for kafkaSource
private String topicName;
@@ -62,18 +60,10 @@ protected void setup() throws Exception {
super.internalSetup();
super.producerBaseSetup();

config.put(TaskConfig.TASK_CLASS_CONFIG, "org.apache.kafka.connect.file.FileStreamSourceTask");
config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");

this.offsetTopicName = "persistent://my-property/my-ns/kafka-connect-source-offset";
config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);

this.topicName = "persistent://my-property/my-ns/kafka-connect-source";
config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
tempFile = File.createTempFile("some-file-name", null);
config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG, String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
tempFile.deleteOnExit();

this.context = mock(SourceContext.class);
this.client = PulsarClient.builder()
@@ -91,16 +81,44 @@ protected void cleanup() throws Exception {
tempFile.delete();
super.internalCleanup();
}
protected void completedFlush(Throwable error, Void result) {
if (error != null) {
log.error("Failed to flush {} offsets to storage: ", this, error);
} else {
log.info("Finished flushing {} offsets to storage", this);
}
}

@Test
public void testOpenAndReadConnectorConfig() throws Exception {
Map<String, Object> config = getConfig();
config.put(AbstractKafkaConnectSource.CONNECTOR_CLASS,
"org.apache.kafka.connect.file.FileStreamSourceConnector");

testOpenAndReadTask(config);
}

@Test
public void testOpenAndRead() throws Exception {
public void testOpenAndReadTaskDirect() throws Exception {
Map<String, Object> config = getConfig();

config.put(TaskConfig.TASK_CLASS_CONFIG,
"org.apache.kafka.connect.file.FileStreamSourceTask");

testOpenAndReadTask(config);
}

private Map<String, Object> getConfig() {
Map<String, Object> config = new HashMap<>();

config.put(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.storage.StringConverter");
config.put(PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
"org.apache.kafka.connect.storage.StringConverter");

config.put(PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, offsetTopicName);

config.put(FileStreamSourceConnector.TOPIC_CONFIG, topicName);
config.put(FileStreamSourceConnector.FILE_CONFIG, tempFile.getAbsoluteFile().toString());
config.put(FileStreamSourceConnector.TASK_BATCH_SIZE_CONFIG,
String.valueOf(FileStreamSourceConnector.DEFAULT_TASK_BATCH_SIZE));
return config;
}

private void testOpenAndReadTask(Map<String, Object> config) throws Exception {
kafkaConnectSource = new KafkaConnectSource();
kafkaConnectSource.open(config, context);

