Skip to content

Commit

Permalink
[hotfix][tests] Expose full heartbeat payload
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 29, 2019
1 parent 85905f8 commit acae782
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway {

private volatile Function<Tuple3<ResourceID, InstanceID, SlotReport>, CompletableFuture<Acknowledge>> sendSlotReportFunction;

private volatile BiConsumer<ResourceID, SlotReport> taskExecutorHeartbeatConsumer;
private volatile BiConsumer<ResourceID, TaskExecutorHeartbeatPayload> taskExecutorHeartbeatConsumer;

private volatile Consumer<Tuple3<InstanceID, SlotID, AllocationID>> notifySlotAvailableConsumer;

Expand Down Expand Up @@ -153,7 +153,7 @@ public void setSendSlotReportFunction(Function<Tuple3<ResourceID, InstanceID, Sl
this.sendSlotReportFunction = sendSlotReportFunction;
}

public void setTaskExecutorHeartbeatConsumer(BiConsumer<ResourceID, SlotReport> taskExecutorHeartbeatConsumer) {
public void setTaskExecutorHeartbeatConsumer(BiConsumer<ResourceID, TaskExecutorHeartbeatPayload> taskExecutorHeartbeatConsumer) {
this.taskExecutorHeartbeatConsumer = taskExecutorHeartbeatConsumer;
}

Expand Down Expand Up @@ -248,10 +248,10 @@ public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {

@Override
public void heartbeatFromTaskManager(ResourceID heartbeatOrigin, TaskExecutorHeartbeatPayload heartbeatPayload) {
final BiConsumer<ResourceID, SlotReport> currentTaskExecutorHeartbeatConsumer = taskExecutorHeartbeatConsumer;
final BiConsumer<ResourceID, TaskExecutorHeartbeatPayload> currentTaskExecutorHeartbeatConsumer = taskExecutorHeartbeatConsumer;

if (currentTaskExecutorHeartbeatConsumer != null) {
currentTaskExecutorHeartbeatConsumer.accept(heartbeatOrigin, heartbeatPayload.getSlotReport());
currentTaskExecutorHeartbeatConsumer.accept(heartbeatOrigin, heartbeatPayload);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ public void testHeartbeatSlotReporting() throws Exception {
});

final CompletableFuture<SlotReport> heartbeatSlotReportFuture = new CompletableFuture<>();
rmGateway.setTaskExecutorHeartbeatConsumer((resourceID, slotReport) -> heartbeatSlotReportFuture.complete(slotReport));
rmGateway.setTaskExecutorHeartbeatConsumer((resourceID, heartbeatPayload) -> heartbeatSlotReportFuture.complete(heartbeatPayload.getSlotReport()));

rpc.registerGateway(rmAddress, rmGateway);

Expand Down Expand Up @@ -1780,9 +1780,9 @@ public void testSlotReportDoesNotContainStaleInformation() throws Exception {
final OneShotLatch terminateSlotReportVerification = new OneShotLatch();
final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
// Assertions for this test
testingResourceManagerGateway.setTaskExecutorHeartbeatConsumer((ignored, slotReport) -> {
testingResourceManagerGateway.setTaskExecutorHeartbeatConsumer((ignored, heartbeatPayload) -> {
try {
final ArrayList<SlotStatus> slots = Lists.newArrayList(slotReport);
final ArrayList<SlotStatus> slots = Lists.newArrayList(heartbeatPayload.getSlotReport());
assertThat(slots, hasSize(1));
final SlotStatus slotStatus = slots.get(0);

Expand Down

0 comments on commit acae782

Please sign in to comment.