Skip to content

Commit

Permalink
Attempt to fix flakiness in ZKSessionTest and quarantine the test (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari authored Dec 2, 2021
1 parent 7adfffd commit 9eaf2b5
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ public final void setup() throws Exception {
/**
 * Marks the current setup as cleaned and closes the test ZooKeeper server.
 *
 * <p>The server reference is checked for {@code null} before closing and is
 * cleared afterwards, so calling this method when no server is present (or
 * calling it a second time) does not dereference a null field or close the
 * same server twice.
 *
 * @throws Exception if closing the test ZooKeeper server fails
 */
@Override
public final void cleanup() throws Exception {
    markCurrentSetupNumberCleaned();
    // Close only through the guarded path: an unconditional zks.close() here
    // would throw NullPointerException when zks is null and would otherwise
    // close the server twice.
    if (zks != null) {
        zks.close();
        zks = null;
    }
}

private static String createTempFolder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

@Slf4j
public class TestZKServer implements AutoCloseable {
public static final int TICK_TIME = 1000;
protected ZooKeeperServer zks;
private final File zkDataDir;
private ServerCnxnFactory serverFactory;
Expand All @@ -63,7 +64,7 @@ public TestZKServer() throws Exception {
}

public void start() throws Exception {
this.zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME);
this.zks = new ZooKeeperServer(zkDataDir, zkDataDir, TICK_TIME);
this.serverFactory = new NIOServerCnxnFactory();
this.serverFactory.configure(new InetSocketAddress(zkPort), 1000);
this.serverFactory.startup(zks, true);
Expand Down Expand Up @@ -97,16 +98,28 @@ public void checkContainers() throws Exception {
}

public void stop() throws Exception {
if (zks != null) {
zks.shutdown();
zks.getZKDatabase().close();
zks = null;
if (containerManager != null) {
containerManager.stop();
containerManager = null;
}

if (serverFactory != null) {
serverFactory.shutdown();
serverFactory = null;
}

if (zks != null) {
SessionTracker sessionTracker = zks.getSessionTracker();
zks.shutdown();
zks.getZKDatabase().close();
if (sessionTracker instanceof Thread) {
Thread sessionTrackerThread = (Thread) sessionTracker;
sessionTrackerThread.interrupt();
sessionTrackerThread.join();
}
zks = null;
}

log.info("Stopped test ZK server");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,6 +40,7 @@
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

@Test(groups = "quarantine")
public class ZKSessionTest extends BaseMetadataStoreTest {

@Test
Expand Down Expand Up @@ -70,7 +72,7 @@ public void testSessionLost() throws Exception {
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(zks.getConnectionString(),
MetadataStoreConfig.builder()
.sessionTimeoutMillis(10_000)
.sessionTimeoutMillis(5_000)
.build());

BlockingQueue<SessionEvent> sessionEvents = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -166,13 +168,13 @@ public void testReacquireLeadershipAfterSessionLost() throws Exception {
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.SessionLost);
// --- test le1 can be leader
Awaitility.await()
Awaitility.await().atMost(Duration.ofSeconds(15))
.untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading)); // reacquire leadership
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.Reconnected);
e = sessionEvents.poll(10, TimeUnit.SECONDS);
assertEquals(e, SessionEvent.SessionReestablished);
Awaitility.await()
Awaitility.await().atMost(Duration.ofSeconds(15))
.untilAsserted(()-> assertEquals(le1.getState(),LeaderElectionState.Leading));
assertTrue(store.get(path).join().isPresent());
}
Expand Down

0 comments on commit 9eaf2b5

Please sign in to comment.