Skip to content

Commit

Permalink
[Transaction] Fix transaction log handle managed ledger WriteFail sta…
Browse files Browse the repository at this point in the history
…te. (apache#10711)

## Motivation
when transaction log managed ledger state become WriteFailed state, should `readyToCreateNewLedger`.

## implement
append fail check the managedLedger state and the exception do `readyToCreateNewLedger`
  • Loading branch information
congbobo184 authored May 27, 2021
1 parent 4b86c26 commit 55fde82
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
if (existingFuture.isDone()) {
try {
ManagedLedgerImpl l = existingFuture.get();
if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) {
if (l.getState() == State.Fenced || l.getState() == State.Closed) {
// Managed ledger is in unusable state. Recreate it.
log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it",
name, l.getState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;

enum State {
public enum State {
None, // Uninitialized
LedgerOpened, // A ledger is ready to write into
ClosingLedger, // Closing current ledger
Expand Down Expand Up @@ -3501,8 +3501,8 @@ public Position getLastConfirmedEntry() {
return lastConfirmedEntry;
}

public String getState() {
return STATE_UPDATER.get(this).toString();
public State getState() {
return STATE_UPDATER.get(this);
}

public ManagedLedgerMBeanImpl getMBean() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1826,7 +1826,7 @@ public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean
stats.pendingAddEntriesCount = ml.getPendingAddEntriesCount();

stats.lastConfirmedEntry = ml.getLastConfirmedEntry().toString();
stats.state = ml.getState();
stats.state = ml.getState().toString();

stats.ledgers = Lists.newArrayList();
List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
Expand Down Expand Up @@ -128,6 +130,11 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("Transaction log write transaction operation error", exception);
if (exception instanceof ManagedLedgerAlreadyClosedException
&& managedLedger instanceof ManagedLedgerImpl
&& State.WriteFailed == ((ManagedLedgerImpl) managedLedger).getState()) {
managedLedger.readyToCreateNewLedger();
}
buf.release();
completableFuture.completeExceptionally(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
Expand All @@ -43,9 +44,12 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {

Expand All @@ -70,7 +74,7 @@ public void testTransactionOperation() throws Exception {
while (true) {
checkReplayRetryCount++;
if (checkReplayRetryCount > 3) {
Assert.fail();
fail();
break;
}
if (transactionMetadataStore.checkIfReady()) {
Expand Down Expand Up @@ -105,7 +109,7 @@ public void testTransactionOperation() throws Exception {

try {
transactionMetadataStore.getTxnMeta(txnID).get();
Assert.fail();
fail();
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
}
Expand Down Expand Up @@ -183,7 +187,7 @@ public void testInitTransactionReader() throws Exception {
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
Assert.fail();
fail();
break;
}
if (transactionMetadataStore.checkIfReady()) {
Expand Down Expand Up @@ -223,7 +227,7 @@ public void testInitTransactionReader() throws Exception {

while (true) {
if (checkReplayRetryCount > 6) {
Assert.fail();
fail();
break;
}
if (transactionMetadataStoreTest.checkIfReady()) {
Expand All @@ -244,14 +248,14 @@ public void testInitTransactionReader() throws Exception {
.updateTxnStatus(txnID2, TxnStatus.COMMITTED, TxnStatus.COMMITTING, false).get();
try {
transactionMetadataStoreTest.getTxnMeta(txnID1).get();
Assert.fail();
fail();
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
}

try {
transactionMetadataStoreTest.getTxnMeta(txnID2).get();
Assert.fail();
fail();
} catch (ExecutionException e) {
Assert.assertTrue(e.getCause() instanceof TransactionNotFoundException);
}
Expand Down Expand Up @@ -287,7 +291,7 @@ public void testDeleteLog() throws Exception {
int checkReplayRetryCount = 0;
while (true) {
if (checkReplayRetryCount > 3) {
Assert.fail();
fail();
break;
}
if (transactionMetadataStore.checkIfReady()) {
Expand Down Expand Up @@ -367,6 +371,39 @@ public void testRecoverWhenDeleteFromCursor() throws Exception {
Awaitility.await().until(transactionMetadataStore::checkIfReady);
}

@Test
public void testManageLedgerWriteFailState() throws Exception {
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
TransactionCoordinatorID transactionCoordinatorID = new TransactionCoordinatorID(1);
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(transactionCoordinatorID, factory,
new ManagedLedgerConfig());
MLTransactionMetadataStore transactionMetadataStore =
new MLTransactionMetadataStore(transactionCoordinatorID, mlTransactionLog,
new TransactionTimeoutTrackerImpl(), new TransactionRecoverTrackerImpl());

Awaitility.await().until(transactionMetadataStore::checkIfReady);
transactionMetadataStore.newTransaction(5000).get();
Field field = MLTransactionLogImpl.class.getDeclaredField("managedLedger");
field.setAccessible(true);
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
field = ManagedLedgerImpl.class.getDeclaredField("STATE_UPDATER");
field.setAccessible(true);
AtomicReferenceFieldUpdater state = (AtomicReferenceFieldUpdater) field.get(managedLedger);
state.set(managedLedger, WriteFailed);
try {
transactionMetadataStore.newTransaction(5000).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException);
}
transactionMetadataStore.newTransaction(5000).get();

}

public class TransactionTimeoutTrackerImpl implements TransactionTimeoutTracker {

@Override
Expand Down

0 comments on commit 55fde82

Please sign in to comment.