Skip to content

Commit

Permalink
[Transaction] Fix transaction buffer delete marker problem. (apache#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
congbobo184 authored May 11, 2021
1 parent 21c9c81 commit 7c4aecc
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -382,29 +382,33 @@ private void deleteTransactionMarker(PositionImpl position, AckType ackType, Map
if (position != null) {
ManagedLedgerImpl managedLedger = ((ManagedLedgerImpl) cursor.getManagedLedger());
PositionImpl nextPosition = managedLedger.getNextValidPosition(position);
managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
MessageMetadata messageMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
isDeleteTransactionMarkerInProcess = false;
if (Markers.isTxnCommitMarker(messageMetadata) || Markers.isTxnAbortMarker(messageMetadata)) {
lastMarkDeleteForTransactionMarker = position;
acknowledgeMessage(Collections.singletonList(nextPosition), ackType, properties);
if (nextPosition != null
&& nextPosition.compareTo((PositionImpl) managedLedger.getLastConfirmedEntry()) <= 0) {
managedLedger.asyncReadEntry(nextPosition, new ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
MessageMetadata messageMetadata = Commands.parseMessageMetadata(entry.getDataBuffer());
isDeleteTransactionMarkerInProcess = false;
if (Markers.isTxnCommitMarker(messageMetadata)
|| Markers.isTxnAbortMarker(messageMetadata)) {
lastMarkDeleteForTransactionMarker = position;
acknowledgeMessage(Collections.singletonList(nextPosition), ackType, properties);
}
} finally {
entry.release();
}
} finally {
entry.release();
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
isDeleteTransactionMarkerInProcess = false;
if (log.isDebugEnabled()) {
log.debug("Fail to read transaction marker! Position : {}", position, exception);
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
isDeleteTransactionMarkerInProcess = false;
log.error("Fail to read transaction marker! Position : {}", position, exception);
}
}
}, null);
}, null);
} else {
isDeleteTransactionMarkerInProcess = false;
}
} else {
isDeleteTransactionMarkerInProcess = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import java.util.Collections;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -84,4 +89,25 @@ public void testTransactionMarkerDelete() throws Exception {
assertEquals(((PositionImpl) persistentSubscription.getCursor()
.getMarkDeletedPosition()).compareTo((PositionImpl) position3), 0);
}

@Test
public void testMarkerDeleteTimes() throws Exception {
ManagedLedgerImpl managedLedger = spy((ManagedLedgerImpl) pulsar.getManagedLedgerFactory().open("test"));
PersistentTopic topic = mock(PersistentTopic.class);
BrokerService brokerService = mock(BrokerService.class);
PulsarService pulsarService = mock(PulsarService.class);
ServiceConfiguration configuration = mock(ServiceConfiguration.class);
doReturn(brokerService).when(topic).getBrokerService();
doReturn(pulsarService).when(brokerService).getPulsar();
doReturn(configuration).when(pulsarService).getConfig();
doReturn(true).when(configuration).isTransactionCoordinatorEnabled();
doReturn(managedLedger).when(topic).getManagedLedger();
ManagedCursor cursor = managedLedger.openCursor("test");
PersistentSubscription persistentSubscription = spy(new PersistentSubscription(topic, "test",
cursor, false));
Position position = managedLedger.addEntry("test".getBytes());
persistentSubscription.acknowledgeMessage(Collections.singletonList(position),
AckType.Individual, Collections.emptyMap());
verify(managedLedger, times(0)).asyncReadEntry(any(), any(), any());
}
}

0 comments on commit 7c4aecc

Please sign in to comment.