Skip to content

Commit

Permalink
Fix flaky DiscoveryServiceTest (apache#9258)
Browse files Browse the repository at this point in the history
Fix this flaky test DiscoveryServiceTest

The test may result flaky on slow machines because the update of the cache happens asynchronously.

Change:
- wait for the cache to be updated before performing the assertions
- add more information during the failure
- add awaitility test dependency to the pulsar-discovery-service module
 
This is the error, quite frequent on GitHub Actions
```
java.lang.AssertionError: did not expect [pulsar://broker-:150000] but found [pulsar://broker-:150000]
at org.testng.Assert.fail(Assert.java:99)
at org.testng.Assert.failEquals(Assert.java:1041)
at org.testng.Assert.assertNotEqualsImpl(Assert.java:147)
at org.testng.Assert.assertNotEquals(Assert.java:1531)
at org.testng.Assert.assertNotEquals(Assert.java:1535)
at org.apache.pulsar.discovery.service.DiscoveryServiceTest.testBrokerDiscoveryRoundRobin(DiscoveryServiceTest.java:95)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
  • Loading branch information
eolivelli authored Jan 28, 2021
1 parent 4cea22b commit 24f759c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 4 deletions.
6 changes: 6 additions & 0 deletions pulsar-discovery-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,11 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public BrokerDiscoveryProvider(ServiceConfig config, ZooKeeperClientFactory zkCl
* @throws PulsarServerException
*/
LoadManagerReport nextBroker() throws PulsarServerException {
List<LoadManagerReport> availableBrokers = localZkCache.getAvailableBrokers();
List<LoadManagerReport> availableBrokers = getAvailableBrokers();

if (availableBrokers.isEmpty()) {
throw new PulsarServerException("No active broker is available");
Expand All @@ -101,6 +101,11 @@ LoadManagerReport nextBroker() throws PulsarServerException {
}
}

List<LoadManagerReport> getAvailableBrokers() {
List<LoadManagerReport> availableBrokers = localZkCache.getAvailableBrokers();
return availableBrokers;
}

CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(DiscoveryService service,
TopicName topicName, String role, AuthenticationDataSource authenticationData) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.awaitility.Awaitility;

public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {

Expand All @@ -76,7 +77,7 @@ private void init() throws Exception {
super.setup();
}

@AfterMethod
@AfterMethod(alwaysRun = true)
private void clean() throws Exception {
super.cleanup();
}
Expand All @@ -90,9 +91,10 @@ private void clean() throws Exception {
public void testBrokerDiscoveryRoundRobin() throws Exception {
addBrokerToZk(5);
String prevUrl = null;
BrokerDiscoveryProvider discoveryProvider = service.getDiscoveryProvider();
for (int i = 0; i < 10; i++) {
String current = service.getDiscoveryProvider().nextBroker().getPulsarServiceUrl();
assertNotEquals(prevUrl, current);
String current = discoveryProvider.nextBroker().getPulsarServiceUrl();
assertNotEquals(prevUrl, current, "unexpected " + current + " vs " + prevUrl + ", available " + discoveryProvider.getAvailableBrokers());
prevUrl = current;
}
}
Expand Down Expand Up @@ -245,6 +247,9 @@ private void addBrokerToZk(int number) throws Exception {
ZooKeeperChildrenCache availableBrokersCache = (ZooKeeperChildrenCache) field
.get(service.getDiscoveryProvider().localZkCache);
availableBrokersCache.reloadCache(LOADBALANCE_BROKERS_ROOT);

Awaitility.await().until(()
-> service.getDiscoveryProvider().getAvailableBrokers().size() == number);
}

}

0 comments on commit 24f759c

Please sign in to comment.