Skip to content

Commit

Permalink
Update default values for a few publisher settings (apache#1440)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie authored Apr 16, 2018
1 parent bc0a4b8 commit 8fbb926
Show file tree
Hide file tree
Showing 35 changed files with 774 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ProducerImpl;
Expand Down Expand Up @@ -74,7 +75,10 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerBuilder = client.newProducer() //
.topic(topicName).sendTimeout(0, TimeUnit.SECONDS) //
.topic(topicName)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.enableBatching(false)
.sendTimeout(0, TimeUnit.SECONDS) //
.maxPendingMessages(producerQueueSize) //
.producerName(getReplicatorName(replicatorPrefix, localCluster));
STATE_UPDATER.set(this, State.Stopped);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,11 @@ public void namespaces() throws PulsarAdminException, PulsarServerException, Exc
assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new PersistencePolicies(3, 2, 1, 10.0));

// Force topic creation and namespace being loaded
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/my-topic").create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://prop-xyz/ns1/my-topic")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.close();
admin.persistentTopics().delete("persistent://prop-xyz/ns1/my-topic");

Expand Down Expand Up @@ -804,8 +808,11 @@ public void partitionedTopics(String topicName) throws Exception {
admin.persistentTopics().deleteSubscription(partitionedTopicName, "my-sub-1");
assertEquals(admin.persistentTopics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));

Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
Producer<byte[]> producer = client.newProducer()
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();

for (int i = 0; i < 10; i++) {
String message = "message-" + i;
Expand Down Expand Up @@ -859,7 +866,11 @@ public void partitionedTopics(String topicName) throws Exception {
} catch (ConflictException ce) {
}

producer = client.newProducer().topic(partitionedTopicName).create();
producer = client.newProducer()
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

topics = admin.persistentTopics().getList("prop-xyz/ns1");
assertEquals(topics.size(), 4);
Expand Down Expand Up @@ -918,7 +929,11 @@ public void testNamespaceSplitBundle() throws Exception {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("message".getBytes());
publishMessagesOnPersistentTopic(topicName, 0);
assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName));
Expand All @@ -944,7 +959,11 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("message".getBytes());
publishMessagesOnPersistentTopic(topicName, 0);
assertEquals(admin.persistentTopics().getList(namespace), Lists.newArrayList(topicName));
Expand Down Expand Up @@ -1037,7 +1056,11 @@ public void testNamespaceUnloadBundle() throws Exception {
Lists.newArrayList("my-sub"));

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1/ds2").create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://prop-xyz/ns1/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
Expand Down Expand Up @@ -1095,7 +1118,11 @@ public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
Lists.newArrayList("my-sub"));

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://prop-xyz/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
Expand Down Expand Up @@ -1148,7 +1175,11 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
.subscribe();

// Create producer
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds2").create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://prop-xyz/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
Expand All @@ -1157,7 +1188,11 @@ public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
producer.close();

// Create producer
Producer<byte[]> producer1 = pulsarClient.newProducer().topic("persistent://prop-xyz/ns1-bundles/ds1").create();
Producer<byte[]> producer1 = pulsarClient.newProducer()
.topic("persistent://prop-xyz/ns1-bundles/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer1.send(message.getBytes());
Expand Down Expand Up @@ -1251,7 +1286,11 @@ private void publishMessagesOnPersistentTopic(String topicName, int messages) th
}

private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

for (int i = startIdx; i < (messages + startIdx); i++) {
String message = "message-" + i;
Expand Down Expand Up @@ -1296,7 +1335,11 @@ public void statsOnNonExistingTopics() throws Exception {
@Test
public void testDeleteFailedReturnCode() throws Exception {
String topicName = "persistent://prop-xyz/ns1/my-topic";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

try {
admin.persistentTopics().delete(topicName);
Expand Down Expand Up @@ -1684,8 +1727,11 @@ public void testPersistentTopicExpireMessageOnParitionTopic() throws Exception {
Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/ns1/ds1")
.subscriptionName("my-sub").subscribe();

Producer<byte[]> producer = client.newProducer().topic("persistent://prop-xyz/ns1/ds1")
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
Producer<byte[]> producer = client.newProducer()
.topic("persistent://prop-xyz/ns1/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,11 @@ public void testIncrementPartitionsOfTopic() throws Exception {
.toString();

// (3) produce messages to all partitions including newly created partitions (RoundRobin)
Producer<byte[]> producer = client.newProducer().topic(partitionedTopicName)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition).create();
Producer<byte[]> producer = client.newProducer()
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();
final int totalMessages = newPartitions * 2;
for (int i = 0; i < totalMessages; i++) {
String message = "message-" + i;
Expand Down Expand Up @@ -268,7 +271,11 @@ public void nonPersistentTopics() throws Exception {
}

private void publishMessagesOnTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

for (int i = startIdx; i < (messages + startIdx); i++) {
String message = "message-" + i;
Expand Down Expand Up @@ -331,7 +338,11 @@ public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception {
admin.namespaces().setPersistence(namespace, new PersistencePolicies(3, 3, 3, 50.0));
assertEquals(admin.namespaces().getPersistence(namespace), new PersistencePolicies(3, 3, 3, 50.0));

Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
Expand Down Expand Up @@ -381,7 +392,7 @@ public void testUnloadTopic(final String topicType) throws Exception {
assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());

// recreation of producer will load the topic again
producer = pulsarClient.newProducer().topic(topicName).create();
pulsarClient.newProducer().topic(topicName).create();
topic = pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topic);
// unload the topic
Expand Down Expand Up @@ -502,7 +513,11 @@ public void testResetCursorOnPosition(String namespaceName) throws Exception {
}

private void publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx) throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

for (int i = startIdx; i < (messages + startIdx); i++) {
String message = "message-" + i;
Expand Down Expand Up @@ -741,7 +756,12 @@ public void testPublishConsumerStats() throws Exception {
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).build();
Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subscriberName)
.subscriptionType(SubscriptionType.Shared).subscribe();
Producer<byte[]> producer = client.newProducer().topic(topic).producerName(producerName).create();
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.producerName(producerName)
.create();

retryStrategically((test) -> {
PersistentTopicStats stats;
Expand Down
Loading

0 comments on commit 8fbb926

Please sign in to comment.