Skip to content

Commit

Permalink
Converted main part of code to use builder APIs with typed interface (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Mar 2, 2018
1 parent 21ffe2c commit a64b383
Show file tree
Hide file tree
Showing 41 changed files with 436 additions and 361 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
Expand Down Expand Up @@ -499,27 +500,29 @@ public PulsarClient getReplicationClient(String cluster) {
String path = PulsarWebResource.path("clusters", cluster);
ClusterData data = this.pulsar.getConfigurationCache().clustersCache().get(path)
.orElseThrow(() -> new KeeperException.NoNodeException(path));
ClientConfiguration configuration = new ClientConfiguration();
configuration.setUseTcpNoDelay(false);
configuration.setConnectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker());
configuration.setStatsInterval(0, TimeUnit.SECONDS);
ClientBuilder clientBuilder = PulsarClient.builder()
.enableTcpNoDelay(false)
.connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker())
.statsInterval(0, TimeUnit.SECONDS);
if (pulsar.getConfiguration().isAuthenticationEnabled()) {
configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(),
pulsar.getConfiguration().getBrokerClientAuthenticationParameters());
}
String clusterUrl = null;
if (pulsar.getConfiguration().isReplicationTlsEnabled()) {
clusterUrl = isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls();
configuration.setUseTls(true);
configuration.setTlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath());
configuration
.setTlsAllowInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
clientBuilder
.serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls()
: data.getServiceUrlTls())
.enableTls(true)
.tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath())
.allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection());
} else {
clusterUrl = isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl()
: data.getServiceUrl();
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
}
return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup);

// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,24 +104,20 @@ protected void resetConfig() {

protected final void internalSetup() throws Exception {
init();
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
lookupUrl = new URI(brokerUrl.toString());
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT);
}
pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf);
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build();
}

protected final void internalSetupForStatsTest() throws Exception {
init();
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(1, TimeUnit.SECONDS);
String lookupUrl = brokerUrl.toString();
if (isTcpLookup) {
lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString();
}
pulsarClient = PulsarClient.create(lookupUrl, clientConf);
pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, TimeUnit.SECONDS).build();
}

protected final void init() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public void testClosedConsumer() throws PulsarClientException {
public void testListener() throws PulsarClientException {
Consumer consumer = null;
ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setMessageListener((Consumer c, Message msg) -> {
conf.setMessageListener((Consumer<byte[]> c, Message<byte[]> msg) -> {
});
consumer = pulsarClient.subscribe("persistent://prop/cluster/ns/topicName", "my-subscription", conf);
Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
Expand All @@ -62,7 +62,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;

public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener {
public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListener<byte[]> {

private static final long serialVersionUID = 1L;

Expand All @@ -74,7 +74,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final String groupId;
private final boolean isAutoCommit;

private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer> consumers = new ConcurrentHashMap<>();
private final ConcurrentMap<TopicPartition, org.apache.pulsar.client.api.Consumer<byte[]>> consumers = new ConcurrentHashMap<>();

private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new ConcurrentHashMap<>();
Expand All @@ -84,10 +84,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>, MessageListene
private final Properties properties;

private static class QueueItem {
final org.apache.pulsar.client.api.Consumer consumer;
final Message message;
final org.apache.pulsar.client.api.Consumer<byte[]> consumer;
final Message<byte[]> message;

QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) {
QueueItem(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> message) {
this.consumer = consumer;
this.message = message;
}
Expand Down Expand Up @@ -146,19 +146,19 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializ

this.properties = new Properties();
config.originals().forEach((k, v) -> properties.put(k, v));
ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties);
ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties);
// Since this client instance is going to be used just for the consumers, we can enable Nagle to group
// all the acknowledgments sent to broker within a short time frame
clientConf.setUseTcpNoDelay(false);
clientBuilder.enableTcpNoDelay(false);
try {
client = PulsarClient.create(serviceUrl, clientConf);
client = clientBuilder.serviceUrl(serviceUrl).build();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}

@Override
public void received(org.apache.pulsar.client.api.Consumer consumer, Message msg) {
public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]> msg) {
// Block listener thread if the application is slowing down
try {
receivedMessages.put(new QueueItem(consumer, msg));
Expand Down Expand Up @@ -204,16 +204,17 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
// acknowledgeCumulative()
int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get();

ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties);
conf.setSubscriptionType(SubscriptionType.Failover);
conf.setMessageListener(this);
ConsumerBuilder<byte[]> consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties);
consumerBuilder.subscriptionType(SubscriptionType.Failover);
consumerBuilder.messageListener(this);
consumerBuilder.subscriptionName(groupId);
if (numberOfPartitions > 1) {
// Subscribe to each partition
conf.setConsumerName(ConsumerName.generateRandomName());
consumerBuilder.consumerName(ConsumerName.generateRandomName());
for (int i = 0; i < numberOfPartitions; i++) {
String partitionName = TopicName.get(topic).getPartition(i).toString();
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client
.subscribeAsync(partitionName, groupId, conf);
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.clone()
.topic(partitionName).subscribeAsync();
int partitionIndex = i;
TopicPartition tp = new TopicPartition(topic, partitionIndex);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
Expand All @@ -222,8 +223,8 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener callb
}
} else {
// Topic has a single partition
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = client.subscribeAsync(topic,
groupId, conf);
CompletableFuture<org.apache.pulsar.client.api.Consumer<byte[]>> future = consumerBuilder.topic(topic)
.subscribeAsync();
TopicPartition tp = new TopicPartition(topic, 0);
future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer));
futures.add(future);
Expand Down Expand Up @@ -293,7 +294,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
TopicName topicName = TopicName.get(item.consumer.getTopic());
String topic = topicName.getPartitionedTopicName();
int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
Message msg = item.message;
Message<byte[]> msg = item.message;
MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId();
long offset = MessageIdUtils.getOffset(msgId);

Expand Down Expand Up @@ -335,7 +336,7 @@ public ConsumerRecords<K, V> poll(long timeoutMillis) {
}

@SuppressWarnings("unchecked")
private K getKey(String topic, Message msg) {
private K getKey(String topic, Message<byte[]> msg) {
if (!msg.hasKey()) {
return null;
}
Expand Down Expand Up @@ -393,7 +394,7 @@ private CompletableFuture<Void> doCommitOffsets(Map<TopicPartition, OffsetAndMet
List<CompletableFuture<Void>> futures = new ArrayList<>();

offsets.forEach((topicPartition, offsetAndMetadata) -> {
org.apache.pulsar.client.api.Consumer consumer = consumers.get(topicPartition);
org.apache.pulsar.client.api.Consumer<byte[]> consumer = consumers.get(topicPartition);

lastCommittedOffset.put(topicPartition, offsetAndMetadata);
futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset())));
Expand All @@ -415,7 +416,7 @@ private Map<TopicPartition, OffsetAndMetadata> getCurrentOffsetsMap() {
@Override
public void seek(TopicPartition partition, long offset) {
MessageId msgId = MessageIdUtils.getMessageId(offset);
org.apache.pulsar.client.api.Consumer c = consumers.get(partition);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(partition);
if (c == null) {
throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed");
}
Expand All @@ -436,7 +437,7 @@ public void seekToBeginning(Collection<TopicPartition> partitions) {
}

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
Expand All @@ -457,7 +458,7 @@ public void seekToEnd(Collection<TopicPartition> partitions) {
}

for (TopicPartition tp : partitions) {
org.apache.pulsar.client.api.Consumer c = consumers.get(tp);
org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
if (c == null) {
futures.add(FutureUtil.failedFuture(
new IllegalArgumentException("Cannot seek on a partition where we are not subscribed")));
Expand Down
Loading

0 comments on commit a64b383

Please sign in to comment.