Skip to content

Commit

Permalink
LIHADOOP-16126: Easy integration of new schedulers to Dr. Elephant
Browse files Browse the repository at this point in the history
RB=676088

G=superfriends-reviewers
R=annag,fli,shanm,viramach
A=annag,shanm
  • Loading branch information
akshayrai committed Mar 14, 2016
1 parent 8d080cf commit 8906b39
Show file tree
Hide file tree
Showing 10 changed files with 572 additions and 118 deletions.
16 changes: 3 additions & 13 deletions app/com/linkedin/drelephant/analysis/AnalyticJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class AnalyticJob {
private int _retries = 0;
private ApplicationType _type;
private String _appId;
private String _jobId;
private String _name;
private String _queueName;
private String _user;
Expand Down Expand Up @@ -154,15 +153,6 @@ public String getAppId() {
return _appId;
}

/**
* Returns the job id
*
* @return The job id
*/
public String getJobId() {
return _jobId;
}

/**
* Returns the name of the analytic job
*
Expand Down Expand Up @@ -260,7 +250,7 @@ public AppResult getAnalysis() throws Exception {
JobType jobType = ElephantContext.instance().matchJobType(data);
String jobTypeName = jobType == null ? UNKNOWN_JOB_TYPE : jobType.getName();

// Load job information
// Load app information
AppResult result = new AppResult();
result.id = Utils.truncateField(getAppId(), AppResult.ID_LIMIT, getAppId());
result.trackingUrl = Utils.truncateField(getTrackingUrl(), AppResult.TRACKING_URL_LIMIT, getAppId());
Expand Down Expand Up @@ -303,8 +293,8 @@ public AppResult getAnalysis() throws Exception {
result.severity = worstSeverity;
result.score = jobScore;

// Retrieve Azkaban execution, flow and jobs URLs from jobData and store them into result.
InfoExtractor.retrieveURLs(result, data);
// Retrieve information from job configuration like scheduler information and store them into result.
InfoExtractor.loadInfo(result, data);

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.linkedin.drelephant.ElephantContext;
import com.linkedin.drelephant.math.Statistics;
import com.linkedin.drelephant.util.Utils;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
Expand Down
131 changes: 131 additions & 0 deletions app/com/linkedin/drelephant/schedulers/AzkabanScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.schedulers;

import com.linkedin.drelephant.util.Utils;
import java.util.Properties;
import org.apache.log4j.Logger;


/**
* This class provides methods to load information specific to the Azkaban scheduler.
*/
public class AzkabanScheduler implements Scheduler {

private static final Logger logger = Logger.getLogger(AzkabanScheduler.class);

public static final String SCHEDULER_NAME = "azkaban";
public static final String AZKABAN_WORKFLOW_URL = "azkaban.link.workflow.url";
public static final String AZKABAN_JOB_URL = "azkaban.link.job.url";
public static final String AZKABAN_EXECUTION_URL = "azkaban.link.execution.url";
public static final String AZKABAN_ATTEMPT_URL = "azkaban.link.attempt.url";
public static final String AZKABAN_JOB_NAME = "azkaban.job.id";

private String jobDefId;
private String jobExecId;
private String flowDefId;
private String flowExecId;

private String jobDefUrl;
private String jobExecUrl;
private String flowDefUrl;
private String flowExecUrl;

private String jobName;
private int workflowDepth;


public AzkabanScheduler(String appId, Properties properties) {
if (properties != null) {
loadInfo(appId, properties);
} else {
// Use default value of data type
}
}

private void loadInfo(String appId, Properties properties) {
// Update the 4 Ids
jobDefId = properties.getProperty(AZKABAN_JOB_URL);
jobExecId = properties.getProperty(AZKABAN_ATTEMPT_URL);
flowDefId = properties.getProperty(AZKABAN_WORKFLOW_URL);
flowExecId = properties.getProperty(AZKABAN_EXECUTION_URL);

// For Azkaban, The url and ids are the same
jobExecUrl = jobExecId;
jobDefUrl = jobDefId;
flowExecUrl = flowExecId;
flowDefUrl = flowDefId;

workflowDepth = 0; // TODO: Add sub-workflow support
jobName = properties.getProperty(AZKABAN_JOB_NAME);
}

@Override
public String getSchedulerName() {
return SCHEDULER_NAME;
}

@Override
public String getJobDefId() {
return jobDefId;
}

@Override
public String getJobExecId() {
return jobExecId;
}

@Override
public String getFlowDefId() {
return flowDefId;
}

@Override
public String getFlowExecId() {
return flowExecId;
}

@Override
public String getJobDefUrl() {
return jobDefUrl;
}

@Override
public String getJobExecUrl() {
return jobExecUrl;
}

@Override
public String getFlowDefUrl() {
return flowDefUrl;
}

@Override
public String getFlowExecUrl() {
return flowExecUrl;
}

@Override
public int getWorkflowDepth() {
return workflowDepth;
}

@Override
public String getJobName() {
return jobName;
}
}
101 changes: 101 additions & 0 deletions app/com/linkedin/drelephant/schedulers/Scheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.schedulers;


/**
* Scheduler interface defining the
*/
public interface Scheduler {

/**
* Return the Scheduler Name
*
* @return the scheduler name
*/
public String getSchedulerName();

/**
* Return the Job Definition Id of the job in the workflow
*
* @return the job definition id
*/
public String getJobDefId();

/**
* Return the Job Execution Id of the job in the workflow
*
* @return the job execution id
*/
public String getJobExecId();

/**
* Return the Flow Definition Id of the workflow
*
* @return the flow definition id
*/
public String getFlowDefId();

/**
* Return the Flow Execution Id of the workflow
*
* @return the flow execution id
*/
public String getFlowExecId();

/**
* Return a link to the job's definition
*
* @return the job definition url
*/
public String getJobDefUrl();

/**
* Return a link to the job's execution
*
* @return the job execution url
*/
public String getJobExecUrl();

/**
* Return a link to the flow's definition
*
* @return the flow definition url
*/
public String getFlowDefUrl();

/**
* Return a link to the flow's execution
*
* @return the flow execution url
*/
public String getFlowExecUrl();

/**
* Return the name of the Job/Action in the Flow
*
* @return the job/action name
*/
public String getJobName();

/**
* Return the workflow depth
*
* @return the workflow depth
*/
public int getWorkflowDepth();
}
Loading

0 comments on commit 8906b39

Please sign in to comment.