Skip to content

Commit

Permalink
[FLINK-18259][tests] Increase heartbeat timeouts for HeartbeatManager…
Browse files Browse the repository at this point in the history
…Test

Increasing the heartbeat timeouts should harden the tests in case of slow
testing machines.

This closes apache#12612.
  • Loading branch information
tillrohrmann committed Jun 12, 2020
1 parent e026274 commit 4d8f55e
Showing 1 changed file with 11 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
*/
public class HeartbeatManagerTest extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);
public static final long HEARTBEAT_INTERVAL = 50L;
public static final long HEARTBEAT_TIMEOUT = 200L;

/**
* Tests that regular heartbeat signal triggers the right callback functions in the
Expand Down Expand Up @@ -145,9 +147,7 @@ public void testHeartbeatMonitorUpdate() {
*/
@Test
public void testHeartbeatTimeout() throws Exception {
long heartbeatTimeout = 100L;
int numHeartbeats = 6;
long heartbeatInterval = 20L;
final int payload = 42;

ResourceID ownResourceID = new ResourceID("foobar");
Expand All @@ -160,7 +160,7 @@ public void testHeartbeatTimeout() throws Exception {
.createNewTestingHeartbeatListener();

HeartbeatManagerImpl<Integer, Integer> heartbeatManager = new HeartbeatManagerImpl<>(
heartbeatTimeout,
HEARTBEAT_TIMEOUT,
ownResourceID,
heartbeatListener,
TestingUtils.defaultScheduledExecutor(),
Expand All @@ -173,12 +173,12 @@ public void testHeartbeatTimeout() throws Exception {

for (int i = 0; i < numHeartbeats; i++) {
heartbeatManager.receiveHeartbeat(targetResourceID, payload);
Thread.sleep(heartbeatInterval);
Thread.sleep(HEARTBEAT_INTERVAL);
}

assertFalse(timeoutFuture.isDone());

ResourceID timeoutResourceID = timeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
ResourceID timeoutResourceID = timeoutFuture.get(2 * HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);

assertEquals(targetResourceID, timeoutResourceID);
}
Expand All @@ -193,8 +193,6 @@ public void testHeartbeatTimeout() throws Exception {
*/
@Test
public void testHeartbeatCluster() throws Exception {
long heartbeatTimeout = 100L;
long heartbeatPeriod = 20L;
ResourceID resourceIdTarget = new ResourceID("foobar");
ResourceID resourceIDSender = new ResourceID("barfoo");
final int targetPayload = 42;
Expand All @@ -214,15 +212,15 @@ public void testHeartbeatCluster() throws Exception {
.createNewTestingHeartbeatListener();

HeartbeatManagerImpl<String, Integer> heartbeatManagerTarget = new HeartbeatManagerImpl<>(
heartbeatTimeout,
HEARTBEAT_TIMEOUT,
resourceIdTarget,
heartbeatListenerTarget,
TestingUtils.defaultScheduledExecutor(),
LOG);

HeartbeatManagerSenderImpl<Integer, String> heartbeatManagerSender = new HeartbeatManagerSenderImpl<>(
heartbeatPeriod,
heartbeatTimeout,
HEARTBEAT_INTERVAL,
HEARTBEAT_TIMEOUT,
resourceIDSender,
heartbeatListenerSender,
TestingUtils.defaultScheduledExecutor(),
Expand All @@ -231,17 +229,17 @@ public void testHeartbeatCluster() throws Exception {
heartbeatManagerTarget.monitorTarget(resourceIDSender, heartbeatManagerSender);
heartbeatManagerSender.monitorTarget(resourceIdTarget, heartbeatManagerTarget);

Thread.sleep(2 * heartbeatTimeout);
Thread.sleep(2 * HEARTBEAT_TIMEOUT);

assertFalse(targetHeartbeatTimeoutFuture.isDone());

heartbeatManagerTarget.stop();

ResourceID timeoutResourceID = targetHeartbeatTimeoutFuture.get(2 * heartbeatTimeout, TimeUnit.MILLISECONDS);
ResourceID timeoutResourceID = targetHeartbeatTimeoutFuture.get(2 * HEARTBEAT_TIMEOUT, TimeUnit.MILLISECONDS);

assertThat(timeoutResourceID, is(resourceIdTarget));

int numberHeartbeats = (int) (2 * heartbeatTimeout / heartbeatPeriod);
int numberHeartbeats = (int) (2 * HEARTBEAT_TIMEOUT / HEARTBEAT_INTERVAL);

final Matcher<Integer> numberHeartbeatsMatcher = greaterThanOrEqualTo(numberHeartbeats / 2);
assertThat(numReportPayloadCallsTarget.get(), is(numberHeartbeatsMatcher));
Expand Down

0 comments on commit 4d8f55e

Please sign in to comment.