From 603d252ed6e405d2015d2d0fb73047bb9a96b268 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Wed, 9 Feb 2022 15:41:48 +0800 Subject: [PATCH] [Transaction] Fix subscription ack transaction marker. (#14170) --- .../service/AbstractBaseDispatcher.java | 4 + .../persistent/PersistentSubscription.java | 83 -------- .../service/TransactionMarkerDeleteTest.java | 183 ++++++++---------- 3 files changed, 88 insertions(+), 182 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 64bc61103c5f4..ec51fecccfe00 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -160,6 +160,10 @@ public int filterEntriesForConsumer(Optional entryWrapper, int e if (!isReplayRead && msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) { if (Markers.isTxnMarker(msgMetadata)) { + // because consumer can receive message is smaller than maxReadPosition, + // so this marker is useless for this subscription + subscription.acknowledgeMessage(Collections.singletonList(entry.getPosition()), AckType.Individual, + Collections.emptyMap()); entries.set(i, null); entry.release(); continue; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 074181c93a175..80c7869906c1f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -71,7 +71,6 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.KeySharedMeta; -import org.apache.pulsar.common.api.proto.MessageMetadata; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.TopicName; @@ -79,8 +78,6 @@ import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl; -import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.util.FutureUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,27 +113,16 @@ public class PersistentSubscription implements Subscription { private static final Map NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES = Collections.emptyMap(); private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache; - private volatile Position lastMarkDeleteForTransactionMarker; private final PendingAckHandle pendingAckHandle; private Map subscriptionProperties; private final LongAdder bytesOutFromRemovedConsumers = new LongAdder(); private final LongAdder msgOutFromRemovedConsumer = new LongAdder(); - private DeleteTransactionMarkerState deleteTransactionMarkerState = DeleteTransactionMarkerState.None; - - private final Object waitObject = new Object(); - static { REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L); } - public enum DeleteTransactionMarkerState { - Process, - Wait, - None - } - static Map getBaseCursorProperties(boolean isReplicated) { return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; } @@ -428,8 +414,6 @@ public void acknowledgeMessage(List positions, AckType ackType, Map positions, AckType ackType, Map properties) { - - if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()) { - PositionImpl currentMarkDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition(); - if ((lastMarkDeleteForTransactionMarker == null - || ((PositionImpl) lastMarkDeleteForTransactionMarker) - .compareTo(currentMarkDeletePosition) < 0)) { - if (currentMarkDeletePosition != null) { - ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger()); - PositionImpl nextPosition = managedLedger.getNextValidPosition(currentMarkDeletePosition); - if (nextPosition != null - && nextPosition.compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) <= 0) { - synchronized (waitObject) { - if (deleteTransactionMarkerState == DeleteTransactionMarkerState.None) { - deleteTransactionMarkerState = DeleteTransactionMarkerState.Process; - managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() { - @Override - public void readEntryComplete(Entry entry, Object ctx) { - try { - MessageMetadata messageMetadata = - Commands.parseMessageMetadata(entry.getDataBuffer()); - if (Markers.isTxnCommitMarker(messageMetadata) - || Markers.isTxnAbortMarker(messageMetadata)) { - synchronized (waitObject) { - deleteTransactionMarkerState = DeleteTransactionMarkerState.None; - } - lastMarkDeleteForTransactionMarker = currentMarkDeletePosition; - acknowledgeMessage(Collections.singletonList(nextPosition), - AckType.Individual, properties); - } else { - synchronized (waitObject) { - if (deleteTransactionMarkerState - == DeleteTransactionMarkerState.Wait) { - deleteTransactionMarkerState = - DeleteTransactionMarkerState.None; - deleteTransactionMarker(properties); - } else { - deleteTransactionMarkerState = - DeleteTransactionMarkerState.None; - } - } - } - } finally { - entry.release(); - } - } - - @Override - public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - synchronized (waitObject) { - deleteTransactionMarkerState = - DeleteTransactionMarkerState.None; - } - log.error("Fail to read transaction marker! Position : {}", - currentMarkDeletePosition, exception); - } - }, null); - } else if (deleteTransactionMarkerState == DeleteTransactionMarkerState.Process) { - deleteTransactionMarkerState = DeleteTransactionMarkerState.Wait; - } - } - } - } - } - } - } - public CompletableFuture transactionIndividualAcknowledge( TxnID txnId, List> positions) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java index 9c81a754c265a..aa2a8d49e9c95 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java @@ -26,45 +26,39 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; +import static org.testng.Assert.assertNull; import java.util.Collections; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.ManagedCursor; -import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; -import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.api.proto.CommandAck.AckType; -import org.apache.pulsar.common.api.proto.MessageMetadata; -import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.apache.pulsar.common.protocol.Commands; -import org.apache.pulsar.common.protocol.Markers; import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import org.testng.collections.Sets; @Test(groups = "broker") -public class TransactionMarkerDeleteTest extends BrokerTestBase { +public class TransactionMarkerDeleteTest extends TransactionTestBase { + private static final int TOPIC_PARTITION = 3; + private static final String TOPIC_OUTPUT = NAMESPACE1 + "/output"; + private static final int NUM_PARTITIONS = 16; @BeforeMethod - @Override protected void setup() throws Exception { - conf.setTransactionCoordinatorEnabled(true); - super.baseSetup(); - admin.tenants().createTenant("public", - new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test"))); - - admin.namespaces().createNamespace("public/default"); + setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, TOPIC_PARTITION); } @AfterMethod(alwaysRun = true) @@ -75,7 +69,8 @@ protected void cleanup() throws Exception { @Test public void testMarkerDeleteTimes() throws Exception { - ManagedLedgerImpl managedLedger = spy((ManagedLedgerImpl) pulsar.getManagedLedgerFactory().open("test")); + ManagedLedgerImpl managedLedger = + spy((ManagedLedgerImpl) getPulsarServiceList().get(0).getManagedLedgerFactory().open("test")); PersistentTopic topic = mock(PersistentTopic.class); BrokerService brokerService = mock(BrokerService.class); PulsarService pulsarService = mock(PulsarService.class); @@ -86,8 +81,8 @@ public void testMarkerDeleteTimes() throws Exception { doReturn(false).when(configuration).isTransactionCoordinatorEnabled(); doReturn(managedLedger).when(topic).getManagedLedger(); ManagedCursor cursor = managedLedger.openCursor("test"); - PersistentSubscription persistentSubscription = spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", - cursor, false); + PersistentSubscription persistentSubscription = + spyWithClassAndConstructorArgs(PersistentSubscription.class, topic, "test", cursor, false); Position position = managedLedger.addEntry("test".getBytes()); persistentSubscription.acknowledgeMessage(Collections.singletonList(position), AckType.Individual, Collections.emptyMap()); @@ -97,84 +92,74 @@ public void testMarkerDeleteTimes() throws Exception { @Test public void testMarkerDelete() throws Exception { - - MessageMetadata msgMetadata = new MessageMetadata().clear() - .setPublishTime(1) - .setProducerName("test") - .setSequenceId(0); - - ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0); - - payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, - msgMetadata, payload); - - ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test"); - PersistentTopic topic = mock(PersistentTopic.class); - doReturn(pulsar.getBrokerService()).when(topic).getBrokerService(); - doReturn(managedLedger).when(topic).getManagedLedger(); - doReturn("test").when(topic).getName(); - ManagedCursor cursor = managedLedger.openCursor("test"); - PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test", - managedLedger.openCursor("test"), false); - - byte[] payloadBytes = toBytes(payload); - Position position1 = managedLedger.addEntry(payloadBytes); - Position markerPosition1 = managedLedger.addEntry(toBytes(Markers - .newTxnCommitMarker(1, 1, 1))); - - Position position2 = managedLedger.addEntry(payloadBytes); - Position markerPosition2 = managedLedger.addEntry(toBytes(Markers - .newTxnAbortMarker(1, 1, 1))); - - Position position3 = managedLedger.addEntry(payloadBytes); - - assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5); - assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0); - - // ack position1, markerDeletePosition to markerPosition1 - persistentSubscription.acknowledgeMessage(Collections.singletonList(position1), - AckType.Individual, Collections.emptyMap()); - - // ack position1, markerDeletePosition to markerPosition1 - Awaitility.await().during(1, TimeUnit.SECONDS).until(() -> - ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition()) - .compareTo((PositionImpl) markerPosition1) == 0); - - // ack position2, markerDeletePosition to markerPosition2 - persistentSubscription.acknowledgeMessage(Collections.singletonList(position2), - AckType.Individual, Collections.emptyMap()); - - Awaitility.await().until(() -> - ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition()) - .compareTo((PositionImpl) markerPosition2) == 0); - - // add consequent marker - managedLedger.addEntry(toBytes(Markers - .newTxnCommitMarker(1, 1, 1))); - - managedLedger.addEntry(toBytes(Markers - .newTxnAbortMarker(1, 1, 1))); - - Position markerPosition3 = managedLedger.addEntry(toBytes(Markers - .newTxnAbortMarker(1, 1, 1))); - - // ack with transaction, then commit this transaction - persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0), - Collections.singletonList(MutablePair.of((PositionImpl) position3, 0))).get(); - - persistentSubscription.endTxn(0, 0, 0, 0).get(); - - // ack with transaction, then commit this transaction - Awaitility.await().until(() -> - ((PositionImpl) persistentSubscription.getCursor().getMarkDeletedPosition()) - .compareTo((PositionImpl) markerPosition3) == 0); - + final String subName = "testMarkerDelete"; + final String topicName = NAMESPACE1 + "/testMarkerDelete"; + @Cleanup + Consumer consumer = pulsarClient + .newConsumer() + .topic(topicName) + .subscriptionName(subName) + .isAckReceiptEnabled(true) + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + Producer producer = pulsarClient + .newProducer() + .sendTimeout(0, TimeUnit.SECONDS) + .topic(topicName) + .create(); + + Transaction txn1 = getTxn(); + Transaction txn2 = getTxn(); + Transaction txn3 = getTxn(); + Transaction txn4 = getTxn(); + + MessageIdImpl msgId1 = (MessageIdImpl) producer.newMessage(txn1).send(); + MessageIdImpl msgId2 = (MessageIdImpl) producer.newMessage(txn2).send(); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + txn1.commit().get(); + + consumer.acknowledgeAsync(consumer.receive()).get(); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + // maxReadPosition move to msgId1, msgId2 have not be committed + assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition, + PositionImpl.get(msgId1.getLedgerId(), msgId1.getEntryId()).toString()); + + MessageIdImpl msgId3 = (MessageIdImpl) producer.newMessage(txn3).send(); + txn2.commit().get(); + + consumer.acknowledgeAsync(consumer.receive()).get(); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + // maxReadPosition move to txn1 marker, so entryId is msgId2.getEntryId() + 1, + // because send msgId2 before commit txn1 + assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition, + PositionImpl.get(msgId2.getLedgerId(), msgId2.getEntryId() + 1).toString()); + + MessageIdImpl msgId4 = (MessageIdImpl) producer.newMessage(txn4).send(); + txn3.commit().get(); + + consumer.acknowledgeAsync(consumer.receive()).get(); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + + // maxReadPosition move to txn2 marker, because msgId4 have not be committed + assertEquals(admin.topics().getInternalStats(topicName).cursors.get(subName).markDeletePosition, + PositionImpl.get(msgId3.getLedgerId(), msgId3.getEntryId() + 1).toString()); + + txn4.abort().get(); + + // maxReadPosition move to txn4 abort marker, so entryId is msgId4.getEntryId() + 2 + Awaitility.await().untilAsserted(() -> assertEquals(admin.topics().getInternalStats(topicName) + .cursors.get(subName).markDeletePosition, PositionImpl.get(msgId4.getLedgerId(), + msgId4.getEntryId() + 2).toString())); } - static byte[] toBytes(ByteBuf byteBuf) { - byte[] buf = new byte[byteBuf.readableBytes()]; - byteBuf.readBytes(buf); - byteBuf.release(); - return buf; + private Transaction getTxn() throws Exception { + return pulsarClient + .newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS) + .build() + .get(); } }