diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 19106802cb7c9..bbdc69695013f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -78,11 +78,12 @@ import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner; import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventType; -import org.apache.pulsar.client.api.ClientConfiguration; -import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.configuration.FieldContext; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceBundleFactory; @@ -499,27 +500,29 @@ public PulsarClient getReplicationClient(String cluster) { String path = PulsarWebResource.path("clusters", cluster); ClusterData data = this.pulsar.getConfigurationCache().clustersCache().get(path) .orElseThrow(() -> new KeeperException.NoNodeException(path)); - ClientConfiguration configuration = new ClientConfiguration(); - configuration.setUseTcpNoDelay(false); - configuration.setConnectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker()); - configuration.setStatsInterval(0, TimeUnit.SECONDS); + ClientBuilder clientBuilder = PulsarClient.builder() + .enableTcpNoDelay(false) + .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker()) + .statsInterval(0, TimeUnit.SECONDS); if (pulsar.getConfiguration().isAuthenticationEnabled()) { - configuration.setAuthentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), + clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), pulsar.getConfiguration().getBrokerClientAuthenticationParameters()); } - String clusterUrl = null; if (pulsar.getConfiguration().isReplicationTlsEnabled()) { - clusterUrl = isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls() - : data.getServiceUrlTls(); - configuration.setUseTls(true); - configuration.setTlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath()); - configuration - .setTlsAllowInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection()); + clientBuilder + .serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls() + : data.getServiceUrlTls()) + .enableTls(true) + .tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath()) + .allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection()); } else { - clusterUrl = isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() - : data.getServiceUrl(); + clientBuilder.serviceUrl( + isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl()); } - return new PulsarClientImpl(clusterUrl, configuration, this.workerGroup); + + // Share all the IO threads across broker and client connections + ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); + return new PulsarClientImpl(conf, workerGroup); } catch (Exception e) { throw new RuntimeException(e); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index c0e38399ab53f..02055ac5b5489 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -104,24 +104,20 @@ protected void resetConfig() { protected final void internalSetup() throws Exception { init(); - org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration(); - clientConf.setStatsInterval(0, TimeUnit.SECONDS); lookupUrl = new URI(brokerUrl.toString()); if (isTcpLookup) { lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT); } - pulsarClient = PulsarClient.create(lookupUrl.toString(), clientConf); + pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(0, TimeUnit.SECONDS).build(); } protected final void internalSetupForStatsTest() throws Exception { init(); - org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration(); - clientConf.setStatsInterval(1, TimeUnit.SECONDS); String lookupUrl = brokerUrl.toString(); if (isTcpLookup) { lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); } - pulsarClient = PulsarClient.create(lookupUrl, clientConf); + pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).statsInterval(1, TimeUnit.SECONDS).build(); } protected final void init() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java index fafc541c1b5b5..c3096ece1441f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumeBaseExceptionTest.java @@ -64,7 +64,7 @@ public void testClosedConsumer() throws PulsarClientException { public void testListener() throws PulsarClientException { Consumer consumer = null; ConsumerConfiguration conf = new ConsumerConfiguration(); - conf.setMessageListener((Consumer c, Message msg) -> { + conf.setMessageListener((Consumer c, Message msg) -> { }); consumer = pulsarClient.subscribe("persistent://prop/cluster/ns/topicName", "my-subscription", conf); Assert.assertTrue(consumer.receiveAsync().isCompletedExceptionally()); diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java index 31c9921a1f333..886da5f5d48a4 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java @@ -45,8 +45,8 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.pulsar.client.api.ClientConfiguration; -import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.MessageListener; @@ -62,7 +62,7 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; -public class PulsarKafkaConsumer implements Consumer, MessageListener { +public class PulsarKafkaConsumer implements Consumer, MessageListener { private static final long serialVersionUID = 1L; @@ -74,7 +74,7 @@ public class PulsarKafkaConsumer implements Consumer, MessageListene private final String groupId; private final boolean isAutoCommit; - private final ConcurrentMap consumers = new ConcurrentHashMap<>(); + private final ConcurrentMap> consumers = new ConcurrentHashMap<>(); private final Map lastReceivedOffset = new ConcurrentHashMap<>(); private final Map lastCommittedOffset = new ConcurrentHashMap<>(); @@ -84,10 +84,10 @@ public class PulsarKafkaConsumer implements Consumer, MessageListene private final Properties properties; private static class QueueItem { - final org.apache.pulsar.client.api.Consumer consumer; - final Message message; + final org.apache.pulsar.client.api.Consumer consumer; + final Message message; - QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) { + QueueItem(org.apache.pulsar.client.api.Consumer consumer, Message message) { this.consumer = consumer; this.message = message; } @@ -146,19 +146,19 @@ private PulsarKafkaConsumer(ConsumerConfig config, Deserializer keyDeserializ this.properties = new Properties(); config.originals().forEach((k, v) -> properties.put(k, v)); - ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties); + ClientBuilder clientBuilder = PulsarClientKafkaConfig.getClientBuilder(properties); // Since this client instance is going to be used just for the consumers, we can enable Nagle to group // all the acknowledgments sent to broker within a short time frame - clientConf.setUseTcpNoDelay(false); + clientBuilder.enableTcpNoDelay(false); try { - client = PulsarClient.create(serviceUrl, clientConf); + client = clientBuilder.serviceUrl(serviceUrl).build(); } catch (PulsarClientException e) { throw new RuntimeException(e); } } @Override - public void received(org.apache.pulsar.client.api.Consumer consumer, Message msg) { + public void received(org.apache.pulsar.client.api.Consumer consumer, Message msg) { // Block listener thread if the application is slowing down try { receivedMessages.put(new QueueItem(consumer, msg)); @@ -204,16 +204,17 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb // acknowledgeCumulative() int numberOfPartitions = ((PulsarClientImpl) client).getNumberOfPartitions(topic).get(); - ConsumerConfiguration conf = PulsarConsumerKafkaConfig.getConsumerConfiguration(properties); - conf.setSubscriptionType(SubscriptionType.Failover); - conf.setMessageListener(this); + ConsumerBuilder consumerBuilder = PulsarConsumerKafkaConfig.getConsumerBuilder(client, properties); + consumerBuilder.subscriptionType(SubscriptionType.Failover); + consumerBuilder.messageListener(this); + consumerBuilder.subscriptionName(groupId); if (numberOfPartitions > 1) { // Subscribe to each partition - conf.setConsumerName(ConsumerName.generateRandomName()); + consumerBuilder.consumerName(ConsumerName.generateRandomName()); for (int i = 0; i < numberOfPartitions; i++) { String partitionName = TopicName.get(topic).getPartition(i).toString(); - CompletableFuture> future = client - .subscribeAsync(partitionName, groupId, conf); + CompletableFuture> future = consumerBuilder.clone() + .topic(partitionName).subscribeAsync(); int partitionIndex = i; TopicPartition tp = new TopicPartition(topic, partitionIndex); future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer)); @@ -222,8 +223,8 @@ public void subscribe(Collection topics, ConsumerRebalanceListener callb } } else { // Topic has a single partition - CompletableFuture> future = client.subscribeAsync(topic, - groupId, conf); + CompletableFuture> future = consumerBuilder.topic(topic) + .subscribeAsync(); TopicPartition tp = new TopicPartition(topic, 0); future.thenAccept(consumer -> consumers.putIfAbsent(tp, consumer)); futures.add(future); @@ -293,7 +294,7 @@ public ConsumerRecords poll(long timeoutMillis) { TopicName topicName = TopicName.get(item.consumer.getTopic()); String topic = topicName.getPartitionedTopicName(); int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0; - Message msg = item.message; + Message msg = item.message; MessageIdImpl msgId = (MessageIdImpl) msg.getMessageId(); long offset = MessageIdUtils.getOffset(msgId); @@ -335,7 +336,7 @@ public ConsumerRecords poll(long timeoutMillis) { } @SuppressWarnings("unchecked") - private K getKey(String topic, Message msg) { + private K getKey(String topic, Message msg) { if (!msg.hasKey()) { return null; } @@ -393,7 +394,7 @@ private CompletableFuture doCommitOffsets(Map> futures = new ArrayList<>(); offsets.forEach((topicPartition, offsetAndMetadata) -> { - org.apache.pulsar.client.api.Consumer consumer = consumers.get(topicPartition); + org.apache.pulsar.client.api.Consumer consumer = consumers.get(topicPartition); lastCommittedOffset.put(topicPartition, offsetAndMetadata); futures.add(consumer.acknowledgeCumulativeAsync(MessageIdUtils.getMessageId(offsetAndMetadata.offset()))); @@ -415,7 +416,7 @@ private Map getCurrentOffsetsMap() { @Override public void seek(TopicPartition partition, long offset) { MessageId msgId = MessageIdUtils.getMessageId(offset); - org.apache.pulsar.client.api.Consumer c = consumers.get(partition); + org.apache.pulsar.client.api.Consumer c = consumers.get(partition); if (c == null) { throw new IllegalArgumentException("Cannot seek on a partition where we are not subscribed"); } @@ -436,7 +437,7 @@ public void seekToBeginning(Collection partitions) { } for (TopicPartition tp : partitions) { - org.apache.pulsar.client.api.Consumer c = consumers.get(tp); + org.apache.pulsar.client.api.Consumer c = consumers.get(tp); if (c == null) { futures.add(FutureUtil.failedFuture( new IllegalArgumentException("Cannot seek on a partition where we are not subscribed"))); @@ -457,7 +458,7 @@ public void seekToEnd(Collection partitions) { } for (TopicPartition tp : partitions) { - org.apache.pulsar.client.api.Consumer c = consumers.get(tp); + org.apache.pulsar.client.api.Consumer c = consumers.get(tp); if (c == null) { futures.add(FutureUtil.failedFuture( new IllegalArgumentException("Cannot seek on a partition where we are not subscribed"))); diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java index 7b8bf9ab39269..ae69c8575e71e 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java @@ -38,12 +38,11 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageBuilder; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.MessageIdImpl; @@ -54,9 +53,9 @@ public class PulsarKafkaProducer implements Producer { private final PulsarClient client; - private final ProducerConfiguration pulsarProducerConf; + private final ProducerBuilder pulsarProducerBuilder; - private final ConcurrentMap producers = new ConcurrentHashMap<>(); + private final ConcurrentMap> producers = new ConcurrentHashMap<>(); private final Serializer keySerializer; private final Serializer valueSerializer; @@ -107,30 +106,29 @@ private PulsarKafkaProducer(Map conf, Properties properties, Ser } String serviceUrl = producerConfig.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0); - ClientConfiguration clientConf = PulsarClientKafkaConfig.getClientConfiguration(properties); try { - client = PulsarClient.create(serviceUrl, clientConf); + client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(serviceUrl).build(); } catch (PulsarClientException e) { throw new RuntimeException(e); } - pulsarProducerConf = PulsarProducerKafkaConfig.getProducerConfiguration(properties); + pulsarProducerBuilder = PulsarProducerKafkaConfig.getProducerBuilder(client, properties); // To mimic the same batching mode as Kafka, we need to wait a very little amount of // time to batch if the client is trying to send messages fast enough long lingerMs = Long.parseLong(properties.getProperty(ProducerConfig.LINGER_MS_CONFIG, "1")); - pulsarProducerConf.setBatchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS); + pulsarProducerBuilder.batchingMaxPublishDelay(lingerMs, TimeUnit.MILLISECONDS); String compressionType = properties.getProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG); if ("gzip".equals(compressionType)) { - pulsarProducerConf.setCompressionType(CompressionType.ZLIB); + pulsarProducerBuilder.compressionType(CompressionType.ZLIB); } else if ("lz4".equals(compressionType)) { - pulsarProducerConf.setCompressionType(CompressionType.LZ4); + pulsarProducerBuilder.compressionType(CompressionType.LZ4); } - pulsarProducerConf.setSendTimeout( - Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")), - TimeUnit.MILLISECONDS); + + int sendTimeoutMillis = Integer.parseInt(properties.getProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "60000")); + pulsarProducerBuilder.sendTimeout(sendTimeoutMillis, TimeUnit.MILLISECONDS); boolean blockOnBufferFull = Boolean .parseBoolean(properties.getProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false")); @@ -138,8 +136,8 @@ private PulsarKafkaProducer(Map conf, Properties properties, Ser // Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error. - boolean shouldBlockPulsarProducer = pulsarProducerConf.getSendTimeoutMs() > 0 || blockOnBufferFull; - pulsarProducerConf.setBlockIfQueueFull(shouldBlockPulsarProducer); + boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0 || blockOnBufferFull; + pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer); } @Override @@ -149,7 +147,7 @@ public Future send(ProducerRecord record) { @Override public Future send(ProducerRecord record, Callback callback) { - org.apache.pulsar.client.api.Producer producer; + org.apache.pulsar.client.api.Producer producer; try { producer = producers.computeIfAbsent(record.topic(), topic -> createNewProducer(topic)); @@ -162,7 +160,7 @@ public Future send(ProducerRecord record, Callback callbac return future; } - Message msg = getMessage(record); + Message msg = getMessage(record); int messageSize = msg.getData().length; CompletableFuture future = new CompletableFuture<>(); @@ -225,20 +223,20 @@ public void close(long timeout, TimeUnit unit) { } } - private org.apache.pulsar.client.api.Producer createNewProducer(String topic) { + private org.apache.pulsar.client.api.Producer createNewProducer(String topic) { try { - return client.createProducer(topic, pulsarProducerConf); + return pulsarProducerBuilder.clone().topic(topic).create(); } catch (PulsarClientException e) { throw new RuntimeException(e); } } - private Message getMessage(ProducerRecord record) { + private Message getMessage(ProducerRecord record) { if (record.partition() != null) { throw new UnsupportedOperationException(""); } - MessageBuilder builder = MessageBuilder.create(); + MessageBuilder builder = MessageBuilder.create(); if (record.key() != null) { builder.setKey(getKey(record.topic(), record.key())); } @@ -259,7 +257,7 @@ private String getKey(String topic, K key) { } } - private RecordMetadata getRecordMetadata(String topic, Message msg, MessageId messageId, int size) { + private RecordMetadata getRecordMetadata(String topic, Message msg, MessageId messageId, int size) { MessageIdImpl msgId = (MessageIdImpl) messageId; // Combine ledger id and entry id to form offset diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java index d9ce75e6fe1d7..ca57f5b12214f 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarClientKafkaConfig.java @@ -22,7 +22,8 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; public class PulsarClientKafkaConfig { @@ -31,6 +32,7 @@ public class PulsarClientKafkaConfig { public static final String USE_TLS = "pulsar.use.tls"; public static final String TLS_TRUST_CERTS_FILE_PATH = "pulsar.tls.trust.certs.file.path"; public static final String TLS_ALLOW_INSECURE_CONNECTION = "pulsar.tls.allow.insecure.connection"; + public static final String TLS_HOSTNAME_VERIFICATION = "pulsar.tls.hostname.verification"; public static final String OPERATION_TIMEOUT_MS = "pulsar.operation.timeout.ms"; public static final String STATS_INTERVAL_SECONDS = "pulsar.stats.interval.seconds"; @@ -43,8 +45,8 @@ public class PulsarClientKafkaConfig { public static final String CONCURRENT_LOOKUP_REQUESTS = "pulsar.concurrent.lookup.requests"; public static final String MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION = "pulsar.max.number.rejected.request.per.connection"; - public static ClientConfiguration getClientConfiguration(Properties properties) { - ClientConfiguration conf = new ClientConfiguration(); + public static ClientBuilder getClientBuilder(Properties properties) { + ClientBuilder clientBuilder = PulsarClient.builder(); if (properties.containsKey(AUTHENTICATION_CLASS)) { String className = properties.getProperty(AUTHENTICATION_CLASS); @@ -52,48 +54,53 @@ public static ClientConfiguration getClientConfiguration(Properties properties) @SuppressWarnings("unchecked") Class clazz = (Class) Class.forName(className); Authentication auth = clazz.newInstance(); - conf.setAuthentication(auth); + clientBuilder.authentication(auth); } catch (Exception e) { throw new RuntimeException(e); } } - conf.setUseTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false"))); - conf.setUseTls(Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false"))); + clientBuilder.enableTls(Boolean.parseBoolean(properties.getProperty(USE_TLS, "false"))); + clientBuilder.allowTlsInsecureConnection( + Boolean.parseBoolean(properties.getProperty(TLS_ALLOW_INSECURE_CONNECTION, "false"))); + clientBuilder.enableTlsHostnameVerification( + Boolean.parseBoolean(properties.getProperty(TLS_HOSTNAME_VERIFICATION, "false"))); + if (properties.containsKey(TLS_TRUST_CERTS_FILE_PATH)) { - conf.setTlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH)); + clientBuilder.tlsTrustCertsFilePath(properties.getProperty(TLS_TRUST_CERTS_FILE_PATH)); } if (properties.containsKey(OPERATION_TIMEOUT_MS)) { - conf.setOperationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)), + clientBuilder.operationTimeout(Integer.parseInt(properties.getProperty(OPERATION_TIMEOUT_MS)), TimeUnit.MILLISECONDS); } if (properties.containsKey(STATS_INTERVAL_SECONDS)) { - conf.setStatsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)), TimeUnit.SECONDS); + clientBuilder.statsInterval(Integer.parseInt(properties.getProperty(STATS_INTERVAL_SECONDS)), + TimeUnit.SECONDS); } if (properties.containsKey(NUM_IO_THREADS)) { - conf.setIoThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS))); + clientBuilder.ioThreads(Integer.parseInt(properties.getProperty(NUM_IO_THREADS))); } if (properties.containsKey(CONNECTIONS_PER_BROKER)) { - conf.setConnectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER))); + clientBuilder.connectionsPerBroker(Integer.parseInt(properties.getProperty(CONNECTIONS_PER_BROKER))); } if (properties.containsKey(USE_TCP_NODELAY)) { - conf.setUseTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY))); + clientBuilder.enableTcpNoDelay(Boolean.parseBoolean(properties.getProperty(USE_TCP_NODELAY))); } if (properties.containsKey(CONCURRENT_LOOKUP_REQUESTS)) { - conf.setConcurrentLookupRequest(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS))); + clientBuilder.maxConcurrentLookupRequests(Integer.parseInt(properties.getProperty(CONCURRENT_LOOKUP_REQUESTS))); } if (properties.containsKey(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION)) { - conf.setMaxNumberOfRejectedRequestPerConnection( + clientBuilder.maxNumberOfRejectedRequestPerConnection( Integer.parseInt(properties.getProperty(MAX_NUMBER_OF_REJECTED_REQUESTS_PER_CONNECTION))); } - return conf; + return clientBuilder; } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java index f91c48485b9e3..4addfb70928fd 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarConsumerKafkaConfig.java @@ -20,7 +20,8 @@ import java.util.Properties; -import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.PulsarClient; public class PulsarConsumerKafkaConfig { @@ -29,22 +30,22 @@ public class PulsarConsumerKafkaConfig { public static final String RECEIVER_QUEUE_SIZE = "pulsar.consumer.receiver.queue.size"; public static final String TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS = "pulsar.consumer.total.receiver.queue.size.across.partitions"; - public static ConsumerConfiguration getConsumerConfiguration(Properties properties) { - ConsumerConfiguration conf = new ConsumerConfiguration(); + public static ConsumerBuilder getConsumerBuilder(PulsarClient client, Properties properties) { + ConsumerBuilder consumerBuilder = client.newConsumer(); if (properties.containsKey(CONSUMER_NAME)) { - conf.setConsumerName(properties.getProperty(CONSUMER_NAME)); + consumerBuilder.consumerName(properties.getProperty(CONSUMER_NAME)); } if (properties.containsKey(RECEIVER_QUEUE_SIZE)) { - conf.setReceiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE))); + consumerBuilder.receiverQueueSize(Integer.parseInt(properties.getProperty(RECEIVER_QUEUE_SIZE))); } if (properties.containsKey(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS)) { - conf.setMaxTotalReceiverQueueSizeAcrossPartitions( + consumerBuilder.maxTotalReceiverQueueSizeAcrossPartitions( Integer.parseInt(properties.getProperty(TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS))); } - return conf; + return consumerBuilder; } } diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java index c2e4886bc9243..5a9a651527c34 100644 --- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java +++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java @@ -20,7 +20,8 @@ import java.util.Properties; -import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; public class PulsarProducerKafkaConfig { @@ -33,32 +34,32 @@ public class PulsarProducerKafkaConfig { public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled"; public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages"; - public static ProducerConfiguration getProducerConfiguration(Properties properties) { - ProducerConfiguration conf = new ProducerConfiguration(); + public static ProducerBuilder getProducerBuilder(PulsarClient client, Properties properties) { + ProducerBuilder producerBuilder = client.newProducer(); if (properties.containsKey(PRODUCER_NAME)) { - conf.setProducerName(properties.getProperty(PRODUCER_NAME)); + producerBuilder.producerName(properties.getProperty(PRODUCER_NAME)); } if (properties.containsKey(INITIAL_SEQUENCE_ID)) { - conf.setInitialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID))); + producerBuilder.initialSequenceId(Long.parseLong(properties.getProperty(INITIAL_SEQUENCE_ID))); } if (properties.containsKey(MAX_PENDING_MESSAGES)) { - conf.setMaxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES))); + producerBuilder.maxPendingMessages(Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES))); } if (properties.containsKey(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS)) { - conf.setMaxPendingMessagesAcrossPartitions( + producerBuilder.maxPendingMessagesAcrossPartitions( Integer.parseInt(properties.getProperty(MAX_PENDING_MESSAGES_ACROSS_PARTITIONS))); } - conf.setBatchingEnabled(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true"))); + producerBuilder.enableBatching(Boolean.parseBoolean(properties.getProperty(BATCHING_ENABLED, "true"))); if (properties.containsKey(BATCHING_MAX_MESSAGES)) { - conf.setBatchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES))); + producerBuilder.batchingMaxMessages(Integer.parseInt(properties.getProperty(BATCHING_MAX_MESSAGES))); } - return conf; + return producerBuilder; } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index d9dcb1c9cac2f..e4457dfcc16c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -100,7 +100,7 @@ public interface ConsumerBuilder extends Serializable, Cloneable { * * @param topicsPattern */ - ConsumerBuilder topicsPattern(Pattern topicsPattern); + ConsumerBuilder topicsPattern(Pattern topicsPattern); /** * Specify a pattern for topics that this consumer will subscribe on. @@ -111,7 +111,7 @@ public interface ConsumerBuilder extends Serializable, Cloneable { * @param topicsPattern * given regular expression for topics pattern */ - ConsumerBuilder topicsPattern(String topicsPattern); + ConsumerBuilder topicsPattern(String topicsPattern); /** * Specify the subscription name for this consumer. @@ -238,7 +238,7 @@ public interface ConsumerBuilder extends Serializable, Cloneable { * @param periodInMinutes * whether to read from the compacted topic */ - ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes); + ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes); /** * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java index 7761b9db9e885..c526af5b9fe5e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java @@ -95,7 +95,7 @@ public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionTy /** * @return the configured {@link MessageListener} for the consumer */ - public MessageListener getMessageListener() { + public MessageListener getMessageListener() { return conf.getMessageListener(); } @@ -108,7 +108,7 @@ public MessageListener getMessageListener() { * @param messageListener * the listener object */ - public ConsumerConfiguration setMessageListener(MessageListener messageListener) { + public ConsumerConfiguration setMessageListener(MessageListener messageListener) { checkNotNull(messageListener); conf.setMessageListener(messageListener); return this; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java index aeb8bbbb5dfff..e2e627414f137 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerEventListener.java @@ -26,11 +26,11 @@ public interface ConsumerEventListener { /** * Notified when the consumer group is changed, and the consumer becomes the active consumer. */ - void becameActive(Consumer consumer, int partitionId); + void becameActive(Consumer consumer, int partitionId); /** * Notified when the consumer group is changed, and the consumer is still inactive or becomes inactive. */ - void becameInactive(Consumer consumer, int partitionId); + void becameInactive(Consumer consumer, int partitionId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java index a9cef6f733d50..bc2b91536e267 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRouter.java @@ -30,7 +30,7 @@ public interface MessageRouter extends Serializable { * @deprecated since 1.22.0. Please use {@link #choosePartition(Message, TopicMetadata)} instead. */ @Deprecated - default int choosePartition(Message msg) { + default int choosePartition(Message msg) { throw new UnsupportedOperationException("Use #choosePartition(Message, TopicMetadata) instead"); } @@ -42,7 +42,7 @@ default int choosePartition(Message msg) { * @return the partition to route the message. * @since 1.22.0 */ - default int choosePartition(Message msg, TopicMetadata metadata) { + default int choosePartition(Message msg, TopicMetadata metadata) { return choosePartition(msg); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java index a037cfe45c4ee..16dcd009cb177 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java @@ -262,7 +262,7 @@ public interface ProducerBuilder extends Serializable, Cloneable { * maximum number of messages in a batch * @return */ - ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch); + ProducerBuilder batchingMaxMessages(int batchMessagesMaxMessagesPerBatch); /** * Set the baseline for the sequence ids for messages published by the producer. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index c81b2eb35cdd2..f40a8a455272a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -99,7 +99,7 @@ public interface ReaderBuilder extends Serializable, Cloneable { * @param readerListener * the listener object */ - ReaderBuilder readerListener(ReaderListener readerListener); + ReaderBuilder readerListener(ReaderListener readerListener); /** * Sets a {@link CryptoKeyReader} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java index d329d09d4c9f8..a97e5249cc173 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainer.java @@ -53,7 +53,7 @@ class BatchMessageContainer { // sequence id for this batch which will be persisted as a single entry by broker long sequenceId = -1; ByteBuf batchedMessageMetadataAndPayload; - List messages = Lists.newArrayList(); + List> messages = Lists.newArrayList(); // keep track of callbacks for individual messages being published in a batch SendCallback firstCallback; @@ -73,13 +73,13 @@ class BatchMessageContainer { this.producerName = producerName; } - boolean hasSpaceInBatch(MessageImpl msg) { + boolean hasSpaceInBatch(MessageImpl msg) { int messageSize = msg.getDataBuffer().readableBytes(); return ((messageSize + currentBatchSizeBytes) <= MAX_MESSAGE_BATCH_SIZE_BYTES && numMessagesInBatch < maxNumMessagesInBatch); } - void add(MessageImpl msg, SendCallback callback) { + void add(MessageImpl msg, SendCallback callback) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 4cffb7fa8a247..3effc7f3bfb0b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -154,4 +154,8 @@ public ClientBuilder maxNumberOfRejectedRequestPerConnection(int maxNumberOfReje conf.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection); return this; } + + public ClientConfigurationData getClientConfigurationData() { + return conf; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 6c2d758c78c5b..37904fdfcdfca 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -86,8 +86,8 @@ public class ClientCnx extends PulsarHandler { private final ConcurrentLongHashMap>> pendingGetTopicsRequests = new ConcurrentLongHashMap<>( 16, 1); - private final ConcurrentLongHashMap producers = new ConcurrentLongHashMap<>(16, 1); - private final ConcurrentLongHashMap consumers = new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap> producers = new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap> consumers = new ConcurrentLongHashMap<>(16, 1); private final CompletableFuture connectionFuture = new CompletableFuture(); private final Semaphore pendingLookupRequestSemaphore; @@ -253,7 +253,7 @@ protected void handleMessage(CommandMessage cmdMessage, ByteBuf headersAndPayloa if (log.isDebugEnabled()) { log.debug("{} Received a message from the server: {}", ctx.channel(), cmdMessage); } - ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId()); + ConsumerImpl consumer = consumers.get(cmdMessage.getConsumerId()); if (consumer != null) { consumer.messageReceived(cmdMessage.getMessageId(), headersAndPayload, this); } @@ -266,7 +266,7 @@ protected void handleActiveConsumerChange(CommandActiveConsumerChange change) { if (log.isDebugEnabled()) { log.debug("{} Received a consumer group change message from the server : {}", ctx.channel(), change); } - ConsumerImpl consumer = consumers.get(change.getConsumerId()); + ConsumerImpl consumer = consumers.get(change.getConsumerId()); if (consumer != null) { consumer.activeConsumerChanged(change.getIsActive()); } @@ -398,7 +398,7 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn log.info("[{}] Broker notification reached the end of topic: {}", remoteAddress, consumerId); - ConsumerImpl consumer = consumers.get(consumerId); + ConsumerImpl consumer = consumers.get(consumerId); if (consumer != null) { consumer.setTerminated(); } @@ -472,7 +472,7 @@ protected void handleError(CommandError error) { protected void handleCloseProducer(CommandCloseProducer closeProducer) { log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId()); final long producerId = closeProducer.getProducerId(); - ProducerImpl producer = producers.get(producerId); + ProducerImpl producer = producers.get(producerId); if (producer != null) { producer.connectionClosed(this); } else { @@ -484,7 +484,7 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) { protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) { log.info("[{}] Broker notification of Closed consumer: {}", remoteAddress, closeConsumer.getConsumerId()); final long consumerId = closeConsumer.getConsumerId(); - ConsumerImpl consumer = consumers.get(consumerId); + ConsumerImpl consumer = consumers.get(consumerId); if (consumer != null) { consumer.connectionClosed(this); } else { @@ -666,11 +666,11 @@ private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) { return false; } - void registerConsumer(final long consumerId, final ConsumerImpl consumer) { + void registerConsumer(final long consumerId, final ConsumerImpl consumer) { consumers.put(consumerId, consumer); } - void registerProducer(final long producerId, final ProducerImpl producer) { + void registerProducer(final long producerId, final ProducerImpl producer) { producers.put(producerId, producer); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index af6b732b7749b..f51b1b3dba0c6 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -50,7 +50,7 @@ enum ConsumerType { } protected final String subscription; - protected final ConsumerConfigurationData conf; + protected final ConsumerConfigurationData conf; protected final String consumerName; protected final CompletableFuture> subscribeFuture; protected final MessageListener listener; @@ -168,7 +168,7 @@ public Message receive(int timeout, TimeUnit unit) throws PulsarClientExcepti abstract protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClientException; @Override - public void acknowledge(Message message) throws PulsarClientException { + public void acknowledge(Message message) throws PulsarClientException { try { acknowledge(message.getMessageId()); } catch (NullPointerException npe) { @@ -194,7 +194,7 @@ public void acknowledge(MessageId messageId) throws PulsarClientException { } @Override - public void acknowledgeCumulative(Message message) throws PulsarClientException { + public void acknowledgeCumulative(Message message) throws PulsarClientException { try { acknowledgeCumulative(message.getMessageId()); } catch (NullPointerException npe) { @@ -220,7 +220,7 @@ public void acknowledgeCumulative(MessageId messageId) throws PulsarClientExcept } @Override - public CompletableFuture acknowledgeAsync(Message message) { + public CompletableFuture acknowledgeAsync(Message message) { try { return acknowledgeAsync(message.getMessageId()); } catch (NullPointerException npe) { @@ -229,7 +229,7 @@ public CompletableFuture acknowledgeAsync(Message message) { } @Override - public CompletableFuture acknowledgeCumulativeAsync(Message message) { + public CompletableFuture acknowledgeCumulativeAsync(Message message) { try { return acknowledgeCumulativeAsync(message.getMessageId()); } catch (NullPointerException npe) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index e7163c9fbcc84..d8507bbe0dfb1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -206,9 +206,8 @@ public ConsumerBuilder readCompacted(boolean readCompacted) { } @Override - public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) { + public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) { conf.setPatternAutoDiscoveryPeriod(periodInMinutes); return this; } - } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 8499565aef2b9..183651a1e43de 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -82,6 +82,7 @@ public class ConsumerImpl extends ConsumerBase { // Number of messages that have delivered to the application. Every once in a while, this number will be sent to the // broker to notify that we are ready to get (and store in the incoming messages queue) more messages + @SuppressWarnings("rawtypes") private static final AtomicIntegerFieldUpdater AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater .newUpdater(ConsumerImpl.class, "availablePermits"); @SuppressWarnings("unused") @@ -288,7 +289,7 @@ private Message fetchSingleMessageFromBroker() throws PulsarClientException { do { message = incomingMessages.take(); lastDequeuedMessage = message.getMessageId(); - ClientCnx msgCnx = ((MessageImpl) message).getCnx(); + ClientCnx msgCnx = ((MessageImpl) message).getCnx(); // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()" synchronized (ConsumerImpl.this) { // if message received due to an old flow - discard it and wait for the message from the @@ -631,7 +632,7 @@ void connectionOpened(final ClientCnx cnx) { * not seen by the application */ private BatchMessageIdImpl clearReceiverQueue() { - List currentMessageQueue = new ArrayList<>(incomingMessages.size()); + List> currentMessageQueue = new ArrayList<>(incomingMessages.size()); incomingMessages.drainTo(currentMessageQueue); if (!currentMessageQueue.isEmpty()) { MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId(); @@ -984,9 +985,9 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc * * Periodically, it sends a Flow command to notify the broker that it can push more messages */ - protected synchronized void messageProcessed(Message msg) { + protected synchronized void messageProcessed(Message msg) { ClientCnx currentCnx = cnx(); - ClientCnx msgCnx = ((MessageImpl) msg).getCnx(); + ClientCnx msgCnx = ((MessageImpl) msg).getCnx(); lastDequeuedMessage = msg.getMessageId(); if (msgCnx != currentCnx) { @@ -1371,7 +1372,7 @@ private void internalGetLastMessageIdAsync(final Backoff backoff, } } - private MessageIdImpl getMessageIdImpl(Message msg) { + private MessageIdImpl getMessageIdImpl(Message msg) { MessageIdImpl messageId = (MessageIdImpl) msg.getMessageId(); if (messageId instanceof BatchMessageIdImpl) { // messageIds contain MessageIdImpl, not BatchMessageIdImpl diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java index 3ef26c831d1dd..21ff3c4364d9a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java @@ -41,7 +41,7 @@ public class ConsumerStats implements Serializable { private static final long serialVersionUID = 1L; private TimerTask stat; private Timeout statTimeout; - private ConsumerImpl consumer; + private ConsumerImpl consumer; private PulsarClientImpl pulsarClient; private long oldTime; private long statsIntervalSeconds; @@ -74,7 +74,7 @@ public ConsumerStats() { throughputFormat = null; } - public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData conf, ConsumerImpl consumer) { + public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData conf, ConsumerImpl consumer) { this.pulsarClient = pulsarClient; this.consumer = consumer; this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds(); @@ -92,7 +92,7 @@ public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData co init(conf); } - private void init(ConsumerConfigurationData conf) { + private void init(ConsumerConfigurationData conf) { ObjectMapper m = new ObjectMapper(); m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); ObjectWriter w = m.writerWithDefaultPrettyPrinter(); @@ -149,7 +149,7 @@ private void init(ConsumerConfigurationData conf) { statTimeout = pulsarClient.timer().newTimeout(stat, statsIntervalSeconds, TimeUnit.SECONDS); } - void updateNumMsgsReceived(Message message) { + void updateNumMsgsReceived(Message message) { if (message != null) { numMsgsReceived.increment(); numBytesReceived.add(message.getData().length); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java index fdcaf63783cc7..08b189cc1dae7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsDisabled.java @@ -24,7 +24,7 @@ public class ConsumerStatsDisabled extends ConsumerStats { private static final long serialVersionUID = 1L; @Override - void updateNumMsgsReceived(Message message) { + void updateNumMsgsReceived(Message message) { // Do nothing } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java index f4585ce23d24f..8fcc40ad287e2 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java @@ -56,7 +56,8 @@ public class MessageImpl implements Message { // Constructor for out-going message static MessageImpl create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload, Schema schema) { - MessageImpl msg = RECYCLER.get(); + @SuppressWarnings("unchecked") + MessageImpl msg = (MessageImpl) RECYCLER.get(); msg.msgMetadataBuilder = msgMetadataBuilder; msg.messageId = null; msg.cnx = null; @@ -67,7 +68,8 @@ static MessageImpl create(MessageMetadata.Builder msgMetadataBuilder, Byt } static MessageImpl create(MessageMetadata.Builder msgMetadataBuilder, ByteBuffer payload) { - MessageImpl msg = RECYCLER.get(); + @SuppressWarnings("unchecked") + MessageImpl msg = (MessageImpl) RECYCLER.get(); msg.msgMetadataBuilder = msgMetadataBuilder; msg.messageId = null; msg.cnx = null; @@ -141,8 +143,9 @@ public MessageImpl(String msgId, Map properties, ByteBuf payload this.properties = Collections.unmodifiableMap(properties); } - public static MessageImpl deserialize(ByteBuf headersAndPayload) throws IOException { - MessageImpl msg = RECYCLER.get(); + public static MessageImpl deserialize(ByteBuf headersAndPayload) throws IOException { + @SuppressWarnings("unchecked") + MessageImpl msg = (MessageImpl) RECYCLER.get(); MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload); msg.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata); @@ -287,16 +290,16 @@ public void recycle() { } } - private MessageImpl(Handle recyclerHandle) { + private MessageImpl(Handle> recyclerHandle) { this.recyclerHandle = recyclerHandle; } - private Handle recyclerHandle; + private Handle> recyclerHandle; - private final static Recycler RECYCLER = new Recycler() { + private final static Recycler> RECYCLER = new Recycler>() { @Override - protected MessageImpl newObject(Handle handle) { - return new MessageImpl(handle); + protected MessageImpl newObject(Handle> handle) { + return new MessageImpl<>(handle); } }; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java index f662ccf1d46a9..df6894c474d2e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkArgument; -import com.google.common.collect.Lists; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,6 +33,7 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; + import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; @@ -47,13 +47,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + public class PartitionedConsumerImpl extends ConsumerBase { private final List> consumers; // Queue of partition consumers on which we have stopped calling receiveAsync() because the // shared incoming queue was full - private final ConcurrentLinkedQueue pausedConsumers; + private final ConcurrentLinkedQueue> pausedConsumers; // Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to // resume receiving from the paused consumer partitions @@ -165,7 +167,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() { try { if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) { while (true) { - ConsumerImpl consumer = pausedConsumers.poll(); + ConsumerImpl consumer = pausedConsumers.poll(); if (consumer == null) { break; } @@ -360,12 +362,7 @@ private void failPendingReceive() { @Override public boolean isConnected() { - for (ConsumerImpl consumer : consumers) { - if (!consumer.isConnected()) { - return false; - } - } - return true; + return consumers.stream().allMatch(ConsumerImpl::isConnected); } @Override @@ -442,6 +439,7 @@ private ConsumerConfigurationData getInternalConsumerConfig() { if (null != conf.getConsumerEventListener()) { internalConsumerConfig.setConsumerEventListener(conf.getConsumerEventListener()); } + int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); internalConsumerConfig.setReceiverQueueSize(receiverQueueSize); @@ -460,7 +458,7 @@ private ConsumerConfigurationData getInternalConsumerConfig() { @Override public void redeliverUnacknowledgedMessages() { synchronized (this) { - for (ConsumerImpl c : consumers) { + for (ConsumerImpl c : consumers) { c.redeliverUnacknowledgedMessages(); } incomingMessages.clear(); @@ -509,11 +507,7 @@ public CompletableFuture seekAsync(MessageId messageId) { * @return true if all batch messages have been acknowledged */ public boolean isBatchingAckTrackerEmpty() { - boolean state = true; - for (Consumer consumer : consumers) { - state &= ((ConsumerImpl) consumer).isBatchingAckTrackerEmpty(); - } - return state; + return consumers.stream().allMatch(ConsumerImpl::isBatchingAckTrackerEmpty); } List> getConsumers() { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index e70ce4b2b400f..5281d5104abb9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -161,14 +161,8 @@ public CompletableFuture sendAsync(Message message) { @Override public boolean isConnected() { - for (ProducerImpl producer : producers) { - // returns false if any of the partition is not connected - if (!producer.isConnected()) { - return false; - } - } - - return true; + // returns false if any of the partition is not connected + return producers.stream().allMatch(ProducerImpl::isConnected); } @Override diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index b55f2791467cf..15779cd927767 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -104,6 +104,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask { private final Map metadata; + @SuppressWarnings("rawtypes") private static final AtomicLongFieldUpdater msgIdGeneratorUpdater = AtomicLongFieldUpdater .newUpdater(ProducerImpl.class, "msgIdGenerator"); @@ -184,7 +185,7 @@ public long getLastSequenceId() { } @Override - public CompletableFuture sendAsync(Message message) { + public CompletableFuture sendAsync(Message message) { CompletableFuture future = new CompletableFuture<>(); sendAsync(message, new SendCallback() { @@ -232,7 +233,7 @@ public void addCallback(SendCallback scb) { return future; } - public void sendAsync(Message message, SendCallback callback) { + public void sendAsync(Message message, SendCallback callback) { checkArgument(message instanceof MessageImpl); if (!isValidProducerState(callback)) { @@ -243,7 +244,7 @@ public void sendAsync(Message message, SendCallback callback) { return; } - MessageImpl msg = (MessageImpl) message; + MessageImpl msg = (MessageImpl) message; MessageMetadata.Builder msgMetadata = msg.getMessageBuilder(); ByteBuf payload = msg.getDataBuffer(); @@ -387,7 +388,7 @@ private ByteBufPair sendMessage(long producerId, long sequenceId, int numMessage return Commands.newSend(producerId, sequenceId, numMessages, checksumType, msgMetadata, compressedPayload); } - private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) { + private void doBatchSendAndAdd(MessageImpl msg, SendCallback callback, ByteBuf payload) { if (log.isDebugEnabled()) { log.debug("[{}] [{}] Closing out batch to accomodate large message with size {}", topic, producerName, msg.getDataBuffer().readableBytes()); @@ -440,12 +441,12 @@ private boolean canEnqueueRequest(SendCallback callback) { } private static final class WriteInEventLoopCallback implements Runnable { - private ProducerImpl producer; + private ProducerImpl producer; private ByteBufPair cmd; private long sequenceId; private ClientCnx cnx; - static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) { + static WriteInEventLoopCallback create(ProducerImpl producer, ClientCnx cnx, OpSendMsg op) { WriteInEventLoopCallback c = RECYCLER.get(); c.producer = producer; c.cnx = cnx; @@ -735,8 +736,8 @@ protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) { } protected static final class OpSendMsg { - MessageImpl msg; - List msgs; + MessageImpl msg; + List> msgs; ByteBufPair cmd; SendCallback callback; long sequenceId; @@ -744,7 +745,7 @@ protected static final class OpSendMsg { long batchSizeByte = 0; int numMessagesInBatch = 1; - static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, SendCallback callback) { + static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, SendCallback callback) { OpSendMsg op = RECYCLER.get(); op.msg = msg; op.cmd = cmd; @@ -754,7 +755,7 @@ static OpSendMsg create(MessageImpl msg, ByteBufPair cmd, long sequenceId, SendC return op; } - static OpSendMsg create(List msgs, ByteBufPair cmd, long sequenceId, SendCallback callback) { + static OpSendMsg create(List> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback) { OpSendMsg op = RECYCLER.get(); op.msgs = msgs; op.cmd = cmd; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java index 6a5e321fffdf9..262800336d3d4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java @@ -41,7 +41,7 @@ public class ProducerStats implements Serializable { private static final long serialVersionUID = 1L; private TimerTask stat; private Timeout statTimeout; - private ProducerImpl producer; + private ProducerImpl producer; private PulsarClientImpl pulsarClient; private long oldTime; private long statsIntervalSeconds; @@ -74,7 +74,7 @@ public ProducerStats() { ds = null; } - public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl producer) { + public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl producer) { this.pulsarClient = pulsarClient; this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds(); this.producer = producer; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 1741eeba9eeb7..47e953b5fb97f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.client.impl; -import static com.google.common.base.Preconditions.checkState; import static org.apache.commons.lang3.StringUtils.isBlank; import com.google.common.collect.Lists; @@ -256,7 +255,7 @@ public CompletableFuture> createProducerAsync(ProducerConfigurat log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); } - ProducerBase producer; + ProducerBase producer; if (metadata.partitions > 1) { producer = new PartitionedProducerImpl<>(PulsarClientImpl.this, topic, conf, metadata.partitions, producerCreatedFuture, schema); @@ -437,7 +436,7 @@ private CompletableFuture> patternTopicSubscribeAsync(ConsumerCo List topicsList = topicsPatternFilter(topics, conf.getTopicsPattern()); conf.getTopicNames().addAll(topicsList); - ConsumerBase consumer = new PatternTopicsConsumerImpl<>(conf.getTopicsPattern(), + ConsumerBase consumer = new PatternTopicsConsumerImpl<>(conf.getTopicsPattern(), PulsarClientImpl.this, conf, externalExecutorProvider.getExecutor(), @@ -587,12 +586,12 @@ public CompletableFuture closeAsync() { synchronized (producers) { // Copy to a new list, because the closing will trigger a removal from the map // and invalidate the iterator - List producersToClose = Lists.newArrayList(producers.keySet()); + List> producersToClose = Lists.newArrayList(producers.keySet()); producersToClose.forEach(p -> futures.add(p.closeAsync())); } synchronized (consumers) { - List consumersToClose = Lists.newArrayList(consumers.keySet()); + List> consumersToClose = Lists.newArrayList(consumers.keySet()); consumersToClose.forEach(c -> futures.add(c.closeAsync())); } @@ -688,13 +687,13 @@ private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) { return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory); } - void cleanupProducer(ProducerBase producer) { + void cleanupProducer(ProducerBase producer) { synchronized (producers) { producers.remove(producer); } } - void cleanupConsumer(ConsumerBase consumer) { + void cleanupConsumer(ConsumerBase consumer) { synchronized (consumers) { consumers.remove(consumer); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index bef909d983246..4ab5f423b2b24 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -43,16 +43,17 @@ public class ReaderBuilderImpl implements ReaderBuilder { private final Schema schema; ReaderBuilderImpl(PulsarClientImpl client, Schema schema) { - this(client, new ReaderConfigurationData(), schema); + this(client, new ReaderConfigurationData(), schema); } - private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData conf, Schema schema) { + private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData conf, Schema schema) { this.client = client; this.conf = conf; this.schema = schema; } @Override + @SuppressWarnings("unchecked") public ReaderBuilder clone() { try { return (ReaderBuilder) super.clone(); @@ -106,7 +107,7 @@ public ReaderBuilder startMessageId(MessageId startMessageId) { } @Override - public ReaderBuilder readerListener(ReaderListener readerListener) { + public ReaderBuilder readerListener(ReaderListener readerListener) { conf.setReaderListener(readerListener); return this; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index daf4799fe2eee..ed374f66320de 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -90,7 +90,7 @@ public String getTopic() { return consumer.getTopic(); } - public ConsumerImpl getConsumer() { + public ConsumerImpl getConsumer() { return consumer; } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java index 6b3f937942b50..ce67f5b466c1f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java @@ -39,7 +39,7 @@ public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme) { } @Override - public int choosePartition(Message msg, TopicMetadata topicMetadata) { + public int choosePartition(Message msg, TopicMetadata topicMetadata) { // If the message has a key, it supersedes the round robin routing policy if (msg.hasKey()) { return hash.makeHash(msg.getKey()) % topicMetadata.numPartitions(); diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java index d95d83cb7164a..433744273ed44 100644 --- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java +++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java @@ -31,9 +31,10 @@ import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.impl.ConnectionPool; import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService; @@ -95,19 +96,20 @@ public ProxyService(ProxyConfiguration proxyConfig) throws IOException { this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory); this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory); - ClientConfiguration clientConfiguration = new ClientConfiguration(); + ClientConfigurationData clientConf = new ClientConfigurationData(); + clientConf.setServiceUrl(serviceUrl); if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) { - clientConfiguration.setAuthentication(proxyConfig.getBrokerClientAuthenticationPlugin(), - proxyConfig.getBrokerClientAuthenticationParameters()); + clientConf.setAuthentication(AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(), + proxyConfig.getBrokerClientAuthenticationParameters())); } if (proxyConfig.isTlsEnabledWithBroker()) { - clientConfiguration.setUseTls(true); - clientConfiguration.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath()); - clientConfiguration.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection()); + clientConf.setUseTls(true); + clientConf.setTlsTrustCertsFilePath(proxyConfig.getBrokerClientTrustCertsFilePath()); + clientConf.setTlsAllowInsecureConnection(proxyConfig.isTlsAllowInsecureConnection()); } - this.client = new PulsarClientImpl(serviceUrl, clientConfiguration, workerGroup); - this.clientAuthentication = clientConfiguration.getAuthentication(); + this.client = new PulsarClientImpl(clientConf, workerGroup); + this.clientAuthentication = clientConf.getAuthentication(); } public void start() throws Exception { diff --git a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java index d4b8e4d86be71..f98c48d829283 100644 --- a/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java +++ b/pulsar-proxy/src/test/java/org/apache/pulsar/proxy/server/ProxyForwardAuthDataTest.java @@ -25,11 +25,9 @@ import org.apache.bookkeeper.test.PortManager; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.AuthAction; import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.proxy.server.ProxyRolesEnforcementTest.BasicAuthentication; @@ -142,8 +140,7 @@ private void createAdminClient() throws PulsarClientException { } private PulsarClient createPulsarClient(String proxyServiceUrl, String authParams) throws PulsarClientException { - org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration(); - clientConf.setAuthentication(BasicAuthentication.class.getName(), authParams); - return PulsarClient.create(proxyServiceUrl, clientConf); + return PulsarClient.builder().serviceUrl(proxyServiceUrl) + .authentication(BasicAuthentication.class.getName(), authParams).build(); } } diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java index 3c43611c656e0..4291fa8fd97ea 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java @@ -33,7 +33,7 @@ public interface MessageToValuesMapper extends Serializable { * @param msg * @return */ - public Values toValues(Message msg); + public Values toValues(Message msg); /** * Declare the output schema for the spout. diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java index a293fd69424b5..0aa1ee35fc704 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java @@ -18,29 +18,34 @@ */ package org.apache.pulsar.storm; +import static java.lang.String.format; + import java.util.Map; import java.util.concurrent.ConcurrentMap; -import org.apache.storm.utils.TupleUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.PulsarClientException; - +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.apache.storm.metric.api.IMetric; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; -import static java.lang.String.format; +import org.apache.storm.utils.TupleUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +@SuppressWarnings("deprecation") public class PulsarBolt extends BaseRichBolt implements IMetric { /** * @@ -52,8 +57,8 @@ public class PulsarBolt extends BaseRichBolt implements IMetric { public static final String PRODUCER_RATE = "producerRate"; public static final String PRODUCER_THROUGHPUT_BYTES = "producerThroughput"; - private final ClientConfiguration clientConf; - private final ProducerConfiguration producerConf; + private final ClientConfigurationData clientConf; + private final ProducerConfigurationData producerConf; private final PulsarBoltConfiguration pulsarBoltConf; private final ConcurrentMap metricsMap = Maps.newConcurrentMap(); @@ -65,17 +70,39 @@ public class PulsarBolt extends BaseRichBolt implements IMetric { private volatile long messagesSent = 0; private volatile long messageSizeSent = 0; + public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientBuilder clientBuilder) { + this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(); + this.producerConf = new ProducerConfigurationData(); + Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl()); + Preconditions.checkNotNull(pulsarBoltConf.getTopic()); + Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper()); + + this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl()); + this.producerConf.setTopicName(pulsarBoltConf.getTopic()); + this.pulsarBoltConf = pulsarBoltConf; + } + + /** + * @deprecated Use {@link #PulsarBolt(PulsarBoltConfiguration, ClientBuilder)} + */ + @Deprecated public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfiguration clientConf) { this(pulsarBoltConf, clientConf, new ProducerConfiguration()); } + /** + * @deprecated Use {@link #PulsarBolt(PulsarBoltConfiguration, ClientBuilder)} + */ + @Deprecated public PulsarBolt(PulsarBoltConfiguration pulsarBoltConf, ClientConfiguration clientConf, ProducerConfiguration producerConf) { - this.clientConf = clientConf; - this.producerConf = producerConf; + this.clientConf = clientConf.getConfigurationData().clone(); + this.producerConf = producerConf.getProducerConfigurationData().clone(); Preconditions.checkNotNull(pulsarBoltConf.getServiceUrl()); Preconditions.checkNotNull(pulsarBoltConf.getTopic()); Preconditions.checkNotNull(pulsarBoltConf.getTupleToMessageMapper()); + this.clientConf.setServiceUrl(pulsarBoltConf.getServiceUrl()); + this.producerConf.setTopicName(pulsarBoltConf.getTopic()); this.pulsarBoltConf = pulsarBoltConf; } @@ -86,8 +113,8 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector this.boltId = String.format("%s-%s", componentId, context.getThisTaskId()); this.collector = collector; try { - sharedPulsarClient = SharedPulsarClient.get(componentId, pulsarBoltConf.getServiceUrl(), clientConf); - producer = sharedPulsarClient.getSharedProducer(pulsarBoltConf.getTopic(), producerConf); + sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf); + producer = sharedPulsarClient.getSharedProducer(producerConf); LOG.info("[{}] Created a pulsar producer on topic {} to send messages", boltId, pulsarBoltConf.getTopic()); } catch (PulsarClientException e) { LOG.error("[{}] Error initializing pulsar producer on topic {}", boltId, pulsarBoltConf.getTopic(), e); diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 639adb915f0ba..af26035ed8530 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -22,15 +22,11 @@ import java.util.Map; import java.util.Queue; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerConfiguration; @@ -38,15 +34,25 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.Backoff; - +import org.apache.pulsar.client.impl.ClientBuilderImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.storm.metric.api.IMetric; +import org.apache.storm.shade.com.google.common.collect.Sets; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; + +@SuppressWarnings("deprecation") public class PulsarSpout extends BaseRichSpout implements IMetric { private static final long serialVersionUID = 1L; @@ -59,37 +65,61 @@ public class PulsarSpout extends BaseRichSpout implements IMetric { public static final String CONSUMER_RATE = "consumerRate"; public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput"; - private final ClientConfiguration clientConf; - private final ConsumerConfiguration consumerConf; + private final ClientConfigurationData clientConf; + private final ConsumerConfigurationData consumerConf; private final PulsarSpoutConfiguration pulsarSpoutConf; private final long failedRetriesTimeoutNano; private final int maxFailedRetries; private final ConcurrentMap pendingMessageRetries = Maps.newConcurrentMap(); - private final Queue failedMessages = Queues.newConcurrentLinkedQueue(); + private final Queue> failedMessages = Queues.newConcurrentLinkedQueue(); private final ConcurrentMap metricsMap = Maps.newConcurrentMap(); private SharedPulsarClient sharedPulsarClient; private String componentId; private String spoutId; private SpoutOutputCollector collector; - private Consumer consumer; + private Consumer consumer; private volatile long messagesReceived = 0; private volatile long messagesEmitted = 0; private volatile long pendingAcks = 0; private volatile long messageSizeReceived = 0; + public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clientBuilder) { + Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl()); + Preconditions.checkNotNull(pulsarSpoutConf.getTopic()); + Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName()); + Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper()); + + this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(); + this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl()); + this.consumerConf = new ConsumerConfigurationData<>(); + this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic())); + this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName()); + + this.pulsarSpoutConf = pulsarSpoutConf; + this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS); + this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries(); + } + + @Deprecated public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfiguration clientConf) { this(pulsarSpoutConf, clientConf, new ConsumerConfiguration()); } + @Deprecated public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientConfiguration clientConf, ConsumerConfiguration consumerConf) { - this.clientConf = clientConf; - this.consumerConf = consumerConf; + this.clientConf = clientConf.getConfigurationData().clone(); + this.consumerConf = consumerConf.getConfigurationData().clone(); Preconditions.checkNotNull(pulsarSpoutConf.getServiceUrl()); Preconditions.checkNotNull(pulsarSpoutConf.getTopic()); Preconditions.checkNotNull(pulsarSpoutConf.getSubscriptionName()); Preconditions.checkNotNull(pulsarSpoutConf.getMessageToValuesMapper()); + + this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl()); + this.consumerConf.setTopicNames(Sets.newHashSet(pulsarSpoutConf.getTopic())); + this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName()); + this.pulsarSpoutConf = pulsarSpoutConf; this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS); this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries(); @@ -115,7 +145,7 @@ public void close() { @Override public void ack(Object msgId) { if (msgId instanceof Message) { - Message msg = (Message) msgId; + Message msg = (Message) msgId; if (LOG.isDebugEnabled()) { LOG.debug("[{}] Received ack for message {}", spoutId, msg.getMessageId()); } @@ -128,7 +158,8 @@ public void ack(Object msgId) { @Override public void fail(Object msgId) { if (msgId instanceof Message) { - Message msg = (Message) msgId; + @SuppressWarnings("unchecked") + Message msg = (Message) msgId; MessageId id = msg.getMessageId(); LOG.warn("[{}] Error processing message {}", spoutId, id); @@ -160,7 +191,7 @@ public void fail(Object msgId) { public void nextTuple() { emitNextAvailableTuple(); } - + /** * It makes sure that it emits next available non-tuple to topology unless consumer queue doesn't have any message * available. It receives message from consumer queue and converts it to tuple and emits to topology. if the @@ -168,7 +199,7 @@ public void nextTuple() { * emit. */ public void emitNextAvailableTuple() { - Message msg; + Message msg; // check if there are any failed messages to re-emit in the topology msg = failedMessages.peek(); @@ -219,13 +250,15 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect pendingMessageRetries.clear(); failedMessages.clear(); try { - sharedPulsarClient = SharedPulsarClient.get(componentId, pulsarSpoutConf.getServiceUrl(), clientConf); + sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf); if (pulsarSpoutConf.isSharedConsumerEnabled()) { - consumer = sharedPulsarClient.getSharedConsumer(pulsarSpoutConf.getTopic(), - pulsarSpoutConf.getSubscriptionName(), consumerConf); + consumer = sharedPulsarClient.getSharedConsumer(consumerConf); } else { - consumer = sharedPulsarClient.getClient().subscribe(pulsarSpoutConf.getTopic(), - pulsarSpoutConf.getSubscriptionName(), consumerConf); + try { + consumer = sharedPulsarClient.getClient().subscribeAsync(consumerConf).join(); + } catch (CompletionException e) { + throw (PulsarClientException) e.getCause(); + } } LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId, pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName()); @@ -244,7 +277,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { } - private boolean mapToValueAndEmit(Message msg) { + private boolean mapToValueAndEmit(Message msg) { if (msg != null) { Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg); ++pendingAcks; diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java index 86740774db562..4506e11a7bd3e 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java @@ -18,36 +18,37 @@ */ package org.apache.pulsar.storm; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Maps; -import org.apache.pulsar.client.api.ClientConfiguration; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; -import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; public class SharedPulsarClient { private static final Logger LOG = LoggerFactory.getLogger(SharedPulsarClient.class); private static final ConcurrentMap instances = Maps.newConcurrentMap(); private final String componentId; - private final PulsarClient client; + private final PulsarClientImpl client; private final AtomicInteger counter = new AtomicInteger(); - private Consumer consumer; - private Producer producer; + private Consumer consumer; + private Producer producer; - private SharedPulsarClient(String componentId, String serviceUrl, ClientConfiguration clientConf) + private SharedPulsarClient(String componentId, ClientConfigurationData clientConf) throws PulsarClientException { - this.client = PulsarClient.create(serviceUrl, clientConf); + this.client = new PulsarClientImpl(clientConf); this.componentId = componentId; } @@ -62,13 +63,13 @@ private SharedPulsarClient(String componentId, String serviceUrl, ClientConfigur * @return * @throws PulsarClientException */ - public static SharedPulsarClient get(String componentId, String serviceUrl, ClientConfiguration clientConf) + public static SharedPulsarClient get(String componentId, ClientConfigurationData clientConf) throws PulsarClientException { AtomicReference exception = new AtomicReference(); instances.computeIfAbsent(componentId, pulsarClient -> { SharedPulsarClient sharedPulsarClient = null; try { - sharedPulsarClient = new SharedPulsarClient(componentId, serviceUrl, clientConf); + sharedPulsarClient = new SharedPulsarClient(componentId, clientConf); LOG.info("[{}] Created a new Pulsar Client.", componentId); } catch (PulsarClientException e) { exception.set(e); @@ -81,33 +82,41 @@ public static SharedPulsarClient get(String componentId, String serviceUrl, Clie return instances.get(componentId); } - public PulsarClient getClient() { + public PulsarClientImpl getClient() { counter.incrementAndGet(); return client; } - public Consumer getSharedConsumer(String topic, String subscription, ConsumerConfiguration consumerConf) + public Consumer getSharedConsumer(ConsumerConfigurationData consumerConf) throws PulsarClientException { counter.incrementAndGet(); synchronized (this) { if (consumer == null) { - consumer = client.subscribe(topic, subscription, consumerConf); - LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, topic); + try { + consumer = client.subscribeAsync(consumerConf).join(); + } catch (CompletionException e) { + throw (PulsarClientException) e.getCause(); + } + LOG.info("[{}] Created a new Pulsar Consumer on {}", componentId, consumerConf.getSingleTopic()); } else { - LOG.info("[{}] Using a shared consumer on {}", componentId, topic); + LOG.info("[{}] Using a shared consumer on {}", componentId, consumerConf.getSingleTopic()); } } return consumer; } - public Producer getSharedProducer(String topic, ProducerConfiguration producerConf) throws PulsarClientException { + public Producer getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException { counter.incrementAndGet(); synchronized (this) { if (producer == null) { - producer = client.createProducer(topic, producerConf); - LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, topic); + try { + producer = client.createProducerAsync(producerConf).join(); + } catch (CompletionException e) { + throw (PulsarClientException) e.getCause(); + } + LOG.info("[{}] Created a new Pulsar Producer on {}", componentId, producerConf.getTopicName()); } else { - LOG.info("[{}] Using a shared producer on {}", componentId, topic); + LOG.info("[{}] Using a shared producer on {}", componentId, producerConf.getTopicName()); } } return producer; diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java index f07eeba7fc96f..e508412e6743f 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java @@ -18,8 +18,6 @@ */ package org.apache.pulsar.testclient; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -36,16 +34,14 @@ import org.HdrHistogram.Histogram; import org.HdrHistogram.Recorder; -import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.EncryptionKeyInfo; -import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -197,33 +193,33 @@ public static void main(String[] args) throws Exception { final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null; - MessageListener listener = new MessageListener() { - public void received(Consumer consumer, Message msg) { - messagesReceived.increment(); - bytesReceived.add(msg.getData().length); + MessageListener listener = (consumer, msg) -> { + messagesReceived.increment(); + bytesReceived.add(msg.getData().length); - if (limiter != null) { - limiter.acquire(); - } + if (limiter != null) { + limiter.acquire(); + } - long latencyMillis = System.currentTimeMillis() - msg.getPublishTime(); - recorder.recordValue(latencyMillis); - cumulativeRecorder.recordValue(latencyMillis); + long latencyMillis = System.currentTimeMillis() - msg.getPublishTime(); + recorder.recordValue(latencyMillis); + cumulativeRecorder.recordValue(latencyMillis); - consumer.acknowledgeAsync(msg); - } + consumer.acknowledgeAsync(msg); }; - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setConnectionsPerBroker(arguments.maxConnections); - clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS); - clientConf.setIoThreads(Runtime.getRuntime().availableProcessors()); + ClientBuilder clientBuilder = PulsarClient.builder() // + .serviceUrl(arguments.serviceURL) // + .connectionsPerBroker(arguments.maxConnections) // + .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) // + .ioThreads(Runtime.getRuntime().availableProcessors()) // + .enableTls(arguments.useTls) // + .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath); if (isNotBlank(arguments.authPluginClassName)) { - clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams); + clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams); } - clientConf.setUseTls(arguments.useTls); - clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath); - PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf); + + PulsarClient pulsarClient = clientBuilder.build(); class EncKeyReader implements CryptoKeyReader { @@ -246,16 +242,17 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe return null; } } + List>> futures = Lists.newArrayList(); - ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); - consumerConfig.setMessageListener(listener); - consumerConfig.setReceiverQueueSize(arguments.receiverQueueSize); - consumerConfig.setSubscriptionType(arguments.subscriptionType); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer() // + .messageListener(listener) // + .receiverQueueSize(arguments.receiverQueueSize) // + .subscriptionType(arguments.subscriptionType); if (arguments.encKeyName != null) { byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile)); EncKeyReader keyReader = new EncKeyReader(pKey); - consumerConfig.setCryptoKeyReader(keyReader); + consumerBuilder.cryptoKeyReader(keyReader); } for (int i = 0; i < arguments.numTopics; i++) { @@ -271,7 +268,8 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe subscriberName = arguments.subscriberName; } - futures.add(pulsarClient.subscribeAsync(topicName.toString(), subscriberName, consumerConfig)); + futures.add(consumerBuilder.clone().topic(topicName.toString()).subscriptionName(subscriberName) + .subscribeAsync()); } } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java index 681eb3d2785b8..41dfa98149e43 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceProducer.java @@ -42,16 +42,14 @@ import org.HdrHistogram.Histogram; import org.HdrHistogram.HistogramLogWriter; import org.HdrHistogram.Recorder; -import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.CryptoKeyReader; import org.apache.pulsar.client.api.EncryptionKeyInfo; +import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; -import org.apache.pulsar.client.api.ProducerConfiguration; -import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.ProducerBuilder; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.impl.PulsarClientImpl; -import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue; import org.apache.pulsar.testclient.utils.PaddingDecimalFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -233,15 +231,17 @@ public static void main(String[] args) throws Exception { String prefixTopicName = arguments.topics.get(0); List>> futures = Lists.newArrayList(); - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setConnectionsPerBroker(arguments.maxConnections); - clientConf.setIoThreads(Runtime.getRuntime().availableProcessors()); - clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS); + ClientBuilder clientBuilder = PulsarClient.builder() // + .serviceUrl(arguments.serviceURL) // + .connectionsPerBroker(arguments.maxConnections) // + .ioThreads(Runtime.getRuntime().availableProcessors()) // + .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) // + .enableTls(arguments.useTls) // + .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath); + if (isNotBlank(arguments.authPluginClassName)) { - clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams); + clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams); } - clientConf.setUseTls(arguments.useTls); - clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath); class EncKeyReader implements CryptoKeyReader { @@ -264,27 +264,26 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe return null; } } - PulsarClient client = new PulsarClientImpl(arguments.serviceURL, clientConf); - - ProducerConfiguration producerConf = new ProducerConfiguration(); - producerConf.setSendTimeout(0, TimeUnit.SECONDS); - producerConf.setCompressionType(arguments.compression); - producerConf.setMaxPendingMessages(arguments.maxOutstanding); - // enable round robin message routing if it is a partitioned topic - producerConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition); + PulsarClient client = clientBuilder.build(); + ProducerBuilder producerBuilder = client.newProducer() // + .sendTimeout(0, TimeUnit.SECONDS) // + .compressionType(arguments.compression) // + .maxPendingMessages(arguments.maxOutstanding) // + // enable round robin message routing if it is a partitioned topic + .messageRoutingMode(MessageRoutingMode.RoundRobinPartition); + if (arguments.batchTime > 0) { - producerConf.setBatchingMaxPublishDelay(arguments.batchTime, TimeUnit.MILLISECONDS); - producerConf.setBatchingEnabled(true); + producerBuilder.batchingMaxPublishDelay(arguments.batchTime, TimeUnit.MILLISECONDS).enableBatching(true); } // Block if queue is full else we will start seeing errors in sendAsync - producerConf.setBlockIfQueueFull(true); + producerBuilder.blockIfQueueFull(true); if (arguments.encKeyName != null) { - producerConf.addEncryptionKey(arguments.encKeyName); + producerBuilder.addEncryptionKey(arguments.encKeyName); byte[] pKey = Files.readAllBytes(Paths.get(arguments.encKeyFile)); EncKeyReader keyReader = new EncKeyReader(pKey); - producerConf.setCryptoKeyReader(keyReader); + producerBuilder.cryptoKeyReader(keyReader); } for (int i = 0; i < arguments.numTopics; i++) { @@ -292,7 +291,7 @@ public EncryptionKeyInfo getPrivateKey(String keyName, Map keyMe log.info("Adding {} publishers on topic {}", arguments.numProducers, topic); for (int j = 0; j < arguments.numProducers; j++) { - futures.add(client.createProducerAsync(topic, producerConf)); + futures.add(producerBuilder.clone().topic(topic).createAsync()); } } diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java index c5e66b2141bf9..7a4fdf9647cbb 100644 --- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java +++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceReader.java @@ -29,14 +29,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; -import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.ReaderConfiguration; +import org.apache.pulsar.client.api.ReaderBuilder; import org.apache.pulsar.client.api.ReaderListener; import org.apache.pulsar.client.impl.MessageIdImpl; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; @@ -173,7 +172,7 @@ public static void main(String[] args) throws Exception { final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null; - ReaderListener listener = (reader, msg) -> { + ReaderListener listener = (reader, msg) -> { messagesReceived.increment(); bytesReceived.add(msg.getData().length); @@ -182,21 +181,21 @@ public static void main(String[] args) throws Exception { } }; - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setConnectionsPerBroker(arguments.maxConnections); - clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS); - clientConf.setIoThreads(Runtime.getRuntime().availableProcessors()); + ClientBuilder clientBuilder = PulsarClient.builder() // + .serviceUrl(arguments.serviceURL) // + .connectionsPerBroker(arguments.maxConnections) // + .statsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS) // + .ioThreads(Runtime.getRuntime().availableProcessors()) // + .enableTls(arguments.useTls) // + .tlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath); + if (isNotBlank(arguments.authPluginClassName)) { - clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams); + clientBuilder.authentication(arguments.authPluginClassName, arguments.authParams); } - clientConf.setUseTls(arguments.useTls); - clientConf.setTlsTrustCertsFilePath(arguments.tlsTrustCertsFilePath); - PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf); + + PulsarClient pulsarClient = clientBuilder.build(); List>> futures = Lists.newArrayList(); - ReaderConfiguration readerConfig = new ReaderConfiguration(); - readerConfig.setReaderListener(listener); - readerConfig.setReceiverQueueSize(arguments.receiverQueueSize); MessageId startMessageId; if ("earliest".equals(arguments.startMessageId)) { @@ -208,11 +207,16 @@ public static void main(String[] args) throws Exception { startMessageId = new MessageIdImpl(Long.parseLong(parts[0]), Long.parseLong(parts[1]), -1); } + ReaderBuilder readerBuilder = pulsarClient.newReader() // + .readerListener(listener) // + .receiverQueueSize(arguments.receiverQueueSize) // + .startMessageId(startMessageId); + for (int i = 0; i < arguments.numTopics; i++) { final TopicName topicName = (arguments.numTopics == 1) ? prefixTopicName : TopicName.get(String.format("%s-%d", prefixTopicName, i)); - futures.add(pulsarClient.createReaderAsync(topicName.toString(), startMessageId, readerConfig)); + futures.add(readerBuilder.clone().topic(topicName.toString()).createAsync()); } FutureUtil.waitForAll(futures).get(); diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java index 6573477d49a64..b917dc24475ec 100644 --- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java +++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java @@ -36,7 +36,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationService; import org.apache.pulsar.broker.authorization.AuthorizationService; import org.apache.pulsar.broker.cache.ConfigurationCacheService; -import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; @@ -176,30 +176,33 @@ public synchronized PulsarClient getPulsarClient() throws IOException { } private PulsarClient createClientInstance(ClusterData clusterData) throws IOException { - ClientConfiguration clientConf = new ClientConfiguration(); - clientConf.setStatsInterval(0, TimeUnit.SECONDS); - clientConf.setUseTls(config.isTlsEnabled()); - clientConf.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection()); - clientConf.setTlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath()); - clientConf.setIoThreads(config.getWebSocketNumIoThreads()); - clientConf.setConnectionsPerBroker(config.getWebSocketConnectionsPerBroker()); + ClientBuilder clientBuilder = PulsarClient.builder() // + .statsInterval(0, TimeUnit.SECONDS) // + .enableTls(config.isTlsEnabled()) // + .allowTlsInsecureConnection(config.isTlsAllowInsecureConnection()) // + .tlsTrustCertsFilePath(config.getBrokerClientTrustCertsFilePath()) // + .ioThreads(config.getWebSocketNumIoThreads()) // + .connectionsPerBroker(config.getWebSocketConnectionsPerBroker()); if (isNotBlank(config.getBrokerClientAuthenticationPlugin()) && isNotBlank(config.getBrokerClientAuthenticationParameters())) { - clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(), + clientBuilder.authentication(config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()); } if (config.isTlsEnabled()) { if (isNotBlank(clusterData.getBrokerServiceUrlTls())) { - return PulsarClient.create(clusterData.getBrokerServiceUrlTls(), clientConf); + clientBuilder.serviceUrl(clusterData.getBrokerServiceUrlTls()); } else if (isNotBlank(clusterData.getServiceUrlTls())) { - return PulsarClient.create(clusterData.getServiceUrlTls(), clientConf); + clientBuilder.serviceUrl(clusterData.getServiceUrlTls()); } } else if (isNotBlank(clusterData.getBrokerServiceUrl())) { - return PulsarClient.create(clusterData.getBrokerServiceUrl(), clientConf); + clientBuilder.serviceUrl(clusterData.getBrokerServiceUrl()); + } else { + clientBuilder.serviceUrl(clusterData.getServiceUrl()); } - return PulsarClient.create(clusterData.getServiceUrl(), clientConf); + + return clientBuilder.build(); } private static ClusterData createClusterData(WebSocketProxyConfiguration config) {