From 481f69eae0ca74f99f358389058b439986bf24e6 Mon Sep 17 00:00:00 2001 From: Sanjeev Kulkarni Date: Mon, 21 May 2018 00:51:28 -0700 Subject: [PATCH] Make the current reference sinks usable (#1814) * Make the current reference sinks usable * removed unused imports --- .../apache/pulsar/io/aerospike/AerospikeSink.java | 15 +++++++++------ .../apache/pulsar/io/cassandra/CassandraSink.java | 14 ++++++++------ .../org/apache/pulsar/io/kafka/KafkaSink.java | 13 ++++++++----- 3 files changed, 25 insertions(+), 17 deletions(-) diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java index f1390c12d52fd..18e851944bf75 100644 --- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java +++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java @@ -33,7 +33,6 @@ import com.aerospike.client.policy.WritePolicy; import org.apache.pulsar.common.util.KeyValue; import org.apache.pulsar.io.core.SimpleSink; -import org.apache.pulsar.io.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,9 +42,10 @@ import java.util.concurrent.LinkedBlockingDeque; /** - * Simple AeroSpike sink + * A Simple abstract class for Aerospike sink + * Users need to implement extractKeyValue function to use this sink */ -public class AerospikeSink extends SimpleSink> { +public abstract class AerospikeSink extends SimpleSink { private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class); @@ -83,10 +83,11 @@ public void close() throws Exception { } @Override - public CompletableFuture write(KeyValue record) { + public CompletableFuture write(byte[] record) { CompletableFuture future = new CompletableFuture<>(); - Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), record.getKey().toString()); - Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(record.getValue())); + KeyValue keyValue = extractKeyValue(record); + Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString()); + Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(keyValue.getValue())); AWriteListener listener = null; try { listener = queue.take(); @@ -154,4 +155,6 @@ public void onFailure(AerospikeException e) { } } } + + public abstract KeyValue extractKeyValue(byte[] message); } \ No newline at end of file diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java index 9aa09e92a815d..710feb7a6001a 100644 --- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java +++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java @@ -29,7 +29,6 @@ import com.google.common.util.concurrent.Futures; import org.apache.pulsar.common.util.KeyValue; import org.apache.pulsar.io.core.SimpleSink; -import org.apache.pulsar.io.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,10 +36,10 @@ import java.util.concurrent.CompletableFuture; /** - * Simple Cassandra sink - * Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname. + * A Simple abstract class for Cassandra sink + * Users need to implement extractKeyValue function to use this sink */ -public class CassandraSink extends SimpleSink> { +public abstract class CassandraSink extends SimpleSink { private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class); @@ -72,8 +71,9 @@ public void close() throws Exception { } @Override - public CompletableFuture write(KeyValue record) { - BoundStatement bound = statement.bind(record.getKey(), record.getValue()); + public CompletableFuture write(byte[] record) { + KeyValue keyValue = extractKeyValue(record); + BoundStatement bound = statement.bind(keyValue.getKey(), keyValue.getValue()); ResultSetFuture future = session.executeAsync(bound); CompletableFuture completable = new CompletableFuture(); Futures.addCallback(future, @@ -108,4 +108,6 @@ private void createClient(String roots) { session = cluster.connect(); session.execute("USE " + cassandraSinkConfig.getKeyspace()); } + + public abstract KeyValue extractKeyValue(byte[] message); } \ No newline at end of file diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java index 08ca65274afcd..5aa98941602c7 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java @@ -25,7 +25,6 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.pulsar.common.util.KeyValue; import org.apache.pulsar.io.core.SimpleSink; -import org.apache.pulsar.io.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,9 +36,10 @@ import java.util.concurrent.Future; /** - * Simple Kafka Sink to publish messages to a Kafka topic + * A Simple abstract class for Kafka sink + * Users need to implement extractKeyValue function to use this sink */ -public class KafkaSink extends SimpleSink> { +public abstract class KafkaSink extends SimpleSink { private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); @@ -48,8 +48,9 @@ public class KafkaSink extends SimpleSink> { private KafkaSinkConfig kafkaSinkConfig; @Override - public CompletableFuture write(KeyValue message) { - ProducerRecord record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), message.getValue()); + public CompletableFuture write(byte[] message) { + KeyValue keyValue = extractKeyValue(message); + ProducerRecord record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(), keyValue.getValue()); LOG.debug("Record sending to kafka, record={}.", record); Future f = producer.send(record); return CompletableFuture.supplyAsync(() -> { @@ -91,4 +92,6 @@ public void open(Map config) throws Exception { LOG.info("Kafka sink started."); } + + public abstract KeyValue extractKeyValue(byte[] message); } \ No newline at end of file