Skip to content

Commit

Permalink
[fix][Java Client] Fix thread safety issue of LastCumulativeAck (ap…
Browse files Browse the repository at this point in the history
…ache#16072)

### Motivation

There were several issues caused by the thread safe issue of
`LastCumulativeAck`, see:
- apache#10586
- apache#12343

The root cause is that `LastCumulativeAck` could be accessed by
different threads, especially in `flushAsync` method. But the fields are
accessed directly and no thread safety can be guaranteed.

In addition, the current `LastCumulativeAck` class  was added in
apache#8996 to hold two object
references, but this modification is wrong.

Before apache#8996, there are two CAS operations in `doCumulativeAck` method
in case it's called concurretly. Though the composite CAS operation is
not atomic.

However, after apache#8996, only CAS operation was performed but it's compared
with a `LastCumulativeAck` object, not the two fields (`messageId` and
`bitSetRecyclable`).

There is another issue that it uses a flag `cumulativeAckFlushRequired`
to mark if `lastCumulativeAck` should flush. However, if `flushAsync`
was called concurrently, both would send ACK commands to broker.

### Modifications

To solve the thread safety issue, this PR move the `LastCumulativeAck`
out of the `PersistentAcknowledgmentsGroupingTracker` to disable
directly access to the internal fields. Then, the following synchronized
methods were added to guarantee the thread safety:
- `update`: Guarantee the safe write operations. It also recycles the
  `BitSetRecyclable` object before assigning new values and indicates
  itself can be flushed.
- `flush`: If it can be flushed, return a thread local
  `LastCumulativeAck` instance that contains the message ID and the bit
  set. The bit set is deep copied to avoid the original reference being
  recycled in another `update` call.

In addition, since the `messageId` field is volatile, the `getMessageId`
method can always retrieve the latest reference.

`LastCumulativeAckTest` is added to verify the sematics above.

Based on the new design, we can only maintain a `LastCumulativeAck`
field in `PersistentAcknowledgmentsGroupingTracker` and call the related
methods in `doCumulativeAck` and `flushAsync`. It also fixes the problem
that two concurrent `flushAsync` calls might send the same ACK command
twice.
  • Loading branch information
BewareMyPower authored Jun 22, 2022
1 parent c8f03e8 commit 936d6fd
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Recycler;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand All @@ -34,9 +34,8 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.NonNull;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -68,18 +67,11 @@ public class PersistentAcknowledgmentsGroupingTracker implements Acknowledgments
private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;

private volatile LastCumulativeAck lastCumulativeAck =
LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);

private volatile boolean cumulativeAckFlushRequired = false;
private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();

// When we flush the command, we should ensure current ack request will send correct
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

private static final AtomicReferenceFieldUpdater<PersistentAcknowledgmentsGroupingTracker, LastCumulativeAck>
LAST_CUMULATIVE_ACK_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
PersistentAcknowledgmentsGroupingTracker.class, LastCumulativeAck.class, "lastCumulativeAck");

/**
* This is a set of all the individual acks that the application has issued and that were not already sent to
* broker.
Expand Down Expand Up @@ -116,13 +108,13 @@ public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, Consum
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
public boolean isDuplicate(@NonNull MessageId messageId) {
final MessageId messageIdOfLastAck = lastCumulativeAck.messageId;
public boolean isDuplicate(MessageId messageId) {
final MessageIdImpl messageIdOfLastAck = lastCumulativeAck.getMessageId();
if (messageIdOfLastAck != null && messageId.compareTo(messageIdOfLastAck) <= 0) {
// Already included in a cumulative ack
return true;
} else {
return pendingIndividualAcks.contains(messageId);
return pendingIndividualAcks.contains((MessageIdImpl) messageId);
}
}

Expand Down Expand Up @@ -370,30 +362,7 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {

private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
// Handle concurrent updates from different threads
LastCumulativeAck currentCumulativeAck = LastCumulativeAck.create(msgId, bitSet);
while (true) {
LastCumulativeAck lastCumulativeAck = this.lastCumulativeAck;
if (msgId.compareTo(lastCumulativeAck.messageId) > 0) {
if (LAST_CUMULATIVE_ACK_UPDATER.compareAndSet(this, this.lastCumulativeAck, currentCumulativeAck)) {
if (lastCumulativeAck.bitSetRecyclable != null) {
try {
lastCumulativeAck.bitSetRecyclable.recycle();
} catch (Exception ignore) {
// no-op
}
lastCumulativeAck.bitSetRecyclable = null;
}
lastCumulativeAck.recycle();
// Successfully updated the last cumulative ack. Next flush iteration will send this to broker.
cumulativeAckFlushRequired = true;
return;
}
} else {
currentCumulativeAck.recycle();
// message id acknowledging an before the current last cumulative ack
return;
}
}
lastCumulativeAck.update(msgId, bitSet);
}

private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
Expand Down Expand Up @@ -474,15 +443,15 @@ public void flush() {
}

private void flushAsync(ClientCnx cnx) {
final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
boolean shouldFlush = false;
if (cumulativeAckFlushRequired) {
newMessageAckCommandAndWrite(cnx, consumer.consumerId, lastCumulativeAck.messageId.ledgerId,
lastCumulativeAck.messageId.getEntryId(), lastCumulativeAck.bitSetRecyclable,
AckType.Cumulative, null, Collections.emptyMap(), false,
this.currentCumulativeAckFuture, null);
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(lastCumulativeAck.messageId);
if (lastCumulativeAckToFlush != null) {
shouldFlush = true;
cumulativeAckFlushRequired = false;
final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId();
newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(),
lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null,
Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
}

// Flush all individual acks
Expand Down Expand Up @@ -560,7 +529,7 @@ private void flushAsync(ClientCnx cnx) {
@Override
public void flushAndClean() {
flush();
lastCumulativeAck = LastCumulativeAck.create((MessageIdImpl) MessageIdImpl.earliest, null);
lastCumulativeAck.reset();
pendingIndividualAcks.clear();
}

Expand Down Expand Up @@ -664,36 +633,72 @@ private boolean isAckReceiptEnabled(ClientCnx cnx) {
return ackReceiptEnabled && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}
}

private static class LastCumulativeAck {
private MessageIdImpl messageId;
private BitSetRecyclable bitSetRecyclable;
@Getter
class LastCumulativeAck {

static LastCumulativeAck create(MessageIdImpl messageId, BitSetRecyclable bitSetRecyclable) {
LastCumulativeAck op = RECYCLER.get();
op.messageId = messageId;
op.bitSetRecyclable = bitSetRecyclable;
return op;
}
// It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
new FastThreadLocal<LastCumulativeAck>() {

private LastCumulativeAck(Recycler.Handle<LastCumulativeAck> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
@Override
protected LastCumulativeAck initialValue() {
return new LastCumulativeAck();
}
};
public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;

void recycle() {
if (bitSetRecyclable != null) {
private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
private BitSetRecyclable bitSetRecyclable = null;
private boolean flushRequired = false;

public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
if (messageId.compareTo(this.messageId) > 0) {
if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
this.bitSetRecyclable.recycle();
}
this.messageId = null;
recyclerHandle.recycle(this);
set(messageId, bitSetRecyclable);
flushRequired = true;
}
}

private final Recycler.Handle<LastCumulativeAck> recyclerHandle;
private static final Recycler<LastCumulativeAck> RECYCLER = new Recycler<LastCumulativeAck>() {
@Override
protected LastCumulativeAck newObject(Handle<LastCumulativeAck> handle) {
return new LastCumulativeAck(handle);
public synchronized LastCumulativeAck flush() {
if (flushRequired) {
final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
if (bitSetRecyclable != null) {
localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
} else {
localLastCumulativeAck.set(this.messageId, null);
}
};
flushRequired = false;
return localLastCumulativeAck;
} else {
// Return null to indicate nothing to be flushed
return null;
}
}

public synchronized void reset() {
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
messageId = DEFAULT_MESSAGE_ID;
bitSetRecyclable = null;
flushRequired = false;
}

private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
this.messageId = messageId;
this.bitSetRecyclable = bitSetRecyclable;
}

@Override
public String toString() {
String s = messageId.toString();
if (bitSetRecyclable != null) {
s += " (bit set: " + bitSetRecyclable + ")";
}
return s;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.testng.annotations.Test;

public class LastCumulativeAckTest {

@Test
public void testUpdate() {
final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
assertFalse(lastCumulativeAck.isFlushRequired());
assertEquals(lastCumulativeAck.getMessageId(), LastCumulativeAck.DEFAULT_MESSAGE_ID);
assertNull(lastCumulativeAck.getBitSetRecyclable());

final MessageIdImpl messageId1 = new MessageIdImpl(0L, 1L, 10);
final BitSetRecyclable bitSetRecyclable1 = BitSetRecyclable.create();
bitSetRecyclable1.set(0, 3);
lastCumulativeAck.update(messageId1, bitSetRecyclable1);
assertTrue(lastCumulativeAck.isFlushRequired());
assertSame(lastCumulativeAck.getMessageId(), messageId1);
assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);

final MessageIdImpl messageId2 = new MessageIdImpl(0L, 2L, 8);
lastCumulativeAck.update(messageId2, bitSetRecyclable1);
// bitSetRecyclable1 is not recycled
assertEquals(bitSetRecyclable1.toString(), "{0, 1, 2}");

final BitSetRecyclable bitSetRecyclable2 = BitSetRecyclable.create();
bitSetRecyclable2.set(0, 2);

// `update()` only accepts a newer message ID, so this call here has no side effect
lastCumulativeAck.update(messageId2, bitSetRecyclable2);
assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable1);

final MessageIdImpl messageId3 = new MessageIdImpl(0L, 3L, 9);
lastCumulativeAck.update(messageId3, bitSetRecyclable2);
// bitSetRecyclable1 is recycled because it's replaced in `update`
assertEquals(bitSetRecyclable1.toString(), "{}");
assertSame(lastCumulativeAck.getMessageId(), messageId3);
assertSame(lastCumulativeAck.getBitSetRecyclable(), bitSetRecyclable2);
bitSetRecyclable2.recycle();
}

@Test
public void testFlush() {
final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
assertNull(lastCumulativeAck.flush());

final MessageIdImpl messageId = new MessageIdImpl(0L, 1L, 3);
final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, 3);
lastCumulativeAck.update(messageId, bitSetRecyclable);
assertTrue(lastCumulativeAck.isFlushRequired());

final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
assertFalse(lastCumulativeAck.isFlushRequired());
assertSame(lastCumulativeAckToFlush.getMessageId(), messageId);
assertNotSame(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
assertEquals(lastCumulativeAckToFlush.getBitSetRecyclable(), bitSetRecyclable);
}

}

0 comments on commit 936d6fd

Please sign in to comment.