Skip to content

Commit

Permalink
Introduce batch message container framework and support key based bat…
Browse files Browse the repository at this point in the history
…ching container (apache#4435)

### Motivation

Introduce batch message container framework to support multiple ways to do message batch. 
Currently, pulsar support a most basic batch message container, use the batch message container framework can quickly implement other types batch message container, even users can customize their own batch message container.

Add a new batch message container named BatchMessageKeyBasedContainer to support batching message in key_shared subscription mode.
  • Loading branch information
codelipenghui authored and sijie committed Jun 19, 2019
1 parent 34f1a58 commit b45736a
Show file tree
Hide file tree
Showing 18 changed files with 914 additions and 275 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.util.ArrayList;
Expand All @@ -37,6 +38,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertTrue;
Expand All @@ -46,6 +48,13 @@ public class KeySharedSubscriptionTest extends ProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(KeySharedSubscriptionTest.class);
private static final List<String> keys = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7", "8", "9");

@DataProvider(name = "batch")
public Object[][] batchProvider() {
return new Object[][] {
{ false },
{ true }
};
}

@BeforeMethod
@Override
Expand All @@ -60,40 +69,22 @@ protected void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
@Test(dataProvider = "batch")
public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared";
String topic = "persistent://public/default/key_shared-" + UUID.randomUUID();

@Cleanup
Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer1 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer2 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer3 = createConsumer(topic);

@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();
Producer<Integer> producer = createProducer(topic, enableBatch);

int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
Expand Down Expand Up @@ -129,41 +120,23 @@ public void testSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws Pu
receiveAndCheck(checkList);
}

@Test
public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException, InterruptedException {
@Test(dataProvider = "batch")
public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException, InterruptedException {

this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_consumer_crash";
String topic = "persistent://public/default/key_shared_consumer_crash-" + UUID.randomUUID();

@Cleanup
Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer1 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer2 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer3 = createConsumer(topic);

@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();
Producer<Integer> producer = createProducer(topic, enableBatch);

int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
Expand Down Expand Up @@ -219,45 +192,27 @@ public void testConsumerCrashSendAndReceiveWithHashRangeStickyKeyConsumerSelecto
}


@Test
public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
@Test(dataProvider = "batch")
public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_none_key";
String topic = "persistent://public/default/key_shared_none_key-" + UUID.randomUUID();

@Cleanup
Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer1 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer2 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer3 = createConsumer(topic);

@Cleanup
Producer<Integer> producer = createProducer(topic, enableBatch);

int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
int consumer3Slot = consumer2Slot >> 1;

@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();

for (int i = 0; i < 100; i++) {
producer.newMessage()
.value(i)
Expand All @@ -276,40 +231,22 @@ public void testNonKeySendAndReceiveWithHashRangeStickyKeyConsumerSelector() thr
receiveAndCheck(checkList);
}

@Test
public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector() throws PulsarClientException {
@Test(dataProvider = "batch")
public void testOrderingKeyWithHashRangeStickyKeyConsumerSelector(boolean enableBatch) throws PulsarClientException {
this.conf.setSubscriptionKeySharedEnable(true);
String topic = "persistent://public/default/key_shared_ordering_key";
String topic = "persistent://public/default/key_shared_ordering_key-" + UUID.randomUUID();

@Cleanup
Consumer<Integer> consumer1 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer1 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer2 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer2 = createConsumer(topic);

@Cleanup
Consumer<Integer> consumer3 = pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
Consumer<Integer> consumer3 = createConsumer(topic);

@Cleanup
Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();
Producer<Integer> producer = createProducer(topic, enableBatch);

int consumer1Slot = HashRangeStickyKeyConsumerSelector.DEFAULT_RANGE_SIZE;
int consumer2Slot = consumer1Slot >> 1;
Expand Down Expand Up @@ -358,6 +295,32 @@ public void testDisableKeySharedSubscription() throws PulsarClientException {
.subscribe();
}

private Producer<Integer> createProducer(String topic, boolean enableBatch) throws PulsarClientException {
Producer<Integer> producer = null;
if (enableBatch) {
producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(true)
.batcherBuilder(BatcherBuilder.KEY_BASED)
.create();
} else {
producer = pulsarClient.newProducer(Schema.INT32)
.topic(topic)
.enableBatching(false)
.create();
}
return producer;
}

private Consumer<Integer> createConsumer(String topic) throws PulsarClientException {
return pulsarClient.newConsumer(Schema.INT32)
.topic(topic)
.subscriptionName("key_shared")
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(3, TimeUnit.SECONDS)
.subscribe();
}

private void receiveAndCheck(List<KeyValue<Consumer<Integer>, Integer>> checkList) throws PulsarClientException {
Map<Consumer, Set<String>> consumerKeys = new HashMap<>();
for (KeyValue<Consumer<Integer>, Integer> check : checkList) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Batch message container for individual messages being published until they are batched and sent to broker
*/
public interface BatchMessageContainer {

/**
* Clear the message batch container.
*/
void clear();

/**
* Check the message batch container is empty.
*
* @return return true if empty, otherwise return false.
*/
boolean isEmpty();

/**
* Get count of messages in the message batch container.
*
* @return messages count
*/
int getNumMessagesInBatch();

/**
* Get current message batch size of the message batch container in bytes.
*
* @return message batch size in bytes
*/
long getCurrentBatchSize();

/**
* Release the payload and clear the container.
*
* @param ex cause
*/
void discard(Exception ex);

/**
* Return the batch container batch message in multiple batches
* @return
*/
boolean isMultiBatches();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import org.apache.pulsar.client.internal.DefaultImplementation;

/**
* Batcher builder
*/
public interface BatcherBuilder {

/**
* Default batch message container
*
* incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
*
* batched into single batch message:
* [(k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)]
*/
BatcherBuilder DEFAULT = DefaultImplementation.newDefaultBatcherBuilder();

/**
* Key based batch message container
*
* incoming single messages:
* (k1, v1), (k2, v1), (k3, v1), (k1, v2), (k2, v2), (k3, v2), (k1, v3), (k2, v3), (k3, v3)
*
* batched into multiple batch messages:
* [(k1, v1), (k1, v2), (k1, v3)], [(k2, v1), (k2, v2), (k2, v3)], [(k3, v1), (k3, v2), (k3, v3)]
*/
BatcherBuilder KEY_BASED = DefaultImplementation.newKeyBasedBatcherBuilder();

/**
* Build a new batch message container.
* @return new batch message container
*/
BatchMessageContainer build();

}
Loading

0 comments on commit b45736a

Please sign in to comment.