Skip to content

Commit

Permalink
Fix transaction buffer lookup (apache#10257)
Browse files Browse the repository at this point in the history
## Motivation
now transaction buffer client handle transaction coordinator command by find topic address and create connect. it can't init `PersistentTopic` in broker, so the command will always fail. 
## implement
1. transaction buffer client handle transaction coordinator command should lookup topic once, the lookup command will init `PersistentTopic`.
2. add a cache for the transaction buffer client.
  • Loading branch information
congbobo184 authored Apr 23, 2021
1 parent b67a4b2 commit e970c29
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
Expand Down Expand Up @@ -745,8 +744,7 @@ public Boolean get() {
this.transactionBufferSnapshotService = new SystemTopicBaseTxnBufferSnapshotService(getClient());
this.transactionTimer =
new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer"));
transactionBufferClient = TransactionBufferClientImpl.create(
getNamespaceService(), ((PulsarClientImpl) getClient()).getCnxPool(), transactionTimer);
transactionBufferClient = TransactionBufferClientImpl.create(getClient(), transactionTimer);

transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider
.newProvider(config.getTransactionMetadataStoreProviderClassName()), this,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import io.netty.util.HashedWheelTimer;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.TxnAction;

Expand All @@ -40,9 +39,8 @@ private TransactionBufferClientImpl(TransactionBufferHandler tbHandler) {
this.tbHandler = tbHandler;
}

public static TransactionBufferClient create(NamespaceService namespaceService, ConnectionPool connectionPool,
HashedWheelTimer timer) {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(connectionPool, namespaceService, timer);
public static TransactionBufferClient create(PulsarClient pulsarClient, HashedWheelTimer timer) {
TransactionBufferHandler handler = new TransactionBufferHandlerImpl(pulsarClient, timer);
return new TransactionBufferClientImpl(handler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,53 +18,68 @@
*/
package org.apache.pulsar.broker.transaction.buffer.impl;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.netty.buffer.ByteBuf;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.ReachMaxPendingOpsException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
public class TransactionBufferHandlerImpl implements TransactionBufferHandler, TimerTask {

private final ConcurrentSkipListMap<Long, OpRequestSend> pendingRequests;
private final ConnectionPool connectionPool;
private final NamespaceService namespaceService;
private final AtomicLong requestIdGenerator = new AtomicLong();
private final long operationTimeoutInMills;
private Timeout requestTimeout;
private final HashedWheelTimer timer;
private final Semaphore semaphore;
private final boolean blockIfReachMaxPendingOps;
private final PulsarClient pulsarClient;

public TransactionBufferHandlerImpl(ConnectionPool connectionPool, NamespaceService namespaceService,
private final LoadingCache<String, CompletableFuture<ClientCnx>> cache = CacheBuilder.newBuilder()
.maximumSize(100000)
.expireAfterAccess(30, TimeUnit.MINUTES)
.build(new CacheLoader<String, CompletableFuture<ClientCnx>>() {
@Override
public CompletableFuture<ClientCnx> load(String topic) {
CompletableFuture<ClientCnx> siFuture = getClientCnx(topic);
siFuture.whenComplete((si, cause) -> {
if (null != cause) {
cache.invalidate(topic);
}
});
return siFuture;
}
});

public TransactionBufferHandlerImpl(PulsarClient pulsarClient,
HashedWheelTimer timer) {
this.connectionPool = connectionPool;
this.pulsarClient = pulsarClient;
this.pendingRequests = new ConcurrentSkipListMap<>();
this.namespaceService = namespaceService;
this.operationTimeoutInMills = 3000L;
this.semaphore = new Semaphore(10000);
this.blockIfReachMaxPendingOps = true;
Expand All @@ -82,25 +97,7 @@ public synchronized CompletableFuture<TxnID> endTxnOnTopic(String topic, long tx
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnPartition(requestId, txnIdLeastBits, txnIdMostBits,
topic, action, lowWaterMark);
OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
pendingRequests.put(requestId, op);
cmd.retain();
cnx(topic).whenComplete((clientCnx, throwable) -> {
if (throwable == null) {
try {
clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
} catch (Exception e) {
cb.completeExceptionally(e);
pendingRequests.remove(requestId);
op.recycle();
}
} else {
cb.completeExceptionally(throwable);
pendingRequests.remove(requestId);
op.recycle();
}
});
return cb;
return endTxn(requestId, topic, cmd, cb);
}

@Override
Expand All @@ -114,24 +111,38 @@ public synchronized CompletableFuture<TxnID> endTxnOnSubscription(String topic,
long requestId = requestIdGenerator.getAndIncrement();
ByteBuf cmd = Commands.newEndTxnOnSubscription(requestId, txnIdLeastBits, txnIdMostBits,
topic, subscription, action, lowWaterMark);
return endTxn(requestId, topic, cmd, cb);
}

private CompletableFuture<TxnID> endTxn(long requestId, String topic, ByteBuf cmd, CompletableFuture<TxnID> cb) {
OpRequestSend op = OpRequestSend.create(requestId, topic, cmd, cb);
pendingRequests.put(requestId, op);
cmd.retain();
cnx(topic).whenComplete((clientCnx, throwable) -> {
if (throwable == null) {
try {
clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
} catch (Exception e) {
cb.completeExceptionally(e);
try {
cache.get(topic).whenComplete((clientCnx, throwable) -> {
if (throwable == null) {
try {
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
clientCnx.ctx().writeAndFlush(cmd, clientCnx.ctx().voidPromise());
} catch (Exception e) {
cache.invalidate(topic);
cb.completeExceptionally(e);
pendingRequests.remove(requestId);
op.recycle();
}
} else {
cache.invalidate(topic);
cb.completeExceptionally(throwable);
pendingRequests.remove(requestId);
op.recycle();
}
} else {
cb.completeExceptionally(throwable);
pendingRequests.remove(requestId);
op.recycle();
}
});
});
} catch (ExecutionException e) {
cache.invalidate(topic);
cb.completeExceptionally(new PulsarClientException.LookupException(e.getCause().getMessage()));
pendingRequests.remove(requestId);
op.recycle();
}
return cb;
}

Expand All @@ -154,6 +165,7 @@ public synchronized void handleEndTxnOnTopicResponse(long requestId, CommandEndT
} else {
log.error("[{}] Got end txn on topic response for request {} error {}", op.topic, response.getRequestId(),
response.getError());
cache.invalidate(op.topic);
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
}
op.recycle();
Expand All @@ -180,42 +192,12 @@ public synchronized void handleEndTxnOnSubscriptionResponse(long requestId,
} else {
log.error("[{}] Got end txn on subscription response for request {} error {}",
op.topic, response.getRequestId(), response.getError());
cache.invalidate(op.topic);
op.cb.completeExceptionally(ClientCnx.getPulsarClientException(response.getError(), response.getMessage()));
}
op.recycle();
}

private CompletableFuture<ClientCnx> cnx(String topic) {
return getServiceUrl(topic).thenCompose(serviceUrl -> {
try {
if (serviceUrl == null) {
return CompletableFuture.completedFuture(null);
}
URI uri = new URI(serviceUrl);
return connectionPool.getConnection(InetSocketAddress.createUnresolved(uri.getHost(),
uri.getPort())).thenCompose(clientCnx -> {
clientCnx.registerTransactionBufferHandler(TransactionBufferHandlerImpl.this);
return CompletableFuture.completedFuture(clientCnx);
});
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
});
}

private CompletableFuture<String> getServiceUrl(String topic) {
TopicName topicName = TopicName.get(topic);
return namespaceService.getBundleAsync(topicName)
.thenCompose(namespaceService::getOwnerAsync)
.thenCompose(ned -> {
String serviceUrl = null;
if (ned.isPresent()) {
serviceUrl = ned.get().getNativeUrl();
}
return CompletableFuture.completedFuture(serviceUrl);
});
}

private boolean canSendRequest(CompletableFuture<?> callback) {
try {
if (blockIfReachMaxPendingOps) {
Expand All @@ -234,13 +216,12 @@ private boolean canSendRequest(CompletableFuture<?> callback) {
return true;
}

@Override
public synchronized void run(Timeout timeout) throws Exception {
if (timeout.isCancelled()) {
return;
}
long timeToWaitMs;
OpRequestSend peeked = null;
OpRequestSend peeked;
Map.Entry<Long, OpRequestSend> firstEntry = pendingRequests.firstEntry();
peeked = firstEntry == null ? null : firstEntry.getValue();
while (peeked != null && peeked.createdAt + operationTimeoutInMills - System.currentTimeMillis() <= 0) {
Expand Down Expand Up @@ -304,6 +285,10 @@ protected OpRequestSend newObject(Handle<OpRequestSend> handle) {
};
}

private CompletableFuture<ClientCnx> getClientCnx(String topic) {
return ((PulsarClientImpl) pulsarClient).getConnection(topic);
}

@Override
public void close() {
this.timer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;

import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.Cleanup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.pulsar.broker.intercept.MockBrokerInterceptor;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
Expand All @@ -40,7 +45,6 @@
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -55,8 +59,6 @@
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
Expand All @@ -71,18 +73,19 @@ public class TransactionBufferClientTest extends TransactionMetaStoreTestBase {
TopicName partitionedTopicName = TopicName.get("persistent", "public", "test", "tb-client");
int partitions = 10;
BrokerService[] brokerServices;
private final static String namespace = "public/test";

@Override
protected void afterSetup() throws Exception {
pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress()));
pulsarAdmins[0].tenants().createTenant("public", new TenantInfo(Sets.newHashSet(), Sets.newHashSet("my-cluster")));
pulsarAdmins[0].namespaces().createNamespace("public/test", 10);
pulsarAdmins[0].namespaces().createNamespace(namespace, 10);
pulsarAdmins[0].topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions);
pulsarClient.newConsumer()
.topic(partitionedTopicName.getPartitionedTopicName())
.subscriptionName("test").subscribe();
tbClient = TransactionBufferClientImpl.create(pulsarServices[0].getNamespaceService(),
((PulsarClientImpl) pulsarClient).getCnxPool(),
tbClient = TransactionBufferClientImpl.create(
((PulsarClientImpl) pulsarClient),
new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer")));
}

Expand All @@ -103,6 +106,7 @@ protected void cleanup() throws Exception {
@Override
protected void afterPulsarStart() throws Exception {
brokerServices = new BrokerService[pulsarServices.length];
AtomicLong atomicLong = new AtomicLong(0);
for (int i = 0; i < pulsarServices.length; i++) {
Subscription mockSubscription = mock(Subscription.class);
Mockito.when(mockSubscription.endTxn(Mockito.anyLong(),
Expand All @@ -120,6 +124,8 @@ protected void afterPulsarStart() throws Exception {
CompletableFuture.completedFuture(Optional.of(mockTopic)));

BrokerService brokerService = Mockito.spy(new BrokerService(pulsarServices[i]));
doReturn(new MockBrokerInterceptor()).when(brokerService).getInterceptor();
doReturn(atomicLong.getAndIncrement() + "").when(brokerService).generateUniqueProducerName();
brokerServices[i] = brokerService;
Mockito.when(brokerService.getTopics()).thenReturn(topicMap);
Mockito.when(pulsarServices[i].getBrokerService()).thenReturn(brokerService);
Expand Down Expand Up @@ -217,13 +223,12 @@ public void testTransactionBufferOpFail() throws InterruptedException, Execution

@Test
public void testTransactionBufferClientTimeout() throws Exception {
ConnectionPool connectionPool = mock(ConnectionPool.class);
NamespaceService namespaceService = mock(NamespaceService.class);
PulsarClientImpl mockClient = mock(PulsarClientImpl.class);
when(mockClient.getConnection(anyString())).thenReturn(new CompletableFuture<>());
@Cleanup("stop")
HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
doReturn(new CompletableFuture<>()).when(namespaceService).getBundleAsync(anyObject());
TransactionBufferHandlerImpl transactionBufferHandler = new TransactionBufferHandlerImpl(connectionPool,
namespaceService, hashedWheelTimer);
TransactionBufferHandlerImpl transactionBufferHandler =
new TransactionBufferHandlerImpl(mockClient, hashedWheelTimer);
CompletableFuture<TxnID> completableFuture =
transactionBufferHandler.endTxnOnTopic("test", 1, 1, TxnAction.ABORT, 1);

Expand All @@ -248,4 +253,13 @@ public void testTransactionBufferClientTimeout() throws Exception {
assertTrue(e.getCause() instanceof TransactionBufferClientException.RequestTimeoutException);
}
}

@Test
public void testTransactionBufferLookUp() throws ExecutionException, InterruptedException {
String topic = "persistent://" + namespace + "/testTransactionBufferLookUp";
tbClient.abortTxnOnSubscription(topic + "_abort_sub", "test", 1L, 1L, -1L).get();
tbClient.commitTxnOnSubscription(topic + "_commit_sub", "test", 1L, 1L, -1L).get();
tbClient.abortTxnOnTopic(topic + "_abort_topic", 1L, 1L, -1L).get();
tbClient.commitTxnOnTopic(topic + "_commit_topic", 1L, 1L, -1L).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public int getRemoteEndpointProtocolVersion() {
* @return the ClientCnx to use, passed a future. Will complete with an exception when connections are rejected.
*/
@Override
protected CompletableFuture<ClientCnx> getConnection(String topic) {
public CompletableFuture<ClientCnx> getConnection(String topic) {
if (rejectNewConnections) {
CompletableFuture<ClientCnx> result = new CompletableFuture<>();
result.completeExceptionally(new IOException("New connections are rejected."));
Expand Down
Loading

0 comments on commit e970c29

Please sign in to comment.