Skip to content

Commit

Permalink
Auto tuning: Support for parameter set multi-try (linkedin#386)
Browse files (browse the repository at this point in the history)
  • Loading branch information
arpang authored and akshayrai committed Jul 11, 2018
1 parent 860dbe6 commit dd31ad5
Show file tree
Hide file tree
Showing 32 changed files with 1,269 additions and 843 deletions.
562 changes: 269 additions & 293 deletions app/com/linkedin/drelephant/tuning/AutoTuningAPIHelper.java

Large diffs are not rendered by default.

59 changes: 35 additions & 24 deletions app/com/linkedin/drelephant/tuning/AzkabanJobCompleteDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import java.util.Map;
import models.JobExecution;
import models.JobExecution.ExecutionState;
import models.TuningJobExecution;
import models.TuningJobExecution.ParamSetStatus;
import models.JobSuggestedParamSet;
import models.JobSuggestedParamSet.ParamSetStatus;
import models.TuningJobExecutionParamSet;
import org.apache.log4j.Logger;


Expand All @@ -39,26 +40,27 @@ public class AzkabanJobCompleteDetector extends JobCompleteDetector {
private AzkabanJobStatusUtil _azkabanJobStatusUtil;

public enum AzkabanJobStatus {
FAILED, CANCELLED, KILLED, SUCCEEDED
FAILED, CANCELLED, KILLED, SUCCEEDED, SKIPPED
}

/**
* Returns the list of completed executions
* @param jobExecutions Started Execution list
* @param inProgressExecutionParamSet List of executions (with corresponding param set) in progress
* @return List of completed executions
* @throws MalformedURLException
* @throws URISyntaxException
* @throws MalformedURLException MalformedURLException
* @throws URISyntaxException URISyntaxException
*/
protected List<TuningJobExecution> getCompletedExecutions(List<TuningJobExecution> jobExecutions)
protected List<JobExecution> getCompletedExecutions(List<TuningJobExecutionParamSet> inProgressExecutionParamSet)
throws MalformedURLException, URISyntaxException {
logger.info("Fetching the list of executions completed since last iteration");
List<TuningJobExecution> completedExecutions = new ArrayList<TuningJobExecution>();
List<JobExecution> completedExecutions = new ArrayList<JobExecution>();
try {
for (TuningJobExecution tuningJobExecution : jobExecutions) {
for (TuningJobExecutionParamSet tuningJobExecutionParamSet : inProgressExecutionParamSet) {

JobExecution jobExecution = tuningJobExecution.jobExecution;
JobSuggestedParamSet jobSuggestedParamSet = tuningJobExecutionParamSet.jobSuggestedParamSet;
JobExecution jobExecution = tuningJobExecutionParamSet.jobExecution;

logger.info("Checking current status of started execution: " + tuningJobExecution.jobExecution.jobExecId);
logger.info("Checking current status of started execution: " + jobExecution.jobExecId);

if (_azkabanJobStatusUtil == null) {
logger.info("Initializing AzkabanJobStatusUtil");
Expand All @@ -72,23 +74,32 @@ protected List<TuningJobExecution> getCompletedExecutions(List<TuningJobExecutio
logger.info("Job Found:" + job.getKey() + ". Status: " + job.getValue());
if (job.getKey().equals(jobExecution.job.jobName)) {
if (job.getValue().equals(AzkabanJobStatus.FAILED.toString())) {
tuningJobExecution.paramSetState = ParamSetStatus.EXECUTED;
if (jobSuggestedParamSet.paramSetState.equals(ParamSetStatus.SENT)) {
jobSuggestedParamSet.paramSetState = ParamSetStatus.EXECUTED;
}
jobExecution.executionState = ExecutionState.FAILED;
}
if (job.getValue().equals(AzkabanJobStatus.CANCELLED.toString()) || job.getValue()
.equals(AzkabanJobStatus.KILLED.toString())) {
tuningJobExecution.paramSetState = ParamSetStatus.EXECUTED;
jobExecution.executionState = ExecutionState.CANCELLED;
}
if (job.getValue().equals(AzkabanJobStatus.SUCCEEDED.toString())) {
tuningJobExecution.paramSetState = ParamSetStatus.EXECUTED;
} else if (job.getValue().equals(AzkabanJobStatus.SUCCEEDED.toString())) {
if (jobSuggestedParamSet.paramSetState.equals(ParamSetStatus.SENT)) {
jobSuggestedParamSet.paramSetState = ParamSetStatus.EXECUTED;
}
jobExecution.executionState = ExecutionState.SUCCEEDED;
} else if (job.getValue().equals(AzkabanJobStatus.CANCELLED.toString()) || job.getValue()
.equals(AzkabanJobStatus.KILLED.toString()) || job.getValue()
.equals(AzkabanJobStatus.SKIPPED.toString())) {
if (jobSuggestedParamSet.paramSetState.equals(ParamSetStatus.SENT)) {
jobSuggestedParamSet.paramSetState = ParamSetStatus.EXECUTED;
}
jobExecution.executionState = ExecutionState.CANCELLED;
}
if (tuningJobExecution.paramSetState.equals(ParamSetStatus.EXECUTED)) {
completedExecutions.add(tuningJobExecution);
logger.info("Execution " + tuningJobExecution.jobExecution.jobExecId + " is completed");

if (jobExecution.executionState.equals(ExecutionState.SUCCEEDED) || jobExecution.executionState.equals(
ExecutionState.FAILED) || jobExecution.executionState.equals(ExecutionState.CANCELLED)) {
jobExecution.update();
jobSuggestedParamSet.update();
completedExecutions.add(jobExecution);
logger.info("Execution " + jobExecution.jobExecId + " is completed");
} else {
logger.info("Execution " + tuningJobExecution.jobExecution.jobExecId + " is still in running state");
logger.info("Execution " + jobExecution.jobExecId + " is still in running state");
}
}
}
Expand Down
13 changes: 3 additions & 10 deletions app/com/linkedin/drelephant/tuning/BaselineComputeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.linkedin.drelephant.mapreduce.heuristics.CommonConstantsHeuristic;
import com.linkedin.drelephant.util.Utils;
import controllers.AutoTuningMetricsController;
import java.util.ArrayList;
import java.util.List;
import models.TuningJobDefinition;
import org.apache.commons.io.FileUtils;
Expand All @@ -37,9 +36,8 @@
public class BaselineComputeUtil {

private static final Integer NUM_JOBS_FOR_BASELINE_DEFAULT = 30;
private final Logger logger = Logger.getLogger(getClass());
private static final String BASELINE_EXECUTION_COUNT = "baseline.execution.count";

private final Logger logger = Logger.getLogger(getClass());
private Integer _numJobsForBaseline = null;

public BaselineComputeUtil() {
Expand Down Expand Up @@ -90,13 +88,8 @@ private void updateMetrics(List<TuningJobDefinition> tuningJobDefinitions) {
*/
private List<TuningJobDefinition> getJobForBaselineComputation() {
logger.info("Fetching jobs for which baseline metrics need to be computed");
List<TuningJobDefinition> tuningJobDefinitions = new ArrayList<TuningJobDefinition>();
try {
tuningJobDefinitions =
TuningJobDefinition.find.where().eq(TuningJobDefinition.TABLE.averageResourceUsage, null).findList();
} catch (NullPointerException e) {
logger.info("There are no jobs for which baseline has to be computed", e);
}
List<TuningJobDefinition> tuningJobDefinitions =
TuningJobDefinition.find.where().eq(TuningJobDefinition.TABLE.averageResourceUsage, null).findList();
return tuningJobDefinitions;
}

Expand Down
Loading

0 comments on commit dd31ad5

Please sign in to comment.