Skip to content

Commit

Permalink
[admin][txn] Add transaction admin to get recover time in stats. (apa…
Browse files Browse the repository at this point in the history
…che#15654)

* [admin][txn] Add transaction admin to get recover time in stats.
### Motivation & Modification
Optimize admin tools to get recovery timestamps in TB, TP, TC stats.
  • Loading branch information
liangyepianzhou authored Jul 5, 2022
1 parent 57c3a67 commit da325fa
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;

Expand Down Expand Up @@ -103,6 +104,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
*/
private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();

public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();

private final Semaphore handleLowWaterMark = new Semaphore(1);

public TopicTransactionBuffer(PersistentTopic topic) {
Expand All @@ -120,6 +123,7 @@ public TopicTransactionBuffer(PersistentTopic topic) {
}

private void recover() {
recoverTime.setRecoverStartTime(System.currentTimeMillis());
this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)
.execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() {
@Override
Expand All @@ -142,6 +146,7 @@ public void recoverComplete() {
timer.newTimeout(TopicTransactionBuffer.this,
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
transactionBufferFuture.complete(null);
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
}
Expand All @@ -157,6 +162,7 @@ public void noNeedToRecover() {
log.error("[{}]Transaction buffer recover fail", topic.getName());
} else {
transactionBufferFuture.complete(null);
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
}
Expand Down Expand Up @@ -210,6 +216,7 @@ public void recoverExceptionally(Throwable e) {
} else {
transactionBufferFuture.completeExceptionally(e);
}
recoverTime.setRecoverEndTime(System.currentTimeMillis());
topic.close(true);
}
}, this.topic, this, takeSnapshotWriter));
Expand Down Expand Up @@ -565,6 +572,8 @@ public TransactionBufferStats getStats(boolean lowWaterMarks) {
}
transactionBufferStats.ongoingTxnSize = ongoingTxns.size();

transactionBufferStats.recoverStartTime = recoverTime.getRecoverStartTime();
transactionBufferStats.recoverEndTime = recoverTime.getRecoverEndTime();
return transactionBufferStats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;

Expand Down Expand Up @@ -127,6 +128,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
@Getter
private final ExecutorService internalPinnedExecutor;

public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();


public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
super(State.None);
Expand Down Expand Up @@ -157,6 +160,7 @@ private void initPendingAckStore() {
this.pendingAckStoreFuture =
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
recoverTime.setRecoverStartTime(System.currentTimeMillis());
pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionally(e -> {
acceptQueue.clear();
Expand Down Expand Up @@ -905,18 +909,26 @@ public TransactionPendingAckStats getStats(boolean lowWaterMarks) {
} else {
transactionPendingAckStats.ongoingTxnSize = 0;
}
transactionPendingAckStats.recoverStartTime = recoverTime.getRecoverStartTime();
transactionPendingAckStats.recoverEndTime = recoverTime.getRecoverEndTime();
return transactionPendingAckStats;
}

public synchronized void completeHandleFuture() {
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
}
if (recoverTime.getRecoverStartTime() == 0L) {
return;
} else {
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}

public synchronized void exceptionHandleFuture(Throwable t) {
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.completeExceptionally(t);
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -619,6 +620,61 @@ public void testUpdateTransactionCoordinatorNumber() throws Exception {
}
}

@Test
public void testGetRecoveryTime() throws Exception {
initTransaction(1);
final String topic = "persistent://public/default/testGetRecoveryTime";
final String subName = "test";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.subscriptionName(subName)
.topic(topic)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.sendTimeout(0, TimeUnit.SECONDS)
.topic(topic)
.create();

Awaitility.await().untilAsserted(() -> {
Map<Integer, TransactionCoordinatorStats> transactionCoordinatorStatsMap =
admin.transactions().getCoordinatorStats();
assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverStartTime, 0L);
assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverEndTime, 0L);
assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverEndTime, -1L);
});
Awaitility.await().untilAsserted(() -> {
TransactionBufferStats transactionBufferStats = admin.transactions().getTransactionBufferStats(topic);
assertNotEquals(transactionBufferStats.recoverStartTime, 0L);
assertNotEquals(transactionBufferStats.recoverEndTime, 0L);
assertNotEquals(transactionBufferStats.recoverEndTime, -1L);
});

TransactionPendingAckStats transactionPendingAckStats =
admin.transactions().getPendingAckStats(topic, subName);
assertEquals(transactionPendingAckStats.recoverStartTime, 0L);
assertEquals(transactionPendingAckStats.recoverEndTime, 0L);

Transaction transaction1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.MINUTES)
.build()
.get();

producer.newMessage().send();
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);

consumer.acknowledgeAsync(message.getMessageId(), transaction1);
transaction1.commit().get();

transactionPendingAckStats =
admin.transactions().getPendingAckStats(topic, subName);
assertNotEquals(transactionPendingAckStats.recoverStartTime, 0L);
assertNotEquals(transactionPendingAckStats.recoverEndTime, 0L);
assertNotEquals(transactionPendingAckStats.recoverEndTime, -1L);
}


@Test
public void testCheckPositionInPendingAckState() throws Exception {
Expand Down Expand Up @@ -718,7 +774,7 @@ public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {
admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 1);
assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);

positionStatsInPendingAckStats =
admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public class TransactionBufferStats {
* The total number of ongoing transactions in this transaction buffer.
*/
public long ongoingTxnSize;

//Start timestamp of transaction buffer recovery. 0L means no startup.
public long recoverStartTime;
//End timestamp of transaction buffer recovery. 0L means no startup.
public long recoverEndTime;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ public class TransactionCoordinatorStats {
* The total number of ongoing transactions in this transaction coordinator.
*/
public long ongoingTxnSize;
//Start timestamp of transaction coordinator recovery. 0L means no startup.
public long recoverStartTime;
//End timestamp of transaction coordinator recovery. 0L means no startup.
public long recoverEndTime;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,8 @@ public class TransactionPendingAckStats {
* The total number of ongoing transactions in this transaction pending ack.
*/
public long ongoingTxnSize;
//Start timestamp of transaction pendingAck recovery. 0L means no startup.
public long recoverStartTime;
//End timestamp of transaction pendingAck recovery. 0L means no startup.
public long recoverEndTime;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.util;


import lombok.Getter;
import lombok.Setter;

@Getter
@Setter
public class RecoverTimeRecord {

private long recoverStartTime;

private long recoverEndTime;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class MLTransactionMetadataStore
private final LongAdder appendLogCount;
private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
private final ExecutorService internalPinnedExecutor;
public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
private final long maxActiveTransactionsPerCoordinator;

public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
Expand Down Expand Up @@ -110,8 +112,8 @@ public CompletableFuture<TransactionMetadataStore> init(TransactionRecoverTracke
.CoordinatorNotFoundException("transaction metadata store with tcId "
+ tcID.toString() + " change state to Initializing error when init it"));
} else {
recoverTime.setRecoverStartTime(System.currentTimeMillis());
internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {

@Override
public void replayComplete() {
recoverTracker.appendOpenTransactionToTimeoutTracker();
Expand All @@ -126,6 +128,7 @@ public void replayComplete() {
recoverTracker.handleCommittingAndAbortingTransaction();
timeoutTracker.start();
completableFuture.complete(MLTransactionMetadataStore.this);
recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}

Expand Down Expand Up @@ -446,6 +449,8 @@ public TransactionCoordinatorStats getCoordinatorStats() {
transactionCoordinatorstats.setState(getState().name());
transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
transactionCoordinatorstats.ongoingTxnSize = txnMetaMap.size();
transactionCoordinatorstats.recoverStartTime = recoverTime.getRecoverStartTime();
transactionCoordinatorstats.recoverEndTime = recoverTime.getRecoverEndTime();
return transactionCoordinatorstats;
}

Expand Down

0 comments on commit da325fa

Please sign in to comment.