Skip to content

Commit

Permalink
[improve][transaction] Optimize topic lookup when TC end tx. (apache#…
Browse files Browse the repository at this point in the history
…14991)

### Motivation

When TC ends tx, it has to look up the topic. The original way uses pulsar-client to do this. It will look up every topic.
If using bundle cache to find the topic owner broker, it can avoid lookup every topic and then decrease the lookup time.

### Modifications

- Using bundle cache to find the topic owner broker. If occurs an error, fall back to look up the topic.
- Remove the topic cache, because using bundle cache, the original cache is useless.
  • Loading branch information
Technoboy- authored Apr 12, 2022
1 parent 1cb4798 commit 46baae6
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ public void start() throws PulsarServerException {
this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer,
transactionBufferClient = TransactionBufferClientImpl.create(this, transactionTimer,
config.getTransactionBufferClientMaxConcurrentRequests(),
config.getTransactionBufferClientOperationTimeoutInMills());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
import io.netty.util.HashedWheelTimer;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
Expand All @@ -39,9 +40,9 @@ private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
this.tbHandler = tbHandler;
}

public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer,
int maxConcurrentRequests, long operationTimeoutInMills) {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer,
public static TransactionBufferClient create(PulsarService pulsarService, HashedWheelTimer timer,
int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarService, timer,
maxConcurrentRequests, operationTimeoutInMills);
return new TransactionBufferClientImpl(handler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
Expand All @@ -42,7 +44,10 @@
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;

@Slf4j
Expand All @@ -53,31 +58,17 @@ public class TransactionBufferHandlerImpl implements TransactionBufferHandler {
private final AtomicLong requestIdGenerator = new AtomicLong();
private final long operationTimeoutInMills;
private final HashedWheelTimer timer;
private final PulsarClient pulsarClient;
private final PulsarService pulsarService;
private final PulsarClientImpl pulsarClient;

private static final AtomicIntegerFieldUpdater<TransactionBufferHandlerImpl> REQUEST_CREDITS_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(TransactionBufferHandlerImpl.class, "requestCredits");
private volatile int requestCredits;

private final LoadingCache<String, CompletableFuture<ClientCnx>> lookupCache = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES)
.build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {
@Override
public CompletableFuture<ClientCnx> load(String topic) {
CompletableFuture<ClientCnx> siFuture = getClientCnx(topic);
siFuture.whenComplete((si, cause) -> {
if (null != cause) {
lookupCache.invalidate(topic);
}
});
return siFuture;
}
});

public TransactionBufferHandlerImpl(PulsarClient pulsarClient, HashedWheelTimer timer,
int maxConcurrentRequests, long operationTimeoutInMills) {
this.pulsarClient = pulsarClient;
public TransactionBufferHandlerImpl(PulsarService pulsarService, HashedWheelTimer timer,
int maxConcurrentRequests, long operationTimeoutInMills) throws PulsarServerException {
this.pulsarService = pulsarService;
this.pulsarClient = (PulsarClientImpl) pulsarService.getClient();
this.outstandingRequests = new ConcurrentSkipListMap<>();
this.pendingRequests = new GrowableArrayBlockingQueue<>();
this.operationTimeoutInMills = operationTimeoutInMills;
Expand All @@ -97,15 +88,9 @@ public CompletableFuture<TxnID> endTxnOnTopic(String topic, long txnIdMostBits,
ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits,
topic, action, lowWaterMark);

try {
OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, lookupCache.get(topic));
if (checkRequestCredits(op)) {
endTxn(op);
}
} catch (ExecutionException e) {
log.error("[{}] failed to get client cnx from lookup cache", topic, e);
lookupCache.invalidate(topic);
cb.completeExceptionally(new PulsarClientException.LookupException(e.getCause().getMessage()));
OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, getClientCnx(topic));
if (checkRequestCredits(op)) {
endTxn(op);
}
return cb;
}
Expand All @@ -122,15 +107,9 @@ public CompletableFuture<TxnID> endTxnOnSubscription(String topic, String subscr
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
topic, subscription, action, lowWaterMark);
try {
OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, lookupCache.get(topic));
if (checkRequestCredits(op)) {
endTxn(op);
}
} catch (ExecutionException e) {
log.error("[{}] failed to get client cnx from lookup cache", topic, e);
lookupCache.invalidate(topic);
cb.completeExceptionally(new PulsarClientException.LookupException(e.getCause().getMessage()));
OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb, getClientCnx(topic));
if (checkRequestCredits(op)) {
endTxn(op);
}
return cb;
}
Expand All @@ -150,8 +129,8 @@ private boolean checkRequestCredits(OpRequestSend op) {
}

public void endTxn(OpRequestSend op) {
op.cnx.whenComplete((clientCnx, throwable) -> {
if (throwable == null) {
op.cnx.whenComplete((clientCnx, ex) -> {
if (ex == null) {
if (clientCnx.ctx().channel().isActive()) {
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
outstandingRequests.put(op.requestId, op);
Expand All @@ -166,16 +145,19 @@ public void endTxn(OpRequestSend op) {
op.cmd.retain();
clientCnx.ctx().writeAndFlush(op.cmd, clientCnx.ctx().voidPromise());
} else {
invalidateLookupCache(op);
op.cb.completeExceptionally(
new PulsarClientException.LookupException(op.topic + " endTxn channel is not active"));
onResponse(op);
}
} else {
log.error("endTxn error topic: [{}]", op.topic, throwable);
invalidateLookupCache(op);
op.cb.completeExceptionally(
new PulsarClientException.LookupException(throwable.getMessage()));
Throwable cause = FutureUtil.unwrapCompletionException(ex);
log.error("endTxn error topic: [{}]", op.topic, cause);
if (cause instanceof PulsarClientException.BrokerMetadataException) {
op.cb.complete(null);
} else {
op.cb.completeExceptionally(
new PulsarClientException.LookupException(cause.getMessage()));
}
onResponse(op);
}
});
Expand All @@ -202,12 +184,9 @@ public void handleEndTxnOnTopicResponse(long requestId, CommandEndTxnOnPartition
log.error("[{}] Got end txn on topic response for request {} error {}", op.topic,
response.getRequestId(),
response.getError());
invalidateLookupCache(op);
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
response.getMessage()));
}
} catch (Exception e) {
log.error("[{}] Got exception when complete EndTxnOnTopic op for request {}", op.topic, e);
} finally {
onResponse(op);
}
Expand Down Expand Up @@ -235,12 +214,9 @@ public void handleEndTxnOnSubscriptionResponse(long requestId,
} else {
log.error("[{}] Got end txn on subscription response for request {} error {}",
op.topic, response.getRequestId(), response.getError());
invalidateLookupCache(op);
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(),
response.getMessage()));
}
} catch (Exception e) {
log.error("[{}] Got exception when complete EndTxnOnSub op for request {}", op.topic, e);
} finally {
onResponse(op);
}
Expand All @@ -262,21 +238,14 @@ private void checkPendingRequests() {
if (REQUEST_CREDITS_UPDATER.compareAndSet(this, permits, permits - 1)) {
OpRequestSend polled = pendingRequests.poll();
if (polled != null) {
try {
if (polled.cnx != lookupCache.get(polled.topic)) {
OpRequestSend invalid = polled;
polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
lookupCache.get(invalid.topic));
invalid.recycle();
}
endTxn(polled);
} catch (ExecutionException e) {
log.error("[{}] failed to get client cnx from lookup cache", polled.topic, e);
lookupCache.invalidate(polled.topic);
polled.cb.completeExceptionally(new PulsarClientException.LookupException(
e.getCause().getMessage()));
REQUEST_CREDITS_UPDATER.incrementAndGet(this);
CompletableFuture<ClientCnx> clientCnx = getClientCnx(polled.topic);
if (polled.cnx != clientCnx) {
OpRequestSend invalid = polled;
polled = OpRequestSend.create(invalid.requestId, invalid.topic, invalid.cmd, invalid.cb,
clientCnx);
invalid.recycle();
}
endTxn(polled);
} else {
REQUEST_CREDITS_UPDATER.incrementAndGet(this);
}
Expand All @@ -287,16 +256,6 @@ private void checkPendingRequests() {
}
}

private void invalidateLookupCache(OpRequestSend op) {
try {
if (lookupCache.get(op.topic) == op.cnx) {
lookupCache.invalidate(op.topic);
}
} catch (ExecutionException e) {
lookupCache.invalidate(op.topic);
}
}

public static final class OpRequestSend {

long requestId;
Expand Down Expand Up @@ -336,8 +295,42 @@ protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
};
}

public CompletableFuture<ClientCnx> getClientCnxWithLookup(String topic) {
return pulsarClient.getConnection(topic);
}

public CompletableFuture<ClientCnx> getClientCnx(String topic) {
return ((PulsarClientImpl) pulsarClient).getConnection(topic);
NamespaceService namespaceService = pulsarService.getNamespaceService();
CompletableFuture<NamespaceBundle> nsBundle = namespaceService.getBundleAsync(TopicName.get(topic));
return nsBundle
.thenCompose(bundle -> namespaceService.getOwnerAsync(bundle))
.thenCompose(data -> {
if (data.isPresent()) {
NamespaceEphemeralData ephemeralData = data.get();
try {
if (!ephemeralData.isDisabled()) {
URI uri;
if (pulsarClient.getConfiguration().isUseTls()) {
uri = new URI(ephemeralData.getNativeUrlTls());
} else {
uri = new URI(ephemeralData.getNativeUrl());
}
InetSocketAddress brokerAddress =
InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
return pulsarClient.getConnection(brokerAddress, brokerAddress);
} else {
// Bundle is unloading, lookup topic
return getClientCnxWithLookup(topic);
}
} catch (URISyntaxException e) {
// Should never go here
return getClientCnxWithLookup(topic);
}
} else {
// Bundle is not loaded yet, lookup topic
return getClientCnxWithLookup(topic);
}
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,35 @@ public void produceAndCommitTest() throws Exception {
produceTest(true);
}

@Test
public void testDeleteNamespaceBeforeCommit() throws Exception {
final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
PulsarClient pulsarClient = this.pulsarClient;
Transaction tnx = pulsarClient.newTransaction()
.withTransactionTimeout(60, TimeUnit.SECONDS)
.build().get();
long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
Assert.assertTrue(txnIdMostBits > -1);
Assert.assertTrue(txnIdLeastBits > -1);

@Cleanup
Producer<byte[]> outProducer = pulsarClient
.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();

String content = "Hello Txn";
outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();

try {
admin.namespaces().deleteNamespace(NAMESPACE1, true);
} catch (Exception ignore) {}
tnx.commit().get();
}

@Test
public void produceAndAbortTest() throws Exception {
produceTest(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected void startBroker() throws Exception {
conf.setConfigurationMetadataStoreUrl("zk:localhost:3181");
conf.setAllowAutoTopicCreationType("non-partitioned");
conf.setBookkeeperClientExposeStatsToPrometheus(true);

conf.setForceDeleteNamespaceAllowed(true);
conf.setBrokerShutdownTimeoutMs(0L);
conf.setBrokerServicePort(Optional.of(0));
conf.setBrokerServicePortTls(Optional.of(0));
Expand Down
Loading

0 comments on commit 46baae6

Please sign in to comment.