Skip to content

Commit

Permalink
Modifying sink interface to be generic (apache#1792)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and merlimat committed May 17, 2018
1 parent 920ecba commit 5e4f2bb
Show file tree
Hide file tree
Showing 9 changed files with 34 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.sink.DefaultRuntimeSink;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.RuntimeSink;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
Expand Down Expand Up @@ -106,7 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Record currentRecord;

private Source source;
private RuntimeSink sink;
private Sink sink;

public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
Expand Down Expand Up @@ -524,10 +522,8 @@ public void setupOutput() throws Exception {
Thread.currentThread().getContextClassLoader());
}

if (object instanceof RuntimeSink) {
this.sink = (RuntimeSink) object;
} else if (object instanceof Sink) {
this.sink = DefaultRuntimeSink.of((Sink) object);
if (object instanceof Sink) {
this.sink = (Sink) object;
} else {
throw new RuntimeException("Sink does not implement correct interface");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.io.core.RecordContext;
import org.apache.pulsar.io.core.Sink;

import java.util.Base64;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class PulsarSink<T> implements RuntimeSink<T> {
public class PulsarSink<T> implements Sink<T> {

private PulsarClient client;
private PulsarSinkConfig pulsarSinkConfig;
Expand Down Expand Up @@ -206,11 +207,6 @@ public void open(Map<String, Object> config) throws Exception {
this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic());
}

@Override
public CompletableFuture<Void> write(T value) {
return null;
}

@Override
public void write(RecordContext recordContext, T value) throws Exception {

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -44,7 +45,7 @@
/**
* Simple AeroSpike sink
*/
public class AerospikeSink<K, V> implements Sink<KeyValue<K, V>> {
public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {

private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -39,7 +40,7 @@
* Simple Cassandra sink
* Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname.
*/
public class CassandraSink<K, V> implements Sink<KeyValue<K, V>> {
public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {

private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.sink;
package org.apache.pulsar.io.core;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.io.core.Sink;

/**
* The default implementation of runtime sink.
*
* @param <T>
* A simpler version of the Sink interface users can extend for use cases to
* don't require fine grained delivery control
*/
public class DefaultRuntimeSink<T> implements RuntimeSink<T> {

public static <T> DefaultRuntimeSink<T> of(Sink<T> sink) {
return new DefaultRuntimeSink<>(sink);
}

private final Sink<T> sink;
public abstract class SimpleSink<T> implements Sink<T> {

private DefaultRuntimeSink(Sink<T> sink) {
this.sink = sink;
}

/**
* Open connector with configuration
*
* @param config initialization config
* @throws Exception IO type exceptions when opening a connector
*/
@Override
public void open(final Map<String, Object> config) throws Exception {
sink.open(config);
public void write(RecordContext inputRecordContext, T value) throws Exception {
write(value)
.thenAccept(ignored -> inputRecordContext.ack())
.exceptionally(cause -> {
inputRecordContext.fail();
return null;
});
}

/**
Expand All @@ -56,13 +42,5 @@ public void open(final Map<String, Object> config) throws Exception {
* @param value output value
* @return Completable future fo async publish request
*/
@Override
public CompletableFuture<Void> write(T value) {
return sink.write(value);
}

@Override
public void close() throws Exception {
sink.close();
}
public abstract CompletableFuture<Void> write(T value);
}
21 changes: 11 additions & 10 deletions pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,11 @@
package org.apache.pulsar.io.core;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Pulsar's Sink interface. Sink read data from
* a Pulsar topic and write it to external sinks(kv store, database, filesystem ,etc)
* The lifcycle of a Sink is to open it passing any config needed
* by it to initialize(like open network connection, authenticate, etc).
* On every message from the designated PulsarTopic, the write method is
* invoked which writes the message to the external sink. One can use close
* at the end of the session to do any cleanup
* Generic sink interface users can implement to run Sink on top of Pulsar Functions
*/
public interface Sink<T> extends AutoCloseable {
public interface Sink<T> extends AutoCloseable{
/**
* Open connector with configuration
*
Expand All @@ -45,5 +38,13 @@ public interface Sink<T> extends AutoCloseable {
* @param value output value
* @return Completable future fo async publish request
*/
CompletableFuture<Void> write(T value);


/**
* Write a message to Sink
* @param inputRecordContext Context of value
* @param value value to write to sink
* @throws Exception
*/
void write(RecordContext inputRecordContext, T value) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,7 +39,7 @@
/**
* Simple Kafka Sink to publish messages to a Kafka topic
*/
public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> {
public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {

private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);

Expand Down

0 comments on commit 5e4f2bb

Please sign in to comment.