Skip to content

Commit

Permalink
Fix flaky test BrokerServiceLookupTest.testModularLoadManagerSplitBun…
Browse files Browse the repository at this point in the history
…dle (apache#9577)
  • Loading branch information
lhotari authored Feb 12, 2021
1 parent 6ec7717 commit 341fe5e
Showing 1 changed file with 21 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,11 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;

import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Field;
Expand All @@ -54,24 +51,19 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.naming.AuthenticationException;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;

import lombok.Cleanup;

import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl;
Expand Down Expand Up @@ -99,6 +91,7 @@
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -920,7 +913,7 @@ public void testSplitUnloadLookupTest() throws Exception {
*
* @throws Exception
*/
@Test(timeOut = 10000)
@Test(timeOut = 20000)
public void testModularLoadManagerSplitBundle() throws Exception {

log.info("-- Starting {} test --", methodName);
Expand Down Expand Up @@ -972,16 +965,19 @@ public void testModularLoadManagerSplitBundle() throws Exception {

// (3) Broker-2 receives topic-1 request, creates local-policies and sets the watch
final String topic1 = "persistent://" + namespace + "/topic1";
@Cleanup
Consumer<byte[]> consumer1 = pulsarClient2.newConsumer().topic(topic1)
.subscriptionName("my-subscriber-name").subscribe();

// (4) Broker-1 will own topic-1
final String unsplitBundle = namespace + "/0x00000000_0xffffffff";
retryStrategically((test) -> pulsar.getNamespaceService().getOwnedServiceUnits().stream()
.map(nb -> nb.toString()).collect(Collectors.toSet()).contains(unsplitBundle), 5, 100);
Set<String> serviceUnits1 = pulsar.getNamespaceService().getOwnedServiceUnits().stream()
.map(nb -> nb.toString()).collect(Collectors.toSet());
assertTrue(serviceUnits1.contains(unsplitBundle));

Awaitility.await().until(() ->
pulsar.getNamespaceService().getOwnedServiceUnits()
.stream()
.map(nb -> nb.toString())
.collect(Collectors.toSet())
.contains(unsplitBundle));

// broker-2 should have this bundle into the cache
TopicName topicName = TopicName.get(topic1);
Expand Down Expand Up @@ -1012,25 +1008,22 @@ public void testModularLoadManagerSplitBundle() throws Exception {
loadManager.checkNamespaceBundleSplit();

// (6) Broker-2 should get the watch and update bundle cache
final int retry = 5;
for (int i = 0; i < retry; i++) {
if (pulsar2.getNamespaceService().getBundle(topicName).equals(bundleInBroker2) && i != retry - 1) {
Thread.sleep(200);
} else {
break;
}
}
Awaitility.await().untilAsserted(() -> {
assertNotEquals(pulsar2.getNamespaceService().getBundle(topicName), bundleInBroker2);
});

// (7) Make lookup request again to Broker-2 which should succeed.
final String topic2 = "persistent://" + namespace + "/topic2";
Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2).subscriptionName("my-subscriber-name")
@Cleanup
Consumer<byte[]> consumer2 = pulsarClient2.newConsumer().topic(topic2)
.subscriptionName("my-subscriber-name")
.subscribe();

NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService().getBundle(TopicName.get(topic2));
assertNotEquals(unsplitBundle, bundleInBroker1AfterSplit);

consumer1.close();
consumer2.close();
Awaitility.await().untilAsserted(() -> {
NamespaceBundle bundleInBroker1AfterSplit = pulsar2.getNamespaceService()
.getBundle(TopicName.get(topic2));
assertNotEquals(bundleInBroker1AfterSplit.toString(), unsplitBundle);
});
} finally {
conf.setLoadManagerClassName(loadBalancerName);
}
Expand Down

0 comments on commit 341fe5e

Please sign in to comment.