Skip to content

Commit

Permalink
[Transaction] Fix subscription ack transaction marker. (apache#14170)
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 authored Feb 9, 2022
1 parent 5886327 commit 603d252
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ public int filterEntriesForConsumer(Optional<EntryWrapper[]> entryWrapper, int e
if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
// because consumer can receive message is smaller than maxReadPosition,
// so this marker is useless for this subscription
subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual,
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,13 @@
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -116,27 +113,16 @@ public class PersistentSubscription implements Subscription {
private static final Map<String, Long> NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap();

private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private volatile Position lastMarkDeleteForTransactionMarker;
private final PendingAckHandle pendingAckHandle;
private Map<String, String> subscriptionProperties;

private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
private final LongAdder msgOutFromRemovedConsumer = new LongAdder();

private DeleteTransactionMarkerState deleteTransactionMarkerState = DeleteTransactionMarkerState.None;

private final Object waitObject = new Object();

static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}

public enum DeleteTransactionMarkerState {
Process,
Wait,
None
}

static Map<String, Long> getBaseCursorProperties(boolean isReplicated) {
return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES;
}
Expand Down Expand Up @@ -428,8 +414,6 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
}
}

deleteTransactionMarker(properties);

if (topic.getManagedLedger().isTerminated() && cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Notify all consumer that the end of topic was reached
if (dispatcher != null) {
Expand All @@ -438,73 +422,6 @@ public void acknowledgeMessage(List<Position> positions, AckType ackType, Map<St
}
}

private void deleteTransactionMarker(Map<String, Long> properties) {

if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
PositionImpl currentMarkDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();
if ((lastMarkDeleteForTransactionMarker == null
|| ((PositionImpl) lastMarkDeleteForTransactionMarker)
.compareTo(currentMarkDeletePosition) < 0)) {
if (currentMarkDeletePosition != null) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
PositionImpl nextPosition = managedLedger.getNextValidPosition(currentMarkDeletePosition);
if (nextPosition != null
&& nextPosition.compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) <= 0) {
synchronized (waitObject) {
if (deleteTransactionMarkerState == DeleteTransactionMarkerState.None) {
deleteTransactionMarkerState = DeleteTransactionMarkerState.Process;
managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
MessageMetadata messageMetadata =
Commands.parseMessageMetadata(entry.getDataBuffer());
if (Markers.isTxnCommitMarker(messageMetadata)
|| Markers.isTxnAbortMarker(messageMetadata)) {
synchronized (waitObject) {
deleteTransactionMarkerState = DeleteTransactionMarkerState.None;
}
lastMarkDeleteForTransactionMarker = currentMarkDeletePosition;
acknowledgeMessage(Collections.singletonList(nextPosition),
AckType.Individual, properties);
} else {
synchronized (waitObject) {
if (deleteTransactionMarkerState
== DeleteTransactionMarkerState.Wait) {
deleteTransactionMarkerState =
DeleteTransactionMarkerState.None;
deleteTransactionMarker(properties);
} else {
deleteTransactionMarkerState =
DeleteTransactionMarkerState.None;
}
}
}
} finally {
entry.release();
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
synchronized (waitObject) {
deleteTransactionMarkerState =
DeleteTransactionMarkerState.None;
}
log.error("Fail to read transaction marker! Position : {}",
currentMarkDeletePosition, exception);
}
}, null);
} else if (deleteTransactionMarkerState == DeleteTransactionMarkerState.Process) {
deleteTransactionMarkerState = DeleteTransactionMarkerState.Wait;
}
}
}
}
}
}
}

public CompletableFuture<Void> transactionIndividualAcknowledge(
TxnID txnId,
List<MutablePair<PositionImpl, Integer>> positions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,39 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import static org.testng.Assert.assertNull;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Sets;

@Test(groups = "broker")
public class TransactionMarkerDeleteTest extends BrokerTestBase {
public class TransactionMarkerDeleteTest extends TransactionTestBase {

private static final int TOPIC_PARTITION = 3;
private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
private static final int NUM_PARTITIONS = 16;
@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setTransactionCoordinatorEnabled(true);
super.baseSetup();
admin.tenants().createTenant("public",
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));

admin.namespaces().createNamespace("public/default");
setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION);
}

@AfterMethod(alwaysRun = true)
Expand All @@ -75,7 +69,8 @@ protected void cleanup() throws Exception {

@Test
public void testMarkerDeleteTimes() throws Exception {
ManagedLedgerImpl managedLedger = spy((ManagedLedgerImpl) pulsar.getManagedLedgerFactory().open("test"));
ManagedLedgerImpl managedLedger =
spy((ManagedLedgerImpl) getPulsarServiceList().get(0).getManagedLedgerFactory().open("test"));
PersistentTopic topic = mock(PersistentTopic.class);
BrokerService brokerService = mock(BrokerService.class);
PulsarService pulsarService = mock(PulsarService.class);
Expand All @@ -86,8 +81,8 @@ public void testMarkerDeleteTimes() throws Exception {
doReturn(false).when(configuration).isTransactionCoordinatorEnabled();
doReturn(managedLedger).when(topic).getManagedLedger();
ManagedCursor cursor = managedLedger.openCursor("test");
PersistentSubscription persistentSubscription = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test",
cursor, false);
PersistentSubscription persistentSubscription =
spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false);
Position position = managedLedger.addEntry("test".getBytes());
persistentSubscription.acknowledgeMessage(Collections.singletonList(position),
AckType.Individual, Collections.emptyMap());
Expand All @@ -97,84 +92,74 @@ public void testMarkerDeleteTimes() throws Exception {

@Test
public void testMarkerDelete() throws Exception {

MessageMetadata msgMetadata = new MessageMetadata().clear()
.setPublishTime(1)
.setProducerName("test")
.setSequenceId(0);

ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);

payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
msgMetadata, payload);

ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
PersistentTopic topic = mock(PersistentTopic.class);
doReturn(pulsar.getBrokerService()).when(topic).getBrokerService();
doReturn(managedLedger).when(topic).getManagedLedger();
doReturn("test").when(topic).getName();
ManagedCursor cursor = managedLedger.openCursor("test");
PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
managedLedger.openCursor("test"), false);

byte[] payloadBytes = toBytes(payload);
Position position1 = managedLedger.addEntry(payloadBytes);
Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
.newTxnCommitMarker(1, 1, 1)));

Position position2 = managedLedger.addEntry(payloadBytes);
Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
.newTxnAbortMarker(1, 1, 1)));

Position position3 = managedLedger.addEntry(payloadBytes);

assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);

// ack position1, markerDeletePosition to markerPosition1
persistentSubscription.acknowledgeMessage(Collections.singletonList(position1),
AckType.Individual, Collections.emptyMap());

// ack position1, markerDeletePosition to markerPosition1
Awaitility.await().during(1, TimeUnit.SECONDS).until(() ->
((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
.compareTo((PositionImpl) markerPosition1) == 0);

// ack position2, markerDeletePosition to markerPosition2
persistentSubscription.acknowledgeMessage(Collections.singletonList(position2),
AckType.Individual, Collections.emptyMap());

Awaitility.await().until(() ->
((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
.compareTo((PositionImpl) markerPosition2) == 0);

// add consequent marker
managedLedger.addEntry(toBytes(Markers
.newTxnCommitMarker(1, 1, 1)));

managedLedger.addEntry(toBytes(Markers
.newTxnAbortMarker(1, 1, 1)));

Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
.newTxnAbortMarker(1, 1, 1)));

// ack with transaction, then commit this transaction
persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
Collections.singletonList(MutablePair.of((PositionImpl) position3, 0))).get();

persistentSubscription.endTxn(0, 0, 0, 0).get();

// ack with transaction, then commit this transaction
Awaitility.await().until(() ->
((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition())
.compareTo((PositionImpl) markerPosition3) == 0);

final String subName = "testMarkerDelete";
final String topicName = NAMESPACE1 + "/testMarkerDelete";
@Cleanup
Consumer<byte[]> consumer = pulsarClient
.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.isAckReceiptEnabled(true)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

Producer<byte[]> producer = pulsarClient
.newProducer()
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topicName)
.create();

Transaction txn1 = getTxn();
Transaction txn2 = getTxn();
Transaction txn3 = getTxn();
Transaction txn4 = getTxn();

MessageIdImpl msgId1 = (MessageIdImpl) producer.newMessage(txn1).send();
MessageIdImpl msgId2 = (MessageIdImpl) producer.newMessage(txn2).send();
assertNull(consumer.receive(1, TimeUnit.SECONDS));
txn1.commit().get();

consumer.acknowledgeAsync(consumer.receive()).get();
assertNull(consumer.receive(1, TimeUnit.SECONDS));

// maxReadPosition move to msgId1, msgId2 have not be committed
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
PositionImpl.get(msgId1.getLedgerId(), msgId1.getEntryId()).toString());

MessageIdImpl msgId3 = (MessageIdImpl) producer.newMessage(txn3).send();
txn2.commit().get();

consumer.acknowledgeAsync(consumer.receive()).get();
assertNull(consumer.receive(1, TimeUnit.SECONDS));

// maxReadPosition move to txn1 marker, so entryId is msgId2.getEntryId() + 1,
// because send msgId2 before commit txn1
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
PositionImpl.get(msgId2.getLedgerId(), msgId2.getEntryId() + 1).toString());

MessageIdImpl msgId4 = (MessageIdImpl) producer.newMessage(txn4).send();
txn3.commit().get();

consumer.acknowledgeAsync(consumer.receive()).get();
assertNull(consumer.receive(1, TimeUnit.SECONDS));

// maxReadPosition move to txn2 marker, because msgId4 have not be committed
assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition,
PositionImpl.get(msgId3.getLedgerId(), msgId3.getEntryId() + 1).toString());

txn4.abort().get();

// maxReadPosition move to txn4 abort marker, so entryId is msgId4.getEntryId() + 2
Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getInternalStats(topicName)
.cursors.get(subName).markDeletePosition, PositionImpl.get(msgId4.getLedgerId(),
msgId4.getEntryId() + 2).toString()));
}

static byte[] toBytes(ByteBuf byteBuf) {
byte[] buf = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(buf);
byteBuf.release();
return buf;
private Transaction getTxn() throws Exception {
return pulsarClient
.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS)
.build()
.get();
}
}

0 comments on commit 603d252

Please sign in to comment.