Skip to content

Commit

Permalink
Bug in Message Deduplication that may cause incorrect client/broker i…
Browse files Browse the repository at this point in the history
…nteraction (apache#5243)

* Bug in Message Deduplication that may cause incorrect behavior

* add tests

* fix error message

(cherry picked from commit 2ad58c4)
  • Loading branch information
jerrypeng authored and wolfstudy committed Nov 19, 2019
1 parent 1168709 commit 0150b1f
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
Expand Down Expand Up @@ -76,15 +77,33 @@ enum Status {
Failed,
}

enum MessageDupStatus {
// whether a message is a definitely a duplicate or not cannot be determined at this time
Unknown,
// message is definitely NOT a duplicate
NotDup,
// message is definitely a duplicate
Dup,
}

public static class MessageDupUnknownException extends RuntimeException {
public MessageDupUnknownException() {
super("Cannot determine whether the message is a duplicate at this time");
}
}


private volatile Status status;

// Map that contains the highest sequenceId that have been sent by each producers. The map will be updated before
// the messages are persisted
private final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1);
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPushed = new ConcurrentOpenHashMap<>(16, 1);

// Map that contains the highest sequenceId that have been persistent by each producers. The map will be updated
// after the messages are persisted
private final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1);
@VisibleForTesting
final ConcurrentOpenHashMap<String, Long> highestSequencedPersisted = new ConcurrentOpenHashMap<>(16, 1);

// Number of persisted entries after which to store a snapshot of the sequence ids map
private final int snapshotInterval;
Expand Down Expand Up @@ -258,9 +277,9 @@ public boolean isEnabled() {
*
* @return true if the message should be published or false if it was recognized as a duplicate
*/
public boolean shouldPublishNextMessage(PublishContext publishContext, ByteBuf headersAndPayload) {
public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf headersAndPayload) {
if (!isEnabled()) {
return true;
return MessageDupStatus.NotDup;
}

String producerName = publishContext.getProducerName();
Expand All @@ -287,12 +306,22 @@ public boolean shouldPublishNextMessage(PublishContext publishContext, ByteBuf h
log.debug("[{}] Message identified as duplicated producer={} seq-id={} -- highest-seq-id={}",
topic.getName(), producerName, sequenceId, lastSequenceIdPushed);
}
return false;

// Also need to check sequence ids that has been persisted.
// If current message's seq id is smaller smaller or equals to the lastSequenceIdPersisted than its definitely a dup
// If current message's seq id is between lastSequenceIdPersisted and lastSequenceIdPushed, then we cannot be sure whether the message is a dup or not
// we should return an error to the producer for the latter case so that it can retry at a future time
Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName);
if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
return MessageDupStatus.Dup;
} else {
return MessageDupStatus.Unknown;
}
}

highestSequencedPushed.put(producerName, sequenceId);
}
return true;
return MessageDupStatus.NotDup;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,11 +272,17 @@ private PersistentSubscription createPersistentSubscription(String subscriptionN

@Override
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
if (messageDeduplication.shouldPublishNextMessage(publishContext, headersAndPayload)) {
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
} else {
// Immediately acknowledge duplicated message
publishContext.completed(null, -1, -1);
MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
switch (status){
case NotDup:
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
break;
case Dup:
// Immediately acknowledge duplicated message
publishContext.completed(null, -1, -1);
break;
default:
publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import io.netty.buffer.ByteBuf;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.testng.annotations.Test;

import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

@Slf4j
public class MessageDuplicationTest {

private final static int BROKER_DEDUPLICATION_ENTRIES_INTERVAL = 10;
private final static int BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS = 10;
private final static String REPLICATOR_PREFIX = "foo";

@Test
public void testIsDuplicate() {
PulsarService pulsarService = mock(PulsarService.class);
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
serviceConfiguration.setBrokerDeduplicationEntriesInterval(BROKER_DEDUPLICATION_ENTRIES_INTERVAL);
serviceConfiguration.setBrokerDeduplicationMaxNumberOfProducers(BROKER_DEDUPLICATION_MAX_NUMBER_PRODUCERS);
serviceConfiguration.setReplicatorPrefix(REPLICATOR_PREFIX);

doReturn(serviceConfiguration).when(pulsarService).getConfiguration();
PersistentTopic persistentTopic = mock(PersistentTopic.class);
ManagedLedger managedLedger = mock(ManagedLedger.class);
MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, persistentTopic, managedLedger));
doReturn(true).when(messageDeduplication).isEnabled();

String producerName1 = "producer1";
ByteBuf byteBuf1 = getMessage(producerName1, 0);
Topic.PublishContext publishContext1 = getPublishContext(producerName1, 0);

String producerName2 = "producer2";
ByteBuf byteBuf2 = getMessage(producerName2, 1);
Topic.PublishContext publishContext2 = getPublishContext(producerName2, 1);

MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);

Long lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertTrue(lastSequenceIdPushed != null);
assertEquals(lastSequenceIdPushed.longValue(), 0);

status = messageDeduplication.isDuplicate(publishContext2, byteBuf2);
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName2);
assertTrue(lastSequenceIdPushed != null);
assertEquals(lastSequenceIdPushed.longValue(), 1);

byteBuf1 = getMessage(producerName1, 1);
publishContext1 = getPublishContext(producerName1, 1);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertTrue(lastSequenceIdPushed != null);
assertEquals(lastSequenceIdPushed.longValue(), 1);

byteBuf1 = getMessage(producerName1, 5);
publishContext1 = getPublishContext(producerName1, 5);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertTrue(lastSequenceIdPushed != null);
assertEquals(lastSequenceIdPushed.longValue(), 5);

byteBuf1 = getMessage(producerName1, 0);
publishContext1 = getPublishContext(producerName1, 0);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
// should expect unknown because highestSequencePersisted is empty
assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertTrue(lastSequenceIdPushed != null);
assertEquals(lastSequenceIdPushed.longValue(), 5);

// update highest sequence persisted
messageDeduplication.highestSequencedPersisted.put(producerName1, 0L);

byteBuf1 = getMessage(producerName1, 0);
publishContext1 = getPublishContext(producerName1, 0);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
// now that highestSequencedPersisted, message with seqId of zero can be classified as a dup
assertEquals(status, MessageDeduplication.MessageDupStatus.Dup);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertTrue(lastSequenceIdPushed != null);
assertEquals(lastSequenceIdPushed.longValue(), 5);
}

public ByteBuf getMessage(String producerName, long seqId) {
PulsarApi.MessageMetadata messageMetadata = PulsarApi.MessageMetadata.newBuilder()
.setProducerName(producerName).setSequenceId(seqId)
.setPublishTime(System.currentTimeMillis()).build();

ByteBuf byteBuf = serializeMetadataAndPayload(
Commands.ChecksumType.Crc32c, messageMetadata, io.netty.buffer.Unpooled.copiedBuffer(new byte[0]));

return byteBuf;
}

public Topic.PublishContext getPublishContext(String producerName, long seqId) {
return new Topic.PublishContext() {
@Override
public String getProducerName() {
return producerName;
}

public long getSequenceId() {
return seqId;
}

@Override
public void completed(Exception e, long ledgerId, long entryId) {

}
};
}
}

0 comments on commit 0150b1f

Please sign in to comment.