[FLINK-7856][flip6] Port JobVertexBackPressureHandler to REST endpoint
This closes apache#5397.
GJL authored and tillrohrmann committed Feb 6, 2018
1 parent 7137d61 commit c1280a5
Showing 26 changed files with 671 additions and 65 deletions.
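For orientation, here is a minimal client-side sketch of how the ported endpoint can be exercised once this change is deployed. It is not part of the commit: the URL pattern /jobs/:jobid/vertices/:vertexid/backpressure and the default REST port 8081 are assumptions based on JobVertexBackPressureHeaders and the usual WebOptions defaults, neither of which appears in this diff.

    // Hypothetical client-side sketch (not part of this commit).
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.Scanner;

    public class BackPressureQueryExample {

        public static void main(String[] args) throws Exception {
            // Assumed values; fill in the dispatcher's REST address and the IDs
            // of a running job and one of its job vertices.
            final String restAddress = "http://localhost:8081";
            final String jobId = args[0];
            final String jobVertexId = args[1];

            final URL url = new URL(
                restAddress + "/jobs/" + jobId + "/vertices/" + jobVertexId + "/backpressure");
            final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("GET");

            try (InputStream in = connection.getInputStream();
                    Scanner scanner = new Scanner(in, "UTF-8").useDelimiter("\\A")) {
                // The body is the JSON-serialized JobVertexBackPressureInfo produced by
                // JobVertexBackPressureHandler: overall status and level plus per-subtask ratios.
                System.out.println(scanner.hasNext() ? scanner.next() : "");
            } finally {
                connection.disconnect();
            }
        }
    }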
@@ -224,7 +224,11 @@ public WebRuntimeMonitor(
Time delayBetweenSamples = Time.milliseconds(delay);

backPressureStatsTracker = new BackPressureStatsTracker(
stackTraceSamples, cleanUpInterval, numSamples, delayBetweenSamples);
stackTraceSamples,
cleanUpInterval,
numSamples,
config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
delayBetweenSamples);

// --------------------------------------------------------------------

@@ -33,6 +33,7 @@
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
@@ -49,6 +50,7 @@
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
@@ -364,6 +366,18 @@ public CompletableFuture<JobStatus> requestJobStatus(JobID jobId, Time timeout)
}
}

@Override
public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
final JobID jobId, final JobVertexID jobVertexId) {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
if (jobManagerRunner == null) {
return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
} else {
return jobManagerRunner.getJobManagerGateway()
.requestOperatorBackPressureStats(jobId, jobVertexId);
}
}

@Override
public CompletableFuture<ArchivedExecutionGraph> requestJob(JobID jobId, Time timeout) {
final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
@@ -30,9 +30,7 @@
import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
import org.apache.flink.runtime.rest.handler.job.JobVertexBackPressureHandler;
import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -102,17 +100,9 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
timeout,
responseHeaders);

JobVertexBackPressureHandler jobVertexBackPressureHandler = new JobVertexBackPressureHandler(
restAddressFuture,
leaderRetriever,
timeout,
responseHeaders,
JobVertexBackPressureHeaders.getInstance());

handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
handlers.add(Tuple2.of(JobVertexBackPressureHeaders.getInstance(), jobVertexBackPressureHandler));

return handlers;
}
@@ -171,7 +171,8 @@ public JobManagerRunner(
this,
userCodeLoader,
restAddress,
metricRegistry.getMetricQueryServicePath());
metricRegistry.getMetricQueryServicePath(),
jobManagerServices.backPressureStatsTracker);

this.timeout = jobManagerServices.rpcAskTimeout;
}
@@ -22,17 +22,21 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.util.ExceptionUtils;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.FiniteDuration;

@@ -51,16 +55,29 @@ public class JobManagerServices {

public final Time rpcAskTimeout;

private final StackTraceSampleCoordinator stackTraceSampleCoordinator;
public final BackPressureStatsTracker backPressureStatsTracker;

public JobManagerServices(
ScheduledExecutorService executorService,
BlobLibraryCacheManager libraryCacheManager,
RestartStrategyFactory restartStrategyFactory,
Time rpcAskTimeout) {
Time rpcAskTimeout,
StackTraceSampleCoordinator stackTraceSampleCoordinator,
BackPressureStatsTracker backPressureStatsTracker) {

this.executorService = checkNotNull(executorService);
this.libraryCacheManager = checkNotNull(libraryCacheManager);
this.restartStrategyFactory = checkNotNull(restartStrategyFactory);
this.rpcAskTimeout = checkNotNull(rpcAskTimeout);
this.stackTraceSampleCoordinator = checkNotNull(stackTraceSampleCoordinator);
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);

executorService.scheduleWithFixedDelay(
backPressureStatsTracker::cleanUpOperatorStatsCache,
backPressureStatsTracker.getCleanUpInterval(),
backPressureStatsTracker.getCleanUpInterval(),
TimeUnit.MILLISECONDS);
}

/**
@@ -83,6 +100,9 @@ public void shutdown() throws Exception {

libraryCacheManager.shutdown();

stackTraceSampleCoordinator.shutDown();
backPressureStatsTracker.shutDown();

if (firstException != null) {
ExceptionUtils.rethrowException(firstException, "Error while shutting down JobManager services");
}
@@ -123,10 +143,21 @@ public static JobManagerServices fromConfiguration(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("jobmanager-future"));

final StackTraceSampleCoordinator stackTraceSampleCoordinator =
new StackTraceSampleCoordinator(futureExecutor, timeout.toMillis());
final BackPressureStatsTracker backPressureStatsTracker = new BackPressureStatsTracker(
stackTraceSampleCoordinator,
config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL),
config.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES),
config.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL),
Time.milliseconds(config.getInteger(WebOptions.BACKPRESSURE_DELAY)));

return new JobManagerServices(
futureExecutor,
libraryCacheManager,
RestartStrategyFactory.createRestartStrategyFactory(config),
Time.of(timeout.length(), timeout.unit()));
Time.of(timeout.length(), timeout.unit()),
stackTraceSampleCoordinator,
backPressureStatsTracker);
}
}
@@ -84,6 +84,9 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.ResourceOverview;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStatsResponse;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
@@ -185,6 +188,10 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast

private final String metricQueryServicePath;

// --------- BackPressure --------

private final BackPressureStatsTracker backPressureStatsTracker;

// --------- ResourceManager --------

/** Leader retriever service used to locate ResourceManager's address. */
@@ -215,7 +222,8 @@ public JobMaster(
FatalErrorHandler errorHandler,
ClassLoader userCodeLoader,
@Nullable String restAddress,
@Nullable String metricQueryServicePath) throws Exception {
@Nullable String metricQueryServicePath,
BackPressureStatsTracker backPressureStatsTracker) throws Exception {

super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));

@@ -298,6 +306,7 @@ public JobMaster(
.orElse(FutureUtils.completedExceptionally(new JobMasterException("The JobMaster has not been started with a REST endpoint.")));

this.metricQueryServicePath = metricQueryServicePath;
this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker);
}

//----------------------------------------------------------------------------------------------
@@ -881,6 +890,21 @@ public CompletableFuture<String> triggerSavepoint(
}
}

@Override
public CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
final JobID jobId, final JobVertexID jobVertexId) {
final ExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexId);
if (jobVertex == null) {
return FutureUtils.completedExceptionally(new FlinkException("JobVertexID not found " +
jobVertexId));
}

final Optional<OperatorBackPressureStats> operatorBackPressureStats =
backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
return CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(
operatorBackPressureStats.orElse(null)));
}

//----------------------------------------------------------------------------------------------
// Internal methods
//----------------------------------------------------------------------------------------------
@@ -84,7 +84,13 @@ public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample(
int maxStackTraceDepth,
Time timeout) {

throw new UnsupportedOperationException("Operation is not yet supported.");
return taskExecutorGateway.requestStackTraceSample(
executionAttemptID,
sampleId,
numSamples,
delayBetweenSamples,
maxStackTraceDepth,
timeout);
}

@Override
@@ -20,45 +20,87 @@

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.messages.EmptyJobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import javax.annotation.Nonnull;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Request handler for the job vertex back pressure.
*/
public class JobVertexBackPressureHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> {
public class JobVertexBackPressureHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> {

public JobVertexBackPressureHandler(
CompletableFuture<String> localRestAddress,
GatewayRetriever<DispatcherGateway> leaderRetriever,
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Time timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, JobVertexBackPressureInfo, JobVertexMessageParameters> messageHeaders) {
super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
}

@Override
protected CompletableFuture<JobVertexBackPressureInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
JobID jobId = request.getPathParameter(JobIDPathParameter.class);
JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
///TODO Get JobVertexBackPressureInfo from DispatcherGateway with JobID and JobVertexID here
protected CompletableFuture<JobVertexBackPressureInfo> handleRequest(
@Nonnull HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
@Nonnull RestfulGateway gateway) throws RestHandlerException {
final JobID jobId = request.getPathParameter(JobIDPathParameter.class);
final JobVertexID jobVertexId = request.getPathParameter(JobVertexIdPathParameter.class);
return gateway
.requestOperatorBackPressureStats(jobId, jobVertexId)
.thenApply(
operatorBackPressureStats ->
operatorBackPressureStats.getOperatorBackPressureStats().map(
JobVertexBackPressureHandler::createJobVertexBackPressureInfo).orElse(
JobVertexBackPressureInfo.deprecated()));
}

private static JobVertexBackPressureInfo createJobVertexBackPressureInfo(
final OperatorBackPressureStats operatorBackPressureStats) {
return new JobVertexBackPressureInfo(
JobVertexBackPressureInfo.VertexBackPressureStatus.OK,
getBackPressureLevel(operatorBackPressureStats.getMaxBackPressureRatio()),
operatorBackPressureStats.getEndTimestamp(),
IntStream.range(0, operatorBackPressureStats.getNumberOfSubTasks())
.mapToObj(subtask -> {
final double backPressureRatio = operatorBackPressureStats.getBackPressureRatio(subtask);
return new JobVertexBackPressureInfo.SubtaskBackPressureInfo(
subtask,
getBackPressureLevel(backPressureRatio),
backPressureRatio);
})
.collect(Collectors.toList()));
}

return CompletableFuture.completedFuture(EmptyJobVertexBackPressureInfo.getInstance());
/**
* Returns the back pressure level for the given back pressure ratio.
*
* @param backPressureRatio Ratio of back pressure samples to the total number of samples.
*
* @return Back pressure level ({@code OK}, {@code LOW}, or {@code HIGH})
*/
private static JobVertexBackPressureInfo.VertexBackPressureLevel getBackPressureLevel(double backPressureRatio) {
if (backPressureRatio <= 0.10) {
return JobVertexBackPressureInfo.VertexBackPressureLevel.OK;
} else if (backPressureRatio <= 0.5) {
return JobVertexBackPressureInfo.VertexBackPressureLevel.LOW;
} else {
return JobVertexBackPressureInfo.VertexBackPressureLevel.HIGH;
}
}
}
(The remaining changed files in this commit are not shown here.)
