Skip to content

Commit

Permalink
Make the KakfaSource abstract by allowing users to implement method t…
Browse files Browse the repository at this point in the history
…o extract value (apache#1842)
  • Loading branch information
srkukarni authored and sijie committed May 24, 2018
1 parent a183d08 commit e249493
Showing 1 changed file with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
/**
* Simple Kafka Source to transfer messages from a Kafka topic
*/
public class KafkaSource<V> extends PushSource<V> {
public abstract class KafkaSource<V> extends PushSource<V> {

private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);

private Consumer<String, V> consumer;
private Consumer<byte[], byte[]> consumer;
private Properties props;
private KafkaSourceConfig kafkaSourceConfig;
Thread runnerThread;
Expand Down Expand Up @@ -103,14 +103,14 @@ public void start() {
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
ConsumerRecords<String, V> consumerRecords;
ConsumerRecords<byte[], byte[]> consumerRecords;
while(true){
consumerRecords = consumer.poll(1000);
CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
int index = 0;
for (ConsumerRecord<String, V> consumerRecord : consumerRecords) {
for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
KafkaRecord<V> record = new KafkaRecord<>(consumerRecord);
KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
consumeFunction.accept(record);
futures[index] = record.getCompletableFuture();
index++;
Expand All @@ -130,13 +130,18 @@ public void start() {
runnerThread.start();
}

public abstract V extractValue(ConsumerRecord<byte[], byte[]> record);

static private class KafkaRecord<V> implements Record<V> {
private final ConsumerRecord<String, V> record;
private final ConsumerRecord<byte[], byte[]> record;
private final V value;
@Getter
private final CompletableFuture<Void> completableFuture = new CompletableFuture();

public KafkaRecord(ConsumerRecord<String, V> record) {
public KafkaRecord(ConsumerRecord<byte[], byte[]> record,
V value) {
this.record = record;
this.value = value;
}
@Override
public String getPartitionId() {
Expand All @@ -150,7 +155,7 @@ public long getRecordSequence() {

@Override
public V getValue() {
return record.value();
return value;
}

@Override
Expand Down

0 comments on commit e249493

Please sign in to comment.