Skip to content

Commit

Permalink
[fix][txn]: fix transaction buffer recover reader and writer fail (ap…
Browse files Browse the repository at this point in the history
…ache#14801)

### Motivation
When Transaction buffer recover create reader or create writer fail or read snapshot fail throw PulsarClientException, we should rerecover this topic so close this topic to reinit.
  • Loading branch information
congbobo184 authored Mar 24, 2022
1 parent a14a97e commit bf56863
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,17 @@ public void close() throws IOException {

@Override
public CompletableFuture<Void> closeAsync() {
return producer.closeAsync().thenCompose(v -> {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
producer.closeAsync().whenComplete((v, e) -> {
// if close fail, also need remove the producer
transactionBufferSystemTopicClient.removeWriter(this);
return CompletableFuture.completedFuture(null);
if (e != null) {
completableFuture.completeExceptionally(e);
return;
}
completableFuture.complete(null);
});
return completableFuture;
}

@Override
Expand Down Expand Up @@ -179,10 +186,17 @@ public void close() throws IOException {

@Override
public CompletableFuture<Void> closeAsync() {
return reader.closeAsync().thenCompose(v -> {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
reader.closeAsync().whenComplete((v, e) -> {
// if close fail, also need remove the reader
transactionBufferSystemTopicClient.removeReader(this);
return CompletableFuture.completedFuture(null);
if (e != null) {
completableFuture.completeExceptionally(e);
return;
}
completableFuture.complete(null);
});
return completableFuture;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
Expand Down Expand Up @@ -178,15 +179,24 @@ public void handleTxnEntry(Entry entry) {
}

@Override
public void recoverExceptionally(Exception e) {
if (e instanceof PulsarClientException.BrokerMetadataException) {
public void recoverExceptionally(Throwable e) {

// when create reader or writer fail throw PulsarClientException,
// should close this topic and then reinit this topic
if (e instanceof PulsarClientException) {
// if transaction buffer recover fail throw PulsarClientException,
// we need to change the PulsarClientException to ServiceUnitNotReadyException,
// the tc do op will retry
transactionBufferFuture.completeExceptionally
(new BrokerServiceException.ServiceUnitNotReadyException(e.getMessage(), e));
log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the "
+ "transaction buffer throw exception", topic.getName(), e);
topic.close();
} else {
transactionBufferFuture.completeExceptionally(e);
}
transactionBufferFuture.completeExceptionally(e);
topic.close(true);
}
}, this.topic, this));
}, this.topic, this, takeSnapshotWriter));
}

@Override
Expand Down Expand Up @@ -541,98 +551,112 @@ static class TopicTransactionBufferRecover implements Runnable {

private final TopicTransactionBuffer topicTransactionBuffer;

private final CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter;

private TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, PersistentTopic topic,
TopicTransactionBuffer transactionBuffer) {
TopicTransactionBuffer transactionBuffer, CompletableFuture<
SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter) {
this.topic = topic;
this.callBack = callBack;
this.entryQueue = new SpscArrayQueue<>(2000);
this.topicTransactionBuffer = transactionBuffer;
this.takeSnapshotWriter = takeSnapshotWriter;
}

@SneakyThrows
@Override
public void run() {
if (!this.topicTransactionBuffer.changeToInitializingState()) {
log.warn("TransactionBuffer {} of topic {} can not change state to Initializing",
this, topic.getName());
return;
}
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
try {
boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshot> message = reader.readNext();
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
hasSnapshot = true;
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
}
if (!hasSnapshot) {
callBack.noNeedToRecover();
return;
}
} catch (PulsarClientException pulsarClientException) {
log.error("[{}]Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), pulsarClientException);
callBack.recoverExceptionally(pulsarClientException);
reader.closeAsync().exceptionally(e -> {
log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
return null;
});
this.takeSnapshotWriter.thenRunAsync(() -> {
if (!this.topicTransactionBuffer.changeToInitializingState()) {
log.warn("TransactionBuffer {} of topic {} can not change state to Initializing",
this, topic.getName());
return;
}
reader.closeAsync().exceptionally(e -> {
log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
return null;
});

ManagedCursor managedCursor;
try {
managedCursor = topic.getManagedLedger()
.newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME);
} catch (ManagedLedgerException e) {
callBack.recoverExceptionally(e);
log.error("[{}]Transaction buffer recover fail when open cursor!", topic.getName(), e);
return;
}
PositionImpl lastConfirmedEntry = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
PositionImpl currentLoadPosition = (PositionImpl) this.startReadCursorPosition;
FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue,
managedCursor, TopicTransactionBufferRecover.this);
if (lastConfirmedEntry.getEntryId() != -1) {
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
&& fillEntryQueueCallback.fillQueue()) {
Entry entry = entryQueue.poll();
if (entry != null) {
topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> {
try {
currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
callBack.handleTxnEntry(entry);
} finally {
entry.release();
boolean hasSnapshot = false;
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshot> message = reader.readNext();
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot = message.getValue();
if (transactionBufferSnapshot != null) {
hasSnapshot = true;
callBack.handleSnapshot(transactionBufferSnapshot);
this.startReadCursorPosition = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
}
if (!hasSnapshot) {
callBack.noNeedToRecover();
return;
}
} catch (PulsarClientException pulsarClientException) {
log.error("[{}]Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), pulsarClientException);
callBack.recoverExceptionally(pulsarClientException);
reader.closeAsync().exceptionally(e -> {
log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
return null;
});
return;
}
} else {
reader.closeAsync().exceptionally(e -> {
log.error("[{}]Transaction buffer reader close error!", topic.getName(), e);
return null;
});

ManagedCursor managedCursor;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
//no-op
managedCursor = topic.getManagedLedger()
.newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME);
} catch (ManagedLedgerException e) {
callBack.recoverExceptionally(e);
log.error("[{}]Transaction buffer recover fail when open cursor!", topic.getName(), e);
return;
}
PositionImpl lastConfirmedEntry =
(PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
PositionImpl currentLoadPosition = (PositionImpl) this.startReadCursorPosition;
FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue,
managedCursor, TopicTransactionBufferRecover.this);
if (lastConfirmedEntry.getEntryId() != -1) {
while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0
&& fillEntryQueueCallback.fillQueue()) {
Entry entry = entryQueue.poll();
if (entry != null) {
try {
currentLoadPosition = PositionImpl.get(entry.getLedgerId(),
entry.getEntryId());
callBack.handleTxnEntry(entry);
} finally {
entry.release();
}
} else {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
//no-op
}
}
}
}
}
}
}

closeCursor(managedCursor);
callBack.recoverComplete();
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this))
.exceptionally(e -> {
callBack.recoverExceptionally(new Exception(e));
log.error("[{}]Transaction buffer new snapshot reader fail!", topic.getName(), e);
closeCursor(managedCursor);
callBack.recoverComplete();
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this)).exceptionally(e -> {
callBack.recoverExceptionally(e.getCause());
log.error("[{}]Transaction buffer new snapshot reader fail!", topic.getName(), e);
return null;
});
}, topic.getBrokerService().getPulsar().getTransactionExecutorProvider()
.getExecutor(this)).exceptionally(e -> {
callBack.recoverExceptionally(e.getCause());
log.error("[{}]Transaction buffer create snapshot writer fail!",
topic.getName(), e);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,5 @@ public interface TopicTransactionBufferRecoverCallBack {
/**
* Topic transaction buffer recover exceptionally.
*/
void recoverExceptionally(Exception e);
void recoverExceptionally(Throwable e);
}
Loading

0 comments on commit bf56863

Please sign in to comment.