Skip to content

Commit

Permalink
[Fix][Client] Fixed cnx channel Inactive causing the request fail to …
Browse files Browse the repository at this point in the history
…time out and fail to return (apache#17051)
  • Loading branch information
liangyepianzhou authored Aug 18, 2022
1 parent 7689133 commit 7913fe5
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;

import com.google.common.collect.Sets;
import io.netty.channel.ChannelHandlerContext;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(groups = "broker-impl")
public class ClientCnxTest extends MockedPulsarServiceBaseTest {

public static final String CLUSTER_NAME = "test";
public static final String TENANT = "tnx";
public static final String NAMESPACE = TENANT + "/ns1";
public static String persistentTopic = "persistent://" + NAMESPACE + "/test";
ExecutorService executorService = Executors.newFixedThreadPool(20);

@BeforeClass
@Override
protected void setup() throws Exception {
super.internalSetup();
admin.clusters().createCluster(CLUSTER_NAME, ClusterData.builder()
.serviceUrl(pulsar.getWebServiceAddress()).build());
admin.tenants().createTenant(TENANT,
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE);
}

@AfterClass(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
this.executorService.shutdown();
}

@Test
public void testRemoveAndHandlePendingRequestInCnx() throws Exception {

String subName = "sub";
int operationTimes = 5000;
CountDownLatch countDownLatch = new CountDownLatch(operationTimes);

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(persistentTopic)
.subscriptionName(subName)
.subscribe();

new Thread(() -> {
for (int i = 0; i < operationTimes; i++) {
executorService.submit(() -> {
consumer.getLastMessageIdAsync().whenComplete((ignore, exception) -> {
countDownLatch.countDown();
});
});
}
}).start();

for (int i = 0; i < operationTimes; i++) {
ClientCnx cnx = ((ConsumerImpl<?>) consumer).getClientCnx();
if (cnx != null) {
ChannelHandlerContext context = cnx.ctx();
if (context != null) {
cnx.ctx().close();
}
}
}

Awaitility.await().until(() -> {
countDownLatch.await();
return true;
});

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,11 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
"Disconnected from server at " + ctx.channel().remoteAddress());

// Fail out all the pending ops
pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
pendingRequests.forEach((key, future) -> {
if (pendingRequests.remove(key, future) && !future.isDone()) {
future.completeExceptionally(e);
}
});
waitingLookupRequests.forEach(pair -> pair.getRight().getRight().completeExceptionally(e));

// Notify all attached producers/consumers so they have a chance to reconnect
Expand All @@ -299,7 +303,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
transactionMetaStoreHandlers.forEach((id, handler) -> handler.connectionClosed(this));
topicListWatchers.forEach((__, watcher) -> watcher.connectionClosed(this));

pendingRequests.clear();
waitingLookupRequests.clear();

producers.clear();
Expand Down Expand Up @@ -900,8 +903,7 @@ private <T> void sendRequestAndHandleTimeout(ByteBuf requestMessage, long reques
if (flush) {
ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
if (!writeFuture.isSuccess()) {
CompletableFuture<?> newFuture = pendingRequests.remove(requestId);
if (newFuture != null && !newFuture.isDone()) {
if (pendingRequests.remove(requestId, future) && !future.isDone()) {
log.warn("{} Failed to send {} to broker: {}", ctx.channel(),
requestType.getDescription(), writeFuture.cause().getMessage());
future.completeExceptionally(writeFuture.cause());
Expand Down

0 comments on commit 7913fe5

Please sign in to comment.