Skip to content

Commit

Permalink
[Transaction] Tc recover handle transaction in committing and abortin…
Browse files Browse the repository at this point in the history
…g status . (apache#10179)

## Motivation
Now recover don't handle transaction in committing or aborting status, it only add to ```transactionTimeOutTracker```. 

## implement
Add ```TransactionRecoverTracker``` to handle different status transaction.

```
    /**
     * Handle recover transaction update status.
     * @param sequenceId {@link long} the sequenceId of this transaction.
     * @param txnStatus {@link long} the txn status of this operation.
     */
    void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws CoordinatorException.InvalidTxnStatusException;

    /**
     * Handle recover transaction in open status.
     * @param sequenceId {@link Long} the sequenceId of this transaction.
     * @param timeout {@link long} the timeout time of this transaction.
     */
    void handleOpenStatusTransaction(long sequenceId, long timeout);

    /**
     * Handle the transaction in open status append to transaction timeout tracker.
     */
    void appendOpenTransactionToTimeoutTracker();

    /**
     * Handle the transaction in committing and aborting.
     */
    void handleCommittingAndAbortingTransaction();
```
### Verifying this change
Add the tests for it

Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes

Dependencies (does it add or upgrade a dependency): (no)
The public API: (no)
The schema: (no)
The default values of configurations: (no)
The wire protocol: (no)
The rest endpoints: (no)
The admin cli options: (no)
Anything that affects deployment: (no)
  • Loading branch information
congbobo184 authored Apr 19, 2021
1 parent f6b0293 commit 36c3bc3
Show file tree
Hide file tree
Showing 11 changed files with 390 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.broker.transaction.recover.TransactionRecoverTrackerImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
Expand All @@ -47,7 +48,9 @@
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
Expand Down Expand Up @@ -137,8 +140,12 @@ public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
if (e != null) {
LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
} else {
TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
TransactionRecoverTracker recoverTracker =
new TransactionRecoverTrackerImpl(TransactionMetadataStoreService.this,
timeoutTracker, tcId.getId());
transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
timeoutTrackerFactory.newTracker(tcId))
timeoutTracker, recoverTracker)
.whenComplete((store, ex) -> {
if (ex != null) {
LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.recover;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;


/**
* The transaction recover tracker implementation {@link TransactionRecoverTracker}.
*/
@Slf4j
public class TransactionRecoverTrackerImpl implements TransactionRecoverTracker {

private final long tcId;
private final TransactionMetadataStoreService transactionMetadataStoreService;
private final TransactionTimeoutTracker timeoutTracker;

/**
* This is for recover open status transaction. The key is this transaction's sequenceId, the value is this
* transaction timeout time.
* <p>
* When transaction update status to committing or aborting, it will be remove form this.
* <p>
* When transactionMetadataStore recover complete, the transaction don't update status, it will send all
* transaction to transactionTimeoutTracker.
*
*/
private final Map<Long, Long> openTransactions;

/**
* Update transaction to committing status.
* <p>
* When transaction update status to committing, it will be add in.
* <p>
* When transaction update status to committed status, the transaction will remove from it.
* <p>
* When transactionMetadataStore recover complete, all transaction in this will endTransaction by commit action.
*/
private final Set<Long> committingTransactions;

/**
* Update transaction to aborting status.
* <p>
* When transaction update status to aborting, it will be add in.
* <p>
* When transaction update status to aborted status, the transaction will remove from it.
* <p>
* When transactionMetadataStore recover complete, all transaction in this will endTransaction by abort action.
*/
private final Set<Long> abortingTransactions;

public TransactionRecoverTrackerImpl(TransactionMetadataStoreService transactionMetadataStoreService,
TransactionTimeoutTracker timeoutTracker, long tcId) {
this.tcId = tcId;
this.transactionMetadataStoreService = transactionMetadataStoreService;
this.openTransactions = new HashMap<>();
this.committingTransactions = new HashSet<>();
this.abortingTransactions = new HashSet<>();
this.timeoutTracker = timeoutTracker;
}

@Override
public void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws InvalidTxnStatusException {
switch (txnStatus) {
case COMMITTING:
openTransactions.remove(sequenceId);
committingTransactions.add(sequenceId);
break;
case ABORTING:
openTransactions.remove(sequenceId);
abortingTransactions.add(sequenceId);
break;
case ABORTED:
abortingTransactions.remove(sequenceId);
break;
case COMMITTED:
committingTransactions.remove(sequenceId);
break;
default:
throw new InvalidTxnStatusException("Transaction recover tracker`"
+ new TxnID(tcId, sequenceId) + "` load replay metadata operation "
+ "from transaction log with unknown operation");
}
}

@Override
public void handleOpenStatusTransaction(long sequenceId, long timeout) {
openTransactions.put(sequenceId, timeout);
}

@Override
public void appendOpenTransactionToTimeoutTracker() {
openTransactions.forEach(timeoutTracker::replayAddTransaction);
}

@Override
public void handleCommittingAndAbortingTransaction() {
committingTransactions.forEach(k ->
transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.COMMIT_VALUE));

abortingTransactions.forEach(k ->
transactionMetadataStoreService.endTransaction(new TxnID(tcId, k), TxnAction.ABORT_VALUE));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Implementation of a transaction recover tracker.
*/
package org.apache.pulsar.broker.transaction.recover;
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.recover;

import io.netty.util.HashedWheelTimer;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerImpl;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.testng.annotations.Test;

import java.lang.reflect.Field;
import java.util.Map;
import java.util.Set;

import static org.mockito.Mockito.mock;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertFalse;
import static org.testng.AssertJUnit.assertTrue;

public class TransactionRecoverTrackerTest {

@Test
public void openStatusRecoverTrackerTest() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService = mock(TransactionMetadataStoreService.class);
TransactionTimeoutTracker timeoutTracker = new TransactionTimeoutTrackerFactoryImpl(
transactionMetadataStoreService, new HashedWheelTimer()).newTracker(TransactionCoordinatorID.get(1));
TransactionRecoverTrackerImpl recoverTracker =
new TransactionRecoverTrackerImpl(transactionMetadataStoreService, timeoutTracker, 1);

recoverTracker.handleOpenStatusTransaction(1, 200);
recoverTracker.handleOpenStatusTransaction(2, 300);

Field field = TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
field.setAccessible(true);
Map<Long, Long> map = (Map<Long, Long>) field.get(recoverTracker);

assertEquals(map.size(), 2);
assertEquals(map.get(1L).longValue(), 200L);
assertEquals(map.get(2L).longValue(), 300L);

field = TransactionTimeoutTrackerImpl.class.getDeclaredField("priorityQueue");
field.setAccessible(true);
TripleLongPriorityQueue priorityQueue = (TripleLongPriorityQueue) field.get(timeoutTracker);
assertEquals(priorityQueue.size(), 0);

recoverTracker.appendOpenTransactionToTimeoutTracker();
assertEquals(priorityQueue.size(), 2);
}

@Test
public void updateStatusRecoverTest() throws Exception {
TransactionRecoverTrackerImpl recoverTracker =
new TransactionRecoverTrackerImpl(mock(TransactionMetadataStoreService.class),
mock(TransactionTimeoutTrackerImpl.class), 1);
long committingSequenceId = 1L;
long committedSequenceId = 2L;
long abortingSequenceId = 3L;
long abortedSequenceId = 4L;
recoverTracker.handleOpenStatusTransaction(committingSequenceId, 100);
recoverTracker.handleOpenStatusTransaction(committedSequenceId, 100);
recoverTracker.handleOpenStatusTransaction(abortingSequenceId, 100);
recoverTracker.handleOpenStatusTransaction(abortedSequenceId, 100);

Field field = TransactionRecoverTrackerImpl.class.getDeclaredField("openTransactions");
field.setAccessible(true);
Map<Long, Long> openMap = (Map<Long, Long>) field.get(recoverTracker);
assertEquals(4, openMap.size());

recoverTracker.updateTransactionStatus(committingSequenceId, TxnStatus.COMMITTING);
assertEquals(3, openMap.size());
recoverTracker.updateTransactionStatus(committedSequenceId, TxnStatus.COMMITTING);
assertEquals(2, openMap.size());
recoverTracker.updateTransactionStatus(committedSequenceId, TxnStatus.COMMITTED);

recoverTracker.updateTransactionStatus(abortingSequenceId, TxnStatus.ABORTING);
assertEquals(1, openMap.size());
recoverTracker.updateTransactionStatus(abortedSequenceId, TxnStatus.ABORTING);
assertEquals(0, openMap.size());
recoverTracker.updateTransactionStatus(abortedSequenceId, TxnStatus.ABORTED);

field = TransactionRecoverTrackerImpl.class.getDeclaredField("committingTransactions");
field.setAccessible(true);
Set<Long> commitSet = (Set<Long>) field.get(recoverTracker);

assertEquals(commitSet.size(), 1);
assertTrue(commitSet.contains(committingSequenceId));
assertFalse(commitSet.contains(committedSequenceId));

field = TransactionRecoverTrackerImpl.class.getDeclaredField("abortingTransactions");
field.setAccessible(true);
Set<Long> abortSet = (Set<Long>) field.get(recoverTracker);

assertEquals(1, abortSet.size());
assertTrue(abortSet.contains(abortingSequenceId));
assertFalse(abortSet.contains(abortedSequenceId));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ static TransactionMetadataStoreProvider newProvider(String providerClassName) th
* @param managedLedgerFactory {@link ManagedLedgerFactory} the managedLedgerFactory to create managedLedger.
* @param managedLedgerConfig {@link ManagedLedgerConfig} the managedLedgerConfig to create managedLedger.
* @param timeoutTracker {@link TransactionTimeoutTracker} the timeoutTracker to handle transaction time out.
* @param recoverTracker {@link TransactionRecoverTracker} the recoverTracker to handle transaction recover.
* @return a future represents the result of the operation.
* an instance of {@link TransactionMetadataStore} is returned
* if the operation succeeds.
*/
CompletableFuture<TransactionMetadataStore> openStore(
TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker);
ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.transaction.coordinator;

import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;

/**
* This tracker is for transaction metadata store recover handle the different status transaction.
*/
public interface TransactionRecoverTracker {

/**
* Handle recover transaction update status.
* @param sequenceId {@link long} the sequenceId of this transaction.
* @param txnStatus {@link long} the txn status of this operation.
*/
void updateTransactionStatus(long sequenceId, TxnStatus txnStatus) throws CoordinatorException.InvalidTxnStatusException;

/**
* Handle recover transaction in open status.
* @param sequenceId {@link Long} the sequenceId of this transaction.
* @param timeout {@link long} the timeout time of this transaction.
*/
void handleOpenStatusTransaction(long sequenceId, long timeout);

/**
* Handle the transaction in open status append to transaction timeout tracker.
*/
void appendOpenTransactionToTimeoutTracker();

/**
* Handle the transaction in committing and aborting status.
*/
void handleCommittingAndAbortingTransaction();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionRecoverTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTracker;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;

/**
* The provider that offers in-memory implementation of {@link TransactionMetadataStore}.
Expand All @@ -36,7 +36,8 @@ public class InMemTransactionMetadataStoreProvider implements TransactionMetadat
public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordinatorID transactionCoordinatorId,
ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig,
TransactionTimeoutTracker timeoutTracker) {
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker) {
return CompletableFuture.completedFuture(
new InMemTransactionMetadataStore(transactionCoordinatorId));
}
Expand Down
Loading

0 comments on commit 36c3bc3

Please sign in to comment.