Skip to content

Commit

Permalink
Add Type information to Pulsar client interface (apache#987)
Browse files Browse the repository at this point in the history
* Add type information to Consumer

* Typed Producer and compiling tests

* Typed consumers

* Typed Client with typed consumers and producers

* Wrap reader and writer configurations in typed variants

* remove type annotation from publisher impl

* Add license headers to new files.

* Remove type from PulsarClient

* Type the consumer on the content not the message

* Fix generic type for Reader config

* Remove type information from PartitionedProducerImpl

* Rename Codec to Schema and add javadoc for client interface

* Make the correct api calls

* Fix incorrectly merged test

* WIP: BROKEN REPLICATOR

* unwrap producer

* Passing tests

* Fix broken merge

* wip

* wip
  • Loading branch information
mgodave authored and merlimat committed Feb 28, 2018
1 parent 9c54e73 commit 3e76ba2
Show file tree
Hide file tree
Showing 59 changed files with 728 additions and 544 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public abstract class AbstractReplicator {
protected volatile ProducerImpl producer;

protected final int producerQueueSize;
protected final ProducerBuilder producerBuilder;
protected final ProducerBuilder<byte[]> producerBuilder;

protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);

Expand Down Expand Up @@ -80,7 +80,7 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
STATE_UPDATER.set(this, State.Stopped);
}

protected abstract void readEntries(org.apache.pulsar.client.api.Producer producer);
protected abstract void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer);

protected abstract Position getReplicatorReadPosition();

Expand Down Expand Up @@ -231,7 +231,7 @@ public static String getReplicatorName(String replicatorPrefix, String cluster)
* Therefore, replicator can't be started on root-partition topic which can internally create multiple partitioned
* producers.
*
* @param topicName
* @param topic
* @param brokerService
*/
private void validatePartitionedTopic(String topic, BrokerService brokerService) throws NamingException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarClientImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St
}

@Override
protected void readEntries(Producer producer) {
protected void readEntries(Producer<byte[]> producer) {
this.producer = (ProducerImpl) producer;

if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Started)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String
}

@Override
protected void readEntries(org.apache.pulsar.client.api.Producer producer) {
protected void readEntries(org.apache.pulsar.client.api.Producer<byte[]> producer) {
// Rewind the cursor to be sure to read again all non-acked messages sent while restarting
cursor.rewind();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RawReaderImpl;

Expand All @@ -32,8 +30,9 @@ public interface RawReader {
/**
* Create a raw reader for a topic.
*/

public static CompletableFuture<RawReader> create(PulsarClient client, String topic, String subscription) {
CompletableFuture<Consumer> future = new CompletableFuture<>();
CompletableFuture<Consumer<byte[]>> future = new CompletableFuture<>();
RawReader r = new RawReaderImpl((PulsarClientImpl)client, topic, subscription, future);
return future.thenCompose((consumer) -> r.seekAsync(MessageId.earliest)).thenApply((ignore) -> r);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand All @@ -43,12 +44,12 @@
public class RawReaderImpl implements RawReader {

final static int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
private final ConsumerConfigurationData consumerConfiguration;
private final ConsumerConfigurationData<byte[]> consumerConfiguration;
private RawConsumerImpl consumer;

public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer> consumerFuture) {
consumerConfiguration = new ConsumerConfigurationData();
CompletableFuture<Consumer<byte[]>> consumerFuture) {
consumerConfiguration = new ConsumerConfigurationData<>();
consumerConfiguration.getTopicNames().add(topic);
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
Expand Down Expand Up @@ -83,14 +84,14 @@ public CompletableFuture<MessageId> getLastMessageIdAsync() {
return consumer.getLastMessageIdAsync();
}

static class RawConsumerImpl extends ConsumerImpl {
static class RawConsumerImpl extends ConsumerImpl<byte[]> {
final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
final Queue<CompletableFuture<RawMessage>> pendingRawReceives;

RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
CompletableFuture<Consumer> consumerFuture) {
CompletableFuture<Consumer<byte[]>> consumerFuture) {
super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
consumerFuture, SubscriptionMode.Durable, MessageId.earliest);
consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,8 +584,8 @@ public void testActiveConsumerFailoverWithDelay() throws Exception {
producer.close();

// two consumers subscribe at almost the same time
CompletableFuture<Consumer> subscribeFuture2 = pulsarClient.subscribeAsync(topicName, subName, consumerConf2);
CompletableFuture<Consumer> subscribeFuture1 = pulsarClient.subscribeAsync(topicName, subName, consumerConf1);
CompletableFuture<Consumer<byte[]>> subscribeFuture2 = pulsarClient.subscribeAsync(topicName, subName, consumerConf2);
CompletableFuture<Consumer<byte[]>> subscribeFuture1 = pulsarClient.subscribeAsync(topicName, subName, consumerConf1);

// wait for all messages to be dequeued
int retry = 20;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
Expand Down Expand Up @@ -1213,14 +1214,22 @@ public void testClosingReplicationProducerTwice() throws Exception {
.createProducerAsync(any(ProducerConfigurationData.class));

replicator.startProducer();
verify(clientImpl).createProducerAsync(any(ProducerConfigurationData.class));
verify(clientImpl)
.createProducerAsync(
any(ProducerConfigurationData.class),
any(Schema.class)
);

replicator.disconnect(false);
replicator.disconnect(false);

replicator.startProducer();

verify(clientImpl, Mockito.times(2)).createProducerAsync(any(ProducerConfigurationData.class));
verify(clientImpl, Mockito.times(2))
.createProducerAsync(
any(ProducerConfigurationData.class),
any(Schema.class)
);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.List;
Expand All @@ -37,7 +39,6 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -58,6 +59,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
Expand All @@ -80,10 +82,6 @@
import org.testng.annotations.Test;
import org.testng.collections.Lists;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;

import io.netty.buffer.ByteBuf;

/**
* Starts 2 brokers that are in 2 different clusters
*/
Expand Down Expand Up @@ -244,7 +242,11 @@ public void testConcurrentReplicator() throws Exception {
}
Thread.sleep(3000);

Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class));
Mockito.verify(pulsarClient, Mockito.times(1))
.createProducerAsync(
Mockito.any(ProducerConfigurationData.class),
Mockito.any(Schema.class)
);

}

Expand Down
Loading

0 comments on commit 3e76ba2

Please sign in to comment.