Skip to content

Commit

Permalink
[FLINK-30301][tests] Use NoOp HeartbeatServices in TaskExecutorTest
Browse files Browse the repository at this point in the history
To introduce the no-op implementation, HeartbeatServices interface is extracted.
Additionally, '-1' constant is moved from various places to HeartbeatManagerOptions.
  • Loading branch information
rkhachatryan committed Jan 25, 2023
1 parent f8105bb commit b4fe4a4
Show file tree
Hide file tree
Showing 23 changed files with 276 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class HeartbeatManagerOptions {
private static final String HEARTBEAT_RPC_FAILURE_THRESHOLD_KEY =
"heartbeat.rpc-failure-threshold";

public static final int FAILED_RPC_DETECTION_DISABLED = -1;

@Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
public static final ConfigOption<Integer> HEARTBEAT_RPC_FAILURE_THRESHOLD =
key(HEARTBEAT_RPC_FAILURE_THRESHOLD_KEY)
Expand All @@ -68,7 +70,9 @@ public class HeartbeatManagerOptions {
TextElement.code(HEARTBEAT_RPC_FAILURE_THRESHOLD_KEY),
TextElement.code(HEARTBEAT_TIMEOUT.key()),
TextElement.code(HEARTBEAT_INTERVAL.key()),
TextElement.code("-1"))
TextElement.code(
Integer.toString(
FAILED_RPC_DETECTION_DISABLED)))
.build());

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.heartbeat;

import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
Expand All @@ -35,9 +36,9 @@
*
* @param <O> Type of the payload being sent to the associated heartbeat target
*/
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
public class DefaultHeartbeatMonitor<O> implements HeartbeatMonitor<O>, Runnable {

private static final Logger LOG = LoggerFactory.getLogger(HeartbeatMonitorImpl.class);
private static final Logger LOG = LoggerFactory.getLogger(DefaultHeartbeatMonitor.class);

/** Resource ID of the monitored heartbeat target. */
private final ResourceID resourceID;
Expand All @@ -63,7 +64,7 @@ public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {

private volatile long lastHeartbeat;

HeartbeatMonitorImpl(
DefaultHeartbeatMonitor(
ResourceID resourceID,
HeartbeatTarget<O> heartbeatTarget,
ScheduledExecutor scheduledExecutor,
Expand All @@ -82,7 +83,9 @@ public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {
this.heartbeatTimeoutIntervalMs = heartbeatTimeoutIntervalMs;

Preconditions.checkArgument(
failedRpcRequestsUntilUnreachable > 0 || failedRpcRequestsUntilUnreachable == -1,
failedRpcRequestsUntilUnreachable > 0
|| failedRpcRequestsUntilUnreachable
== HeartbeatManagerOptions.FAILED_RPC_DETECTION_DISABLED,
"The number of failed heartbeat RPC requests has to be larger than 0 or -1 (deactivated).");
this.failedRpcRequestsUntilUnreachable = failedRpcRequestsUntilUnreachable;

Expand Down Expand Up @@ -188,7 +191,7 @@ private enum State {
}

/**
* The factory that instantiates {@link HeartbeatMonitorImpl}.
* The factory that instantiates {@link DefaultHeartbeatMonitor}.
*
* @param <O> Type of the outgoing heartbeat payload
*/
Expand All @@ -203,7 +206,7 @@ public HeartbeatMonitor<O> createHeartbeatMonitor(
long heartbeatTimeoutIntervalMs,
int failedRpcRequestsUntilUnreachable) {

return new HeartbeatMonitorImpl<>(
return new DefaultHeartbeatMonitor<>(
resourceID,
heartbeatTarget,
mainThreadExecutor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
* @param <O> Type of the outgoing heartbeat payload
*/
@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {
class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {

/** Heartbeat timeout interval in milli seconds. */
private final long heartbeatTimeoutIntervalMs;
Expand Down Expand Up @@ -86,7 +86,7 @@ public HeartbeatManagerImpl(
heartbeatListener,
mainThreadExecutor,
log,
new HeartbeatMonitorImpl.Factory<>());
new DefaultHeartbeatMonitor.Factory<>());
}

public HeartbeatManagerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
* @param <I> Type of the incoming heartbeat payload
* @param <O> Type of the outgoing heartbeat payload
*/
public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O>
implements Runnable {
class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {

private final long heartbeatPeriod;

Expand All @@ -53,7 +52,7 @@ public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O>
heartbeatListener,
mainThreadExecutor,
log,
new HeartbeatMonitorImpl.Factory<>());
new DefaultHeartbeatMonitor.Factory<>());
}

HeartbeatManagerSenderImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import org.slf4j.Logger;
Expand All @@ -30,35 +29,7 @@
* HeartbeatServices gives access to all services needed for heartbeating. This includes the
* creation of heartbeat receivers and heartbeat senders.
*/
public class HeartbeatServices {

/** Heartbeat interval for the created services. */
protected final long heartbeatInterval;

/** Heartbeat timeout for the created services. */
protected final long heartbeatTimeout;

protected final int failedRpcRequestsUntilUnreachable;

public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
this(heartbeatInterval, heartbeatTimeout, -1);
}

public HeartbeatServices(
long heartbeatInterval, long heartbeatTimeout, int failedRpcRequestsUntilUnreachable) {
Preconditions.checkArgument(
0L < heartbeatInterval, "The heartbeat interval must be larger than 0.");
Preconditions.checkArgument(
heartbeatInterval <= heartbeatTimeout,
"The heartbeat timeout should be larger or equal than the heartbeat interval.");
Preconditions.checkArgument(
failedRpcRequestsUntilUnreachable > 0 || failedRpcRequestsUntilUnreachable == -1,
"The number of failed heartbeat RPC requests has to be larger than 0 or -1 (deactivated).");

this.heartbeatInterval = heartbeatInterval;
this.heartbeatTimeout = heartbeatTimeout;
this.failedRpcRequestsUntilUnreachable = failedRpcRequestsUntilUnreachable;
}
public interface HeartbeatServices {

/**
* Creates a heartbeat manager which does not actively send heartbeats.
Expand All @@ -72,20 +43,11 @@ public HeartbeatServices(
* @param <O> Type of the outgoing payload
* @return A new HeartbeatManager instance
*/
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
<I, O> HeartbeatManager<I, O> createHeartbeatManager(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {

return new HeartbeatManagerImpl<>(
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
resourceId,
heartbeatListener,
mainThreadExecutor,
log);
}
Logger log);

/**
* Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
Expand All @@ -100,37 +62,31 @@ public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
* @param <O> Type of the outgoing payload
* @return A new HeartbeatManager instance which actively sends heartbeats
*/
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
<I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {

return new HeartbeatManagerSenderImpl<>(
heartbeatInterval,
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
resourceId,
heartbeatListener,
mainThreadExecutor,
log);
}
Logger log);

/**
* Creates an HeartbeatServices instance from a {@link Configuration}.
*
* @param configuration Configuration to be used for the HeartbeatServices creation
* @return An HeartbeatServices instance created from the given configuration
*/
public static HeartbeatServices fromConfiguration(Configuration configuration) {
static HeartbeatServices fromConfiguration(Configuration configuration) {
long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);

long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

int failedRpcRequestsUntilUnreachable =
configuration.get(HeartbeatManagerOptions.HEARTBEAT_RPC_FAILURE_THRESHOLD);

return new HeartbeatServices(
return new HeartbeatServicesImpl(
heartbeatInterval, heartbeatTimeout, failedRpcRequestsUntilUnreachable);
}

static HeartbeatServices noOp() {
return NoOpHeartbeatServices.getInstance();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.heartbeat;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import org.slf4j.Logger;

import static org.apache.flink.configuration.HeartbeatManagerOptions.FAILED_RPC_DETECTION_DISABLED;

/** A default {@link HeartbeatServices} implementation. */
public final class HeartbeatServicesImpl implements HeartbeatServices {

/** Heartbeat interval for the created services. */
private final long heartbeatInterval;

/** Heartbeat timeout for the created services. */
private final long heartbeatTimeout;

private final int failedRpcRequestsUntilUnreachable;

public HeartbeatServicesImpl(long heartbeatInterval, long heartbeatTimeout) {
this(heartbeatInterval, heartbeatTimeout, FAILED_RPC_DETECTION_DISABLED);
}

public HeartbeatServicesImpl(
long heartbeatInterval, long heartbeatTimeout, int failedRpcRequestsUntilUnreachable) {
Preconditions.checkArgument(
0L < heartbeatInterval, "The heartbeat interval must be larger than 0.");
Preconditions.checkArgument(
heartbeatInterval <= heartbeatTimeout,
"The heartbeat timeout should be larger or equal than the heartbeat interval.");
Preconditions.checkArgument(
failedRpcRequestsUntilUnreachable > 0
|| failedRpcRequestsUntilUnreachable == FAILED_RPC_DETECTION_DISABLED,
"The number of failed heartbeat RPC requests has to be larger than 0 or -1 (deactivated).");

this.heartbeatInterval = heartbeatInterval;
this.heartbeatTimeout = heartbeatTimeout;
this.failedRpcRequestsUntilUnreachable = failedRpcRequestsUntilUnreachable;
}

@Override
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {

return new HeartbeatManagerImpl<>(
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
resourceId,
heartbeatListener,
mainThreadExecutor,
log);
}

@Override
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {

return new HeartbeatManagerSenderImpl<>(
heartbeatInterval,
heartbeatTimeout,
failedRpcRequestsUntilUnreachable,
resourceId,
heartbeatListener,
mainThreadExecutor,
log);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.heartbeat;

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import org.slf4j.Logger;

/** {@link HeartbeatServices} implementation which does nothing. */
public class NoOpHeartbeatServices implements HeartbeatServices {
private static final NoOpHeartbeatServices INSTANCE = new NoOpHeartbeatServices();

private NoOpHeartbeatServices() {}

@Override
public <I, O> HeartbeatManager<I, O> createHeartbeatManager(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {
return NoOpHeartbeatManager.getInstance();
}

@Override
public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(
ResourceID resourceId,
HeartbeatListener<I, O> heartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {
return NoOpHeartbeatManager.getInstance();
}

public static NoOpHeartbeatServices getInstance() {
return INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.dispatcher.cleanup.CheckpointResourcesCleanupRunnerFactory;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResultStore;
import org.apache.flink.runtime.jobmanager.StandaloneJobGraphStore;
Expand Down Expand Up @@ -88,7 +89,7 @@ static void awaitStatus(DispatcherGateway dispatcherGateway, JobID jobId, JobSta

@Before
public void setUp() throws Exception {
heartbeatServices = new HeartbeatServices(1000L, 10000L);
heartbeatServices = new HeartbeatServicesImpl(1000L, 10000L);

haServices = new TestingHighAvailabilityServices();
haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
Expand Down
Loading

0 comments on commit b4fe4a4

Please sign in to comment.