Skip to content

Commit

Permalink
Delayed acks impl (apache#1462)
Browse files Browse the repository at this point in the history
* Added delayed grouped acknowledgments

* Fixed flush on consumer close

* Fixed Kafka consumer test
  • Loading branch information
merlimat authored Apr 3, 2018
1 parent 46411fe commit 19dd2c5
Show file tree
Hide file tree
Showing 35 changed files with 821 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
Expand All @@ -30,13 +39,13 @@
import java.util.stream.Collectors;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;

import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
Expand All @@ -51,14 +60,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

/**
* A Consumer is a consumer currently connected and associated with a Subscription
*/
Expand Down Expand Up @@ -288,7 +289,7 @@ void updatePermitsAndPendingAcks(final List<Entry> entries, SendMessageInfo sent
iter.remove();
PositionImpl pos = (PositionImpl) entry.getPosition();
entry.release();
subscription.acknowledgeMessage(pos, AckType.Individual, Collections.emptyMap());
subscription.acknowledgeMessage(Collections.singletonList(pos), AckType.Individual, Collections.emptyMap());
continue;
}
if (pendingAcks != null) {
Expand Down Expand Up @@ -363,31 +364,46 @@ void doUnsubscribe(final long requestId) {
}

void messageAcked(CommandAck ack) {
MessageIdData msgId = ack.getMessageId();
PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());

if (ack.hasValidationError()) {
log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", subscription, consumerId,
position, ack.getValidationError());
}

Map<String,Long> properties = Collections.emptyMap();
if (ack.getPropertiesCount() > 0) {
properties = ack.getPropertiesList().stream()
.collect(Collectors.toMap((e) -> e.getKey(),
(e) -> e.getValue()));
}
if (subType == SubType.Shared) {
// On shared subscriptions, cumulative ack is not supported
checkArgument(ack.getAckType() == AckType.Individual);

// Only ack a single message
removePendingAcks(position);
subscription.acknowledgeMessage(position, AckType.Individual, properties);
if (ack.getAckType() == AckType.Cumulative) {
if (ack.getMessageIdCount() != 1) {
log.warn("[{}] [{}] Received multi-message ack at {} - Reason: {}", subscription, consumerId);
return;
}

if (subType == SubType.Shared) {
log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", subscription, consumerId);
return;
}

MessageIdData msgId = ack.getMessageId(0);
PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
subscription.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative, properties);
} else {
subscription.acknowledgeMessage(position, ack.getAckType(), properties);
}
// Individual ack
List<Position> positionsAcked = new ArrayList<>();
for (int i = 0; i < ack.getMessageIdCount(); i++) {
MessageIdData msgId = ack.getMessageId(i);
PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
positionsAcked.add(position);

if (subType == SubType.Shared) {
removePendingAcks(position);
}

if (ack.hasValidationError()) {
log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", subscription,
consumerId, position, ack.getValidationError());
}
}
subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties);
}
}

void flowPermits(int additionalNumberOfMessages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface Subscription {

void consumerFlow(Consumer consumer, int additionalNumberOfMessages);

void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties);
void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String,Long> properties);

String getTopicName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.nonpersistent;

import com.google.common.base.MoreObjects;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -43,8 +45,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;

public class NonPersistentSubscription implements Subscription {
private final NonPersistentTopic topic;
private volatile NonPersistentDispatcher dispatcher;
Expand Down Expand Up @@ -139,7 +139,7 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
}

@Override
public void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties) {
public void acknowledgeMessage(List<Position> position, AckType ackType, Map<String, Long> properties) {
// No-op
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,23 @@
package org.apache.pulsar.broker.service.persistent;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentFindCursorPositionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;

public class CompactorSubscription extends PersistentSubscription {
private CompactedTopic compactedTopic;

Expand All @@ -79,11 +54,14 @@ public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopi
}

@Override
public void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties) {
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String,Long> properties) {
checkArgument(ackType == AckType.Cumulative);
checkArgument(positions.size() == 1);
checkArgument(properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY));
long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);

Position position = positions.get(0);

if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cumulative ack on compactor subscription {}", topicName, subName, position);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.service.persistent;

import com.google.common.base.MoreObjects;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -56,8 +58,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;

public class PersistentSubscription implements Subscription {
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
Expand Down Expand Up @@ -176,17 +176,23 @@ public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
}

@Override
public void acknowledgeMessage(PositionImpl position, AckType ackType, Map<String,Long> properties) {
public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<String,Long> properties) {
if (ackType == AckType.Cumulative) {
if (positions.size() != 1) {
log.warn("[{}][{}] Invalid cumulative ack received with multiple message ids", topicName, subName);
return;
}

Position position = positions.get(0);
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cumulative ack on {}", topicName, subName, position);
}
cursor.asyncMarkDelete(position, properties, markDeleteCallback, position);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Individual ack on {}", topicName, subName, position);
log.debug("[{}][{}] Individual acks on {}", topicName, subName, positions);
}
cursor.asyncDelete(position, deleteCallback, position);
cursor.asyncDelete(positions, deleteCallback, positions);
}

if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog() == 0) {
Expand Down Expand Up @@ -215,10 +221,9 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {

private final DeleteCallback deleteCallback = new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
PositionImpl pos = (PositionImpl) ctx;
public void deleteComplete(Object position) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Deleted message at {}", topicName, subName, pos);
log.debug("[{}][{}] Deleted message at {}", topicName, subName, position);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ static class RawConsumerImpl extends ConsumerImpl<byte[]> {
final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
final Queue<CompletableFuture<RawMessage>> pendingRawReceives;

RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<byte[]> conf,
CompletableFuture<Consumer<byte[]>> consumerFuture) {
super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
consumerFuture, SubscriptionMode.Durable, MessageId.earliest, Schema.IDENTITY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,7 @@ public void persistentTopicsCursorReset(String topicName) throws Exception {

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();

assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

Expand Down Expand Up @@ -1450,7 +1450,7 @@ public void persistentTopicsCursorResetAfterReset(String topicName) throws Excep

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();

assertEquals(admin.persistentTopics().getSubscriptions(topicName), Lists.newArrayList("my-sub"));

Expand Down Expand Up @@ -1521,7 +1521,7 @@ public void partitionedTopicsCursorReset(String topicName) throws Exception {

// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).subscribe();
.subscriptionType(SubscriptionType.Exclusive).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();

List<String> topics = admin.persistentTopics().getList("prop-xyz/use/ns1");
assertEquals(topics.size(), 4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public void testBrokerServicePersistentTopicStats() throws Exception {
PersistentTopicStats stats;
SubscriptionStats subStats;

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
Expand Down Expand Up @@ -217,7 +218,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception {
SubscriptionStats subStats;

Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).subscribe();
.subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);

PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception {
TestConsumerStateEventListener listener1 = new TestConsumerStateEventListener();
TestConsumerStateEventListener listener2 = new TestConsumerStateEventListener();
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover);
.acknowledmentGroupTime(0, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover);


// 1. two consumers on the same subscription
ConsumerBuilder<byte[]> consumerBulder1 = consumerBuilder.clone().consumerName("1")
.consumerEventListener(listener1);
.consumerEventListener(listener1).acknowledmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBulder1.subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("2").consumerEventListener(listener2)
.subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,8 @@ public void testUnackedCountWithRedeliveries() throws Exception {
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();

ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.receiverQueueSize(10).subscriptionType(SubscriptionType.Shared);
.receiverQueueSize(10).subscriptionType(SubscriptionType.Shared)
.acknowledmentGroupTime(0, TimeUnit.SECONDS);
ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) consumerBuilder.subscribe();

for (int i = 0; i < numMsgs; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1243,8 +1243,8 @@ public void testCompactorSubscription() throws Exception {
cursorMock);
PositionImpl position = new PositionImpl(1, 1);
long ledgerId = 0xc0bfefeL;
sub.acknowledgeMessage(position, AckType.Cumulative,
ImmutableMap.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId));
sub.acknowledgeMessage(Collections.singletonList(position), AckType.Cumulative,
ImmutableMap.of(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY, ledgerId));
verify(compactedTopic, Mockito.times(1)).newCompactedLedger(position, ledgerId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ public void testFailoverSingleAckedNormalTopic() throws Exception {

// 2. Create consumer
ConsumerBuilder<byte[]> consumerBuilder = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover);
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Failover)
.acknowledmentGroupTime(0, TimeUnit.SECONDS);
Consumer<byte[]> consumer1 = consumerBuilder.clone().consumerName("consumer-1").subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.clone().consumerName("consumer-2").subscribe();

Expand Down
Loading

0 comments on commit 19dd2c5

Please sign in to comment.