Skip to content

Commit

Permalink
Revert "Functions schema integration (apache#1845)" (apache#2018)
Browse files Browse the repository at this point in the history
This reverts commit 97b56cf.
  • Loading branch information
sijie authored and srkukarni committed Jun 22, 2018
1 parent d02ea01 commit 4c5c3cf
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -118,11 +117,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.namespaceName = conf.getTopicNames().stream().findFirst()
.flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();

List<CompletableFuture<Void>> futures =
conf.getTopicNames().stream()
.map(this::subscribeAsync)
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t))
.collect(Collectors.toList());

FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
try {
Expand All @@ -131,7 +127,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
setState(State.Ready);
// We have successfully created N consumers, so we can start receiving messages now
startReceivingMessages(new ArrayList<>(consumers.values()));
startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
subscribeFuture().complete(MultiTopicsConsumerImpl.this);
log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
topic, subscription, allTopicPartitionsNumber.get());
Expand Down
6 changes: 0 additions & 6 deletions pulsar-functions/api-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>net.jodah</groupId>
<artifactId>typetools</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,10 @@
*/
package org.apache.pulsar.functions.api;

import java.util.Collections;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;

/**
* An interface for serializer/deserializer.
*/
public interface SerDe<T> extends Schema<T> {
public interface SerDe<T> {
T deserialize(byte[] input);

byte[] serialize(T input);

@Override
default SchemaInfo getSchemaInfo() {
SchemaInfo info = new SchemaInfo();
info.setName("");
info.setType(SchemaType.NONE);
info.setSchema(new byte[0]);
info.setProperties(Collections.emptyMap());
return info;
}

@Override
default byte[] encode(T message) {
return serialize(message);
}

@Override
default T decode(byte[] bytes) {
return deserialize(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,6 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
}
}

public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
ClassLoader classLoader) {
this(config, logger, client, classLoader, null);
}

public void setInputConsumer(Consumer inputConsumer) {
this.inputConsumer = inputConsumer;
}

public void setCurrentMessageContext(MessageId messageId, String topicName) {
this.messageId = messageId;
this.currentTopicName = topicName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@
*/
package org.apache.pulsar.functions.instance;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import javax.swing.text.html.Option;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.io.core.Source;

import org.apache.pulsar.functions.source.PulsarSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -41,12 +38,11 @@
*/
@Slf4j
public class JavaInstance implements AutoCloseable {
private ContextImpl context;

@Getter(AccessLevel.PACKAGE)
private final ContextImpl context;
private Function function;
private java.util.function.Function javaUtilFunction;
private Optional<PulsarSource> optionalPulsarSource = Optional.empty();

public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader,
Expand All @@ -56,8 +52,8 @@ public JavaInstance(InstanceConfig config, Object userClassObject,
Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName());

if (source instanceof PulsarSource) {
this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader);
optionalPulsarSource = Optional.of((PulsarSource) source);
this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader,
((PulsarSource) source).getInputConsumer());
} else {
this.context = null;
}
Expand All @@ -68,17 +64,13 @@ public JavaInstance(InstanceConfig config, Object userClassObject,
} else {
this.javaUtilFunction = (java.util.function.Function) userClassObject;
}

}

public JavaExecutionResult handleMessage(MessageId messageId, String topicName, Object input) {
optionalPulsarSource.ifPresent((pulsarSource) -> {
this.context.setInputConsumer(pulsarSource.getConsumerForTopic(topicName));
this.context.setCurrentMessageContext(messageId, topicName);
});

if (context != null) {
context.setCurrentMessageContext(messageId, topicName);
}
JavaExecutionResult executionResult = new JavaExecutionResult();

try {
Object output;
if (function != null) {
Expand All @@ -93,15 +85,11 @@ public JavaExecutionResult handleMessage(MessageId messageId, String topicName,
return executionResult;
}

public ContextImpl getContext() {
return this.context;
}

@Override
public void close() {
}

public MetricsData getAndResetMetrics() {
public InstanceCommunication.MetricsData getAndResetMetrics() {
return context.getAndResetMetrics();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,23 @@
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.instance.FunctionResultRouter;

public abstract class AbstractOneOuputTopicProducers<T> implements Producers<T> {
public abstract class AbstractOneOuputTopicProducers implements Producers {

protected final PulsarClient client;
protected final String outputTopic;
protected final Schema<T> schema;

AbstractOneOuputTopicProducers(PulsarClient client,
String outputTopic,
Schema<T> schema)
String outputTopic)
throws PulsarClientException {
this.client = client;
this.outputTopic = outputTopic;
this.schema = schema;
}

static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client, Schema<U> schema) {
static ProducerBuilder<byte[]> newProducerBuilder(PulsarClient client) {
// use function result router to deal with different processing guarantees.
return client.newProducer(schema) //
return client.newProducer() //
.blockIfQueueFull(true) //
.enableBatching(true) //
.batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
Expand All @@ -57,23 +53,23 @@ static <U> ProducerBuilder<U> newProducerBuilder(PulsarClient client, Schema<U>
.messageRouter(FunctionResultRouter.of());
}

protected Producer<T> createProducer(String topic, Schema<T> schema)
protected Producer<byte[]> createProducer(String topic)
throws PulsarClientException {
return createProducer(client, topic, schema);
return createProducer(client, topic);
}

public static <T> Producer<T> createProducer(PulsarClient client, String topic, Schema<T> schema)
public static Producer<byte[]> createProducer(PulsarClient client, String topic)
throws PulsarClientException {
return newProducerBuilder(client, schema).topic(topic).create();
return newProducerBuilder(client).topic(topic).create();
}

protected Producer<T> createProducer(String topic, String producerName, Schema<T> schema)
protected Producer<byte[]> createProducer(String topic, String producerName)
throws PulsarClientException {
return createProducer(client, topic, producerName, schema);
return createProducer(client, topic, producerName);
}

public static <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
public static Producer<byte[]> createProducer(PulsarClient client, String topic, String producerName)
throws PulsarClientException {
return newProducerBuilder(client, schema).topic(topic).producerName(producerName).create();
return newProducerBuilder(client).topic(topic).producerName(producerName).create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.instance.producers;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -30,21 +31,19 @@
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

@Slf4j
public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTopicProducers<T> {
public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers {

@Getter(AccessLevel.PACKAGE)
// PartitionId -> producer
private final Map<String, Producer<T>> producers;
private final Map<String, Producer<byte[]>> producers;


public MultiConsumersOneOuputTopicProducers(PulsarClient client,
String outputTopic,
Schema<T> schema)
String outputTopic)
throws PulsarClientException {
super(client, outputTopic, schema);
super(client, outputTopic);
this.producers = new ConcurrentHashMap<>();
}

Expand All @@ -58,18 +57,18 @@ static String makeProducerName(String srcTopicName, String srcTopicPartition) {
}

@Override
public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException {
Producer<T> producer = producers.get(srcPartitionId);
public synchronized Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException {
Producer<byte[]> producer = producers.get(srcPartitionId);
if (null == producer) {
producer = createProducer(outputTopic, srcPartitionId, schema);
producer = createProducer(outputTopic, srcPartitionId);
producers.put(srcPartitionId, producer);
}
return producer;
}

@Override
public synchronized void closeProducer(String srcPartitionId) {
Producer<T> producer = producers.get(srcPartitionId);
Producer<byte[]> producer = producers.get(srcPartitionId);
if (null != producer) {
producer.closeAsync();
producers.remove(srcPartitionId);
Expand All @@ -79,7 +78,7 @@ public synchronized void closeProducer(String srcPartitionId) {
@Override
public synchronized void close() {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size());
for (Producer<T> producer: producers.values()) {
for (Producer<byte[]> producer: producers.values()) {
closeFutures.add(producer.closeAsync());
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* An interface for managing publishers within a java instance.
*/
public interface Producers<T> extends AutoCloseable {
public interface Producers extends AutoCloseable {

/**
* Initialize all the producers.
Expand All @@ -40,7 +40,7 @@ public interface Producers<T> extends AutoCloseable {
* src partition Id
* @return the producer instance to produce messages
*/
Producer<T> getProducer(String srcPartitionId) throws PulsarClientException;
Producer<byte[]> getProducer(String srcPartitionId) throws PulsarClientException;

/**
* Close a producer specified by <tt>srcPartitionId</tt>.
Expand Down
Loading

0 comments on commit 4c5c3cf

Please sign in to comment.