Skip to content

Commit

Permalink
[FLINK-12863][FLINK-12865] Remove concurrency from HeartbeatManager(S…
Browse files Browse the repository at this point in the history
…ender)Impl

This commit makes the HeartbeatManager implementations use the RpcEndpoint's
main thread executor. Furthermore, this commit changes the HeartbeatListener
interface to return the payload directly instead of returning a future when
HeartbeatListener#retrievePayload is called.

Since the HeartbeatManager implementations now use the RpcEndpoint's main thread,
we remove the splicing into the RpcEndpoint's main thread in the implementations
of the HeartbeatListeners in the TaskExecutor, JobMaster and ResourceManager
components.

* Add test case for FLINK-12863

The test case JobMasterTest#testAllocatedSlotReportDoesNotContainStaleInformation verifies
that the AllocatedSlotReport does not contain stale information. The test case itself is
probabilistic and needs to be executed several times to produce a failure.

* Add test case for FLINK-12865

The test case TaskExecutorTest#testSlotReportDoesNotContainStaleInformation verifies
that the SlotReport does not contain stale information. The test case itself is
probabilistic and needs to be executed several times to produce a failure.

This closes apache#8783.
  • Loading branch information
tillrohrmann committed Jun 21, 2019
1 parent b96f20e commit a95dac5
Show file tree
Hide file tree
Showing 17 changed files with 954 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
Expand Down Expand Up @@ -319,7 +317,6 @@ static class Context implements AutoCloseable {
*/
class MockResourceManagerRuntimeServices {

public final ScheduledExecutor scheduledExecutor;
public final TestingHighAvailabilityServices highAvailabilityServices;
public final HeartbeatServices heartbeatServices;
public final MetricRegistry metricRegistry;
Expand All @@ -332,11 +329,10 @@ class MockResourceManagerRuntimeServices {
public UUID rmLeaderSessionId;

MockResourceManagerRuntimeServices() throws Exception {
scheduledExecutor = mock(ScheduledExecutor.class);
highAvailabilityServices = new TestingHighAvailabilityServices();
rmLeaderElectionService = new TestingLeaderElectionService();
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
heartbeatServices = new HeartbeatServices(5L, 5L);
metricRegistry = mock(MetricRegistryImpl.class);
slotManager = mock(SlotManager.class);
slotManagerStarted = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@

import org.apache.flink.runtime.clusterframework.types.ResourceID;

import java.util.concurrent.CompletableFuture;

/**
* Interface for the interaction with the {@link HeartbeatManager}. The heartbeat listener is used
* for the following things:
Expand Down Expand Up @@ -54,11 +52,10 @@ public interface HeartbeatListener<I, O> {
void reportPayload(ResourceID resourceID, I payload);

/**
* Retrieves the payload value for the next heartbeat message. Since the operation can happen
* asynchronously, the result is returned wrapped in a future.
* Retrieves the payload value for the next heartbeat message.
*
* @param resourceID Resource ID identifying the receiver of the payload
* @return Future containing the next payload for heartbeats
* @return The payload for the next heartbeat
*/
CompletableFuture<O> retrievePayload(ResourceID resourceID);
O retrievePayload(ResourceID resourceID);
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import javax.annotation.concurrent.ThreadSafe;

import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -57,34 +55,29 @@ public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
private final HeartbeatListener<I, O> heartbeatListener;

/** Executor service used to run heartbeat timeout notifications. */
private final ScheduledExecutor scheduledExecutor;
private final ScheduledExecutor mainThreadExecutor;

protected final Logger log;

/** Map containing the heartbeat monitors associated with the respective resource ID. */
private final ConcurrentHashMap<ResourceID, HeartbeatManagerImpl.HeartbeatMonitor<O>> heartbeatTargets;

/** Execution context used to run future callbacks. */
private final Executor executor;

/** Running state of the heartbeat manager. */
protected volatile boolean stopped;

public HeartbeatManagerImpl(
long heartbeatTimeoutIntervalMs,
ResourceID ownResourceID,
HeartbeatListener<I, O> heartbeatListener,
Executor executor,
ScheduledExecutor scheduledExecutor,
ScheduledExecutor mainThreadExecutor,
Logger log) {
Preconditions.checkArgument(heartbeatTimeoutIntervalMs > 0L, "The heartbeat timeout has to be larger than 0.");

this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;
this.ownResourceID = Preconditions.checkNotNull(ownResourceID);
this.heartbeatListener = Preconditions.checkNotNull(heartbeatListener, "heartbeatListener");
this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
this.log = Preconditions.checkNotNull(log);
this.executor = Preconditions.checkNotNull(executor);
this.heartbeatTargets = new ConcurrentHashMap<>(16);

stopped = false;
Expand All @@ -98,10 +91,6 @@ ResourceID getOwnResourceID() {
return ownResourceID;
}

Executor getExecutor() {
return executor;
}

/**
 * Returns the heartbeat listener that was handed to this manager's constructor.
 *
 * @return the listener stored in {@code heartbeatListener}
 */
HeartbeatListener<I, O> getHeartbeatListener() {
return heartbeatListener;
}
Expand All @@ -123,7 +112,7 @@ public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTar
HeartbeatManagerImpl.HeartbeatMonitor<O> heartbeatMonitor = new HeartbeatManagerImpl.HeartbeatMonitor<>(
resourceID,
heartbeatTarget,
scheduledExecutor,
mainThreadExecutor,
heartbeatListener,
heartbeatTimeoutIntervalMs);

Expand Down Expand Up @@ -174,6 +163,10 @@ public long getLastHeartbeatFrom(ResourceID resourceId) {
}
}

/**
 * Returns the scheduled executor that was handed to this manager's constructor.
 *
 * @return the executor stored in {@code mainThreadExecutor}
 */
ScheduledExecutor getMainThreadExecutor() {
return mainThreadExecutor;
}

//----------------------------------------------------------------------------------------------
// HeartbeatTarget methods
//----------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -202,21 +195,7 @@ public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload)
heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}

CompletableFuture<O> futurePayload = heartbeatListener.retrievePayload(requestOrigin);

if (futurePayload != null) {
CompletableFuture<Void> sendHeartbeatFuture = futurePayload.thenAcceptAsync(
retrievedPayload -> heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload),
executor);

sendHeartbeatFuture.exceptionally((Throwable failure) -> {
log.warn("Could not send heartbeat to target with id {}.", requestOrigin, failure);

return null;
});
} else {
heartbeatTarget.receiveHeartbeat(ownResourceID, null);
}
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@

import org.slf4j.Logger;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -37,56 +34,42 @@
*/
public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {

private final ScheduledFuture<?> triggerFuture;
private final long heartbeatPeriod;

public HeartbeatManagerSenderImpl(
HeartbeatManagerSenderImpl(
long heartbeatPeriod,
long heartbeatTimeout,
ResourceID ownResourceID,
HeartbeatListener<I, O> heartbeatListener,
Executor executor,
ScheduledExecutor scheduledExecutor,
ScheduledExecutor mainThreadExecutor,
Logger log) {
super(
heartbeatTimeout,
ownResourceID,
heartbeatListener,
executor,
scheduledExecutor,
mainThreadExecutor,
log);

triggerFuture = scheduledExecutor.scheduleAtFixedRate(this, 0L, heartbeatPeriod, TimeUnit.MILLISECONDS);
this.heartbeatPeriod = heartbeatPeriod;
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}

@Override
public void run() {
if (!stopped) {
log.debug("Trigger heartbeat request.");
for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets()) {
CompletableFuture<O> futurePayload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

if (futurePayload != null) {
CompletableFuture<Void> requestHeartbeatFuture = futurePayload.thenAcceptAsync(
payload -> heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload),
getExecutor());

requestHeartbeatFuture.exceptionally(
(Throwable failure) -> {
log.warn("Could not request the heartbeat from target {}.", heartbeatTarget, failure);

return null;
});
} else {
heartbeatTarget.requestHeartbeat(getOwnResourceID(), null);
}
requestHeartbeat(heartbeatMonitor);
}

getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}

@Override
public void stop() {
triggerFuture.cancel(true);
super.stop();
/**
 * Sends a single heartbeat request to the target tracked by the given monitor.
 *
 * <p>The payload is obtained from the heartbeat listener for the monitored target's
 * resource id and is passed, together with this manager's own resource id, to the
 * target's {@code requestHeartbeat} method.
 *
 * @param heartbeatMonitor monitor describing the target to request a heartbeat from
 */
private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
final O outgoingPayload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());

heartbeatMonitor.getHeartbeatTarget().requestHeartbeat(getOwnResourceID(), outgoingPayload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
* @param resourceId Resource Id which identifies the owner of the heartbeat manager
* @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
* targets
* @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
* @param mainThreadExecutor Scheduled executor to be used for scheduling heartbeat timeouts
* @param log Logger to be used for the logging
* @param <I> Type of the incoming payload
* @param <O> Type of the outgoing payload
Expand All @@ -61,15 +61,14 @@ public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor scheduledExecutor,
ScheduledExecutor mainThreadExecutor,
Logger log) {

return new HeartbeatManagerImpl<>(
heartbeatTimeout,
resourceId,
heartbeatListener,
scheduledExecutor,
scheduledExecutor,
mainThreadExecutor,
log);
}

Expand All @@ -79,7 +78,8 @@ public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
* @param resourceId Resource Id which identifies the owner of the heartbeat manager
* @param heartbeatListener Listener which will be notified upon heartbeat timeouts for registered
* targets
* @param scheduledExecutor Scheduled executor to be used for scheduling heartbeat timeouts
* @param mainThreadExecutor Scheduled executor to be used for scheduling heartbeat timeouts and
* periodically sending heartbeat requests
* @param log Logger to be used for the logging
* @param <I> Type of the incoming payload
* @param <O> Type of the outgoing payload
Expand All @@ -88,16 +88,15 @@ public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor scheduledExecutor,
ScheduledExecutor mainThreadExecutor,
Logger log) {

return new HeartbeatManagerSenderImpl<>(
heartbeatInterval,
heartbeatTimeout,
resourceId,
heartbeatListener,
scheduledExecutor,
scheduledExecutor,
mainThreadExecutor,
log);
}

Expand Down
Loading

0 comments on commit a95dac5

Please sign in to comment.