Skip to content

Commit

Permalink
[Issue 5476]Fix message deduplicate issue while using external sequen…
Browse files Browse the repository at this point in the history
…ce id with batch produce (apache#5491)

Fixes apache#5476 

### Motivation

Fix apache#5476

### Modifications

1. Add `last_sequence_id` in MessageMetadata and CommandSend, use sequence id and last_sequence_id  to indicate the batch `lowest_sequence_id` and `highest_sequence_id`.
2. Handle batch message deduplicate check in MessageDeduplication
3. Response the `last_sequence_id` to client and add message deduplicate check in client
  • Loading branch information
codelipenghui authored and sijie committed Nov 13, 2019
1 parent f8e91a2 commit d3cb108
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,25 @@ public boolean equals(Object obj) {
}

public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
publishMessageToTopic(headersAndPayload, sequenceId, batchSize);
}

public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId,
ByteBuf headersAndPayload, long batchSize) {
if (lowestSequenceId > highestSequenceId) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, highestSequenceId, ServerError.MetadataError,
"Invalid lowest or highest sequence id"));
cnx.completedSendOperation(isNonPersistentTopic);
});
return;
}
beforePublish(producerId, highestSequenceId, headersAndPayload, batchSize);
publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize);
}

public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) {
if (isClosed) {
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.PersistenceError,
Expand Down Expand Up @@ -170,11 +189,20 @@ public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndP
}

startPublishOperation((int) batchSize, headersAndPayload.readableBytes());
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
System.nanoTime()));
}

private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, long batchSize) {
topic.publishMessage(headersAndPayload,
MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize,
System.nanoTime()));
}

private boolean verifyChecksum(ByteBuf headersAndPayload) {
if (hasChecksum(headersAndPayload)) {
int readerIndex = headersAndPayload.readerIndex();
Expand Down Expand Up @@ -257,6 +285,9 @@ private static final class MessagePublishContext implements PublishContext, Runn
private String originalProducerName;
private long originalSequenceId;

private long highestSequenceId;
private long originalHighestSequenceId;

public String getProducerName() {
return producer.getProducerName();
}
Expand All @@ -265,6 +296,11 @@ public long getSequenceId() {
return sequenceId;
}

@Override
public long getHighestSequenceId() {
return highestSequenceId;
}

@Override
public void setOriginalProducerName(String originalProducerName) {
this.originalProducerName = originalProducerName;
Expand All @@ -285,6 +321,16 @@ public long getOriginalSequenceId() {
return originalSequenceId;
}

@Override
public void setOriginalHighestSequenceId(long originalHighestSequenceId) {
this.originalHighestSequenceId = originalHighestSequenceId;
}

@Override
public long getOriginalHighestSequenceId() {
return originalHighestSequenceId;
}

/**
* Executed from managed ledger thread when the message is persisted
*/
Expand All @@ -298,7 +344,8 @@ public void completed(Exception exception, long ledgerId, long entryId) {
if (!(exception instanceof TopicClosedException)) {
// For TopicClosed exception there's no need to send explicit error, since the client was
// already notified
producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, sequenceId,
long callBackSequenceId = Math.max(highestSequenceId, sequenceId);
producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, callBackSequenceId,
serverError, exception.getMessage()));
}
producer.cnx.completedSendOperation(producer.isNonPersistentTopic);
Expand Down Expand Up @@ -330,8 +377,9 @@ public void run() {
// stats
rateIn.recordMultipleEvents(batchSize, msgSize);
producer.topic.recordAddLatency(System.nanoTime() - startTimeNs, TimeUnit.NANOSECONDS);
long callBackSequenceId = Math.max(highestSequenceId, sequenceId);
producer.cnx.ctx().writeAndFlush(
Commands.newSendReceipt(producer.producerId, sequenceId, ledgerId, entryId),
Commands.newSendReceipt(producer.producerId, callBackSequenceId, ledgerId, entryId),
producer.cnx.ctx().voidPromise());
producer.cnx.completedSendOperation(producer.isNonPersistentTopic);
producer.publishOperationCompleted();
Expand All @@ -347,7 +395,22 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn
callback.msgSize = msgSize;
callback.batchSize = batchSize;
callback.originalProducerName = null;
callback.originalSequenceId = -1;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
return callback;
}

static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn,
int msgSize, long batchSize, long startTimeNs) {
MessagePublishContext callback = RECYCLER.get();
callback.producer = producer;
callback.sequenceId = lowestSequenceId;
callback.highestSequenceId = highestSequenceId;
callback.rateIn = rateIn;
callback.msgSize = msgSize;
callback.batchSize = batchSize;
callback.originalProducerName = null;
callback.originalSequenceId = -1L;
callback.startTimeNs = startTimeNs;
return callback;
}
Expand All @@ -366,13 +429,16 @@ protected MessagePublishContext newObject(Recycler.Handle<MessagePublishContext>

public void recycle() {
producer = null;
sequenceId = -1;
sequenceId = -1L;
highestSequenceId = -1L;
originalSequenceId = -1L;
originalHighestSequenceId = -1L;
rateIn = null;
msgSize = 0;
ledgerId = -1;
entryId = -1;
batchSize = 0;
startTimeNs = -1;
ledgerId = -1L;
entryId = -1L;
batchSize = 0L;
startTimeNs = -1L;
recyclerHandle.recycle(this);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1051,7 +1051,12 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
startSendOperation(producer);

// Persist the message
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
headersAndPayload, send.getNumMessages());
} else {
producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, send.getNumMessages());
}
}

private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ default String getProducerName() {
}

default long getSequenceId() {
return -1;
return -1L;
}

default void setOriginalProducerName(String originalProducerName) {
Expand All @@ -73,10 +73,22 @@ default String getOriginalProducerName() {
}

default long getOriginalSequenceId() {
return -1;
return -1L;
}

void completed(Exception e, long ledgerId, long entryId);

default long getHighestSequenceId() {
return -1L;
}

default void setOriginalHighestSequenceId(long originalHighestSequenceId) {

}

default long getOriginalHighestSequenceId() {
return -1L;
}
}

void publishMessage(ByteBuf headersAndPayload, PublishContext callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
MessageMetadata md = Commands.parseMessageMetadata(messageMetadataAndPayload);

String producerName = md.getProducerName();
long sequenceId = md.getSequenceId();
long sequenceId = Math.max(md.getHighestSequenceId(), md.getSequenceId());
highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, sequenceId);

Expand Down Expand Up @@ -283,15 +283,18 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade

String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
long highestSequenceId = Math.max(publishContext.getHighestSequenceId(), sequenceId);
if (producerName.startsWith(replicatorPrefix)) {
// Message is coming from replication, we need to use the original producer name and sequence id
// for the purpose of deduplication and not rely on the "replicator" name.
int readerIndex = headersAndPayload.readerIndex();
MessageMetadata md = Commands.parseMessageMetadata(headersAndPayload);
producerName = md.getProducerName();
sequenceId = md.getSequenceId();
highestSequenceId = Math.max(md.getHighestSequenceId(), sequenceId);
publishContext.setOriginalProducerName(producerName);
publishContext.setOriginalSequenceId(sequenceId);
publishContext.setOriginalHighestSequenceId(highestSequenceId);
headersAndPayload.readerIndex(readerIndex);
md.recycle();
}
Expand All @@ -317,8 +320,7 @@ public MessageDupStatus isDuplicate(PublishContext publishContext, ByteBuf heade
return MessageDupStatus.Unknown;
}
}

highestSequencedPushed.put(producerName, sequenceId);
highestSequencedPushed.put(producerName, highestSequenceId);
}
return MessageDupStatus.NotDup;
}
Expand All @@ -333,13 +335,15 @@ public void recordMessagePersisted(PublishContext publishContext, PositionImpl p

String producerName = publishContext.getProducerName();
long sequenceId = publishContext.getSequenceId();
long highestSequenceId = publishContext.getHighestSequenceId();
if (publishContext.getOriginalProducerName() != null) {
// In case of replicated messages, this will be different from the current replicator producer name
producerName = publishContext.getOriginalProducerName();
sequenceId = publishContext.getOriginalSequenceId();
highestSequenceId = publishContext.getOriginalHighestSequenceId();
}

highestSequencedPersisted.put(producerName, sequenceId);
highestSequencedPersisted.put(producerName, Math.max(highestSequenceId, sequenceId));
if (++snapshotCounter >= snapshotInterval) {
snapshotCounter = 0;
takeSnapshot(position);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
Expand Down Expand Up @@ -377,7 +378,7 @@ private static final class ProducerSendCallback implements SendCallback {

@Override
public void sendComplete(Exception exception) {
if (exception != null) {
if (exception != null && !(exception instanceof PulsarClientException.InvalidMessageException)) {
log.error("[{}][{} -> {}] Error producing on remote broker", replicator.topicName,
replicator.localCluster, replicator.remoteCluster, exception);
// cursor shoud be rewinded since it was incremented when readMoreEntries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

@Slf4j
Expand Down Expand Up @@ -125,6 +126,24 @@ public void testIsDuplicate() {
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertTrue(lastSequenceIdPushed != null);
assertEquals(lastSequenceIdPushed.longValue(), 5);

// update highest sequence persisted
messageDeduplication.highestSequencedPushed.put(producerName1, 0L);
messageDeduplication.highestSequencedPersisted.put(producerName1, 0L);
byteBuf1 = getMessage(producerName1, 0);
publishContext1 = getPublishContext(producerName1, 1, 5);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
assertEquals(status, MessageDeduplication.MessageDupStatus.NotDup);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);

publishContext1 = getPublishContext(producerName1, 4, 8);
status = messageDeduplication.isDuplicate(publishContext1, byteBuf1);
assertEquals(status, MessageDeduplication.MessageDupStatus.Unknown);
lastSequenceIdPushed = messageDeduplication.highestSequencedPushed.get(producerName1);
assertNotNull(lastSequenceIdPushed);
assertEquals(lastSequenceIdPushed.longValue(), 5);
}

@Test
Expand Down Expand Up @@ -319,4 +338,28 @@ public void completed(Exception e, long ledgerId, long entryId) {
}
});
}

public Topic.PublishContext getPublishContext(String producerName, long seqId, long lastSequenceId) {
return spy(new Topic.PublishContext() {
@Override
public String getProducerName() {
return producerName;
}

@Override
public long getSequenceId() {
return seqId;
}

@Override
public long getHighestSequenceId() {
return lastSequenceId;
}

@Override
public void completed(Exception e, long ledgerId, long entryId) {

}
});
}
}
Loading

0 comments on commit d3cb108

Please sign in to comment.