Skip to content

Commit

Permalink
Make the current reference sinks usable (apache#1814)
Browse files Browse the repository at this point in the history
* Make the current reference sinks usable

* removed unused imports
  • Loading branch information
srkukarni authored and sijie committed May 21, 2018
1 parent d7d6ea7 commit 481f69e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.aerospike.client.policy.WritePolicy;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,9 +42,10 @@
import java.util.concurrent.LinkedBlockingDeque;

/**
* Simple AeroSpike sink
* A Simple abstract class for Aerospike sink
* Users need to implement extractKeyValue function to use this sink
*/
public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
public abstract class AerospikeSink<K, V> extends SimpleSink<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class);

Expand Down Expand Up @@ -83,10 +83,11 @@ public void close() throws Exception {
}

@Override
public CompletableFuture<Void> write(KeyValue<K, V> record) {
public CompletableFuture<Void> write(byte[] record) {
CompletableFuture<Void> future = new CompletableFuture<>();
Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), record.getKey().toString());
Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(record.getValue()));
KeyValue<K, V> keyValue = extractKeyValue(record);
Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(keyValue.getValue()));
AWriteListener listener = null;
try {
listener = queue.take();
Expand Down Expand Up @@ -154,4 +155,6 @@ public void onFailure(AerospikeException e) {
}
}
}

public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,17 @@
import com.google.common.util.concurrent.Futures;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Simple Cassandra sink
* Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname.
* A Simple abstract class for Cassandra sink
* Users need to implement extractKeyValue function to use this sink
*/
public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
public abstract class CassandraSink<K, V> extends SimpleSink<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);

Expand Down Expand Up @@ -72,8 +71,9 @@ public void close() throws Exception {
}

@Override
public CompletableFuture<Void> write(KeyValue<K, V> record) {
BoundStatement bound = statement.bind(record.getKey(), record.getValue());
public CompletableFuture<Void> write(byte[] record) {
KeyValue<K, V> keyValue = extractKeyValue(record);
BoundStatement bound = statement.bind(keyValue.getKey(), keyValue.getValue());
ResultSetFuture future = session.executeAsync(bound);
CompletableFuture<Void> completable = new CompletableFuture<Void>();
Futures.addCallback(future,
Expand Down Expand Up @@ -108,4 +108,6 @@ private void createClient(String roots) {
session = cluster.connect();
session.execute("USE " + cassandraSinkConfig.getKeyspace());
}

public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,9 +36,10 @@
import java.util.concurrent.Future;

/**
* Simple Kafka Sink to publish messages to a Kafka topic
* A Simple abstract class for Kafka sink
* Users need to implement extractKeyValue function to use this sink
*/
public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
public abstract class KafkaSink<K, V> extends SimpleSink<byte[]> {

private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);

Expand All @@ -48,8 +48,9 @@ public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
private KafkaSinkConfig kafkaSinkConfig;

@Override
public CompletableFuture<Void> write(KeyValue<K, V> message) {
ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), message.getValue());
public CompletableFuture<Void> write(byte[] message) {
KeyValue<K, V> keyValue = extractKeyValue(message);
ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(), keyValue.getValue());
LOG.debug("Record sending to kafka, record={}.", record);
Future f = producer.send(record);
return CompletableFuture.supplyAsync(() -> {
Expand Down Expand Up @@ -91,4 +92,6 @@ public void open(Map<String, Object> config) throws Exception {

LOG.info("Kafka sink started.");
}

public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}

0 comments on commit 481f69e

Please sign in to comment.