Skip to content

Commit

Permalink
Oozie scheduler support (linkedin#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
NoamShaish authored and akshayrai committed Nov 17, 2016
1 parent aa510fe commit c8a9af6
Show file tree
Hide file tree
Showing 10 changed files with 692 additions and 9 deletions.
27 changes: 27 additions & 0 deletions app-conf/SchedulerConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,31 @@
<classname>com.linkedin.drelephant.schedulers.AzkabanScheduler</classname>
</scheduler>

<scheduler>
<name>oozie</name>
<classname>com.linkedin.drelephant.schedulers.OozieScheduler</classname>
<params>
<!-- URL of oozie host -->
<oozie_api_url>http://localhost:11000/oozie</oozie_api_url>

<!-- ### Non mandatory properties ###
### choose authentication method
<oozie_auth_option>KERBEROS/SIMPLE</oozie_auth_option>
### override oozie console url with a template (only parameter will be the id)
<oozie_job_url_template></oozie_job_url_template>
<oozie_job_exec_url_template></oozie_job_exec_url_template>
### (if scheduled jobs are expected, make sure to add the following templates, since Oozie server v4.1.0 doesn't provide their URLs)
<oozie_workflow_url_template>http://localhost:11000/oozie/?job=%s</oozie_workflow_url_template>
<oozie_workflow_exec_url_template>http://localhost:11000/oozie/?job=%s</oozie_workflow_exec_url_template>
### Use true if you can assure all app names are unique.
### When true dr-elephant will unite all coordinator runs (in case a coordinator is killed and then run again)
<oozie_app_name_uniqueness>false</oozie_app_name_uniqueness>
-->
</params>
</scheduler>


</schedulers>
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@


/**
* The Heuristic Configuration Holder
* Scheduler Configuration Holder
*/
public class SchedulerConfigurationData {
private final String _schedulerName;
Expand Down
259 changes: 259 additions & 0 deletions app/com/linkedin/drelephant/schedulers/OozieScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.schedulers;

import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.util.Utils;
import org.apache.log4j.Logger;
import org.apache.oozie.client.*;

import java.util.Properties;


/**
 * This class provides methods to load information specific to the Oozie scheduler.
 *
 * <p>When the application properties contain an {@code oozie.action.id} of the form
 * {@code <workflow-id>@<action-name>}, the constructor asks the Oozie client for the owning
 * workflow, walks up its parent chain and derives the job/flow definition and execution ids and
 * URLs from the result. When the property is absent, all ids keep their default ({@code null})
 * values and {@link #isEmpty()} returns {@code true}.
 */
public class OozieScheduler implements Scheduler {

  private static final Logger logger = Logger.getLogger(OozieScheduler.class);

  /** Application property holding the Oozie action id, e.g. {@code <workflow-id>@<action>}. */
  private static final String OOZIE_ACTION_ID = "oozie.action.id";

  // Names of the scheduler configuration parameters read from SchedulerConfigurationData.
  private static final String OOZIE_API_URL = "oozie_api_url";
  private static final String OOZIE_AUTH_OPTION = "oozie_auth_option";
  private static final String OOZIE_JOB_DEF_URL_TEMPLATE = "oozie_job_url_template";
  private static final String OOZIE_JOB_EXEC_URL_TEMPLATE = "oozie_job_exec_url_template";
  private static final String OOZIE_WORKFLOW_DEF_URL_TEMPLATE = "oozie_workflow_url_template";
  private static final String OOZIE_WORKFLOW_EXEC_URL_TEMPLATE = "oozie_workflow_exec_url_template";
  private static final String OOZIE_APP_NAME_UNIQUENESS = "oozie_app_name_uniqueness";

  private boolean appNameUniqueness;

  private String schedulerName;
  private String jobDefId;
  private String jobExecId;
  private String flowExecId;
  private String flowDefId;
  private String jobDefIdUrl;
  private String jobExecIdUrl;
  private String flowExecIdUrl;
  private String flowDefIdUrl;
  private int workflowDepth;

  private OozieClient oozieClient;
  private String jobDefUrlTemplate;
  private String jobExecUrlTemplate;
  private String workflowDefUrlTemplate;
  private String workflowExecUrlTemplate;
  private String flowDefName;

  /**
   * Creates a scheduler that builds its own Oozie client from the scheduler configuration.
   *
   * @param appId the application id (not used by this implementation)
   * @param properties the application properties; may be {@code null}
   * @param schedulerConfData the scheduler configuration
   */
  public OozieScheduler(String appId, Properties properties, SchedulerConfigurationData schedulerConfData) {
    this(appId, properties, schedulerConfData, null);
  }

  /**
   * Creates a scheduler using the given Oozie client, or a freshly built one when
   * {@code oozieClient} is {@code null}.
   *
   * @param appId the application id (not used by this implementation)
   * @param properties the application properties; may be {@code null}
   * @param schedulerConfData the scheduler configuration
   * @param oozieClient the client to query, or {@code null} to create one from the configuration
   * @throws RuntimeException if the Oozie client is needed but {@code oozie_api_url} is missing,
   *     or if fetching the workflow information fails
   */
  public OozieScheduler(String appId, Properties properties, SchedulerConfigurationData schedulerConfData,
      OozieClient oozieClient) {
    schedulerName = schedulerConfData.getSchedulerName();

    // Without an Oozie action id there is nothing to load: keep the default value of every field.
    if (properties == null || properties.getProperty(OOZIE_ACTION_ID) == null) {
      return;
    }

    this.oozieClient = oozieClient == null ? makeOozieClient(schedulerConfData) : oozieClient;
    jobDefUrlTemplate = schedulerConfData.getParamMap().get(OOZIE_JOB_DEF_URL_TEMPLATE);
    jobExecUrlTemplate = schedulerConfData.getParamMap().get(OOZIE_JOB_EXEC_URL_TEMPLATE);
    workflowDefUrlTemplate = schedulerConfData.getParamMap().get(OOZIE_WORKFLOW_DEF_URL_TEMPLATE);
    workflowExecUrlTemplate = schedulerConfData.getParamMap().get(OOZIE_WORKFLOW_EXEC_URL_TEMPLATE);
    // Boolean.parseBoolean(null) is false, so a missing parameter disables the option.
    appNameUniqueness = Boolean.parseBoolean(schedulerConfData.getParamMap().get(OOZIE_APP_NAME_UNIQUENESS));

    loadInfo(properties);
  }

  /**
   * Loads the workflow behind the configured action id and fills in the id/URL fields.
   * Action ids without an {@code @} separator are ignored.
   *
   * @throws RuntimeException wrapping the {@link OozieClientException} when a lookup fails
   */
  private void loadInfo(Properties properties) {
    // 0004167-160629080632562-oozie-oozi-W@some-action
    String actionId = properties.getProperty(OOZIE_ACTION_ID);
    if (!actionId.contains("@")) {
      return;
    }

    String workflowId = extractId(actionId);
    try {
      logger.info("Fetching Oozie workflow info for " + workflowId);

      WorkflowJob workflow = oozieClient.getJobInfo(workflowId);
      logger.info("Oozie workflow for " + workflowId + ": " + workflow);

      String superParentId = getSuperParentId(workflow);
      logger.info("Oozie super parent for: " + workflowId + ": " + superParentId);

      jobExecId = workflow.getId();
      jobExecIdUrl = workflow.getConsoleUrl();
      jobDefIdUrl = workflow.getConsoleUrl();
      flowExecId = superParentId;

      if (isCoordinatorJob(superParentId)) {
        coordinatedJobInfo(workflow, actionId, superParentId);
      } else {
        manualCommitedJob(workflow, actionId, superParentId);
      }
    } catch (OozieClientException e) {
      throw new RuntimeException("Failed fetching Oozie workflow " + workflowId + " info", e);
    }
  }

  /** Fills in the flow/job definition fields for a workflow whose top-most ancestor is a workflow. */
  private void manualCommitedJob(WorkflowJob workflow, String actionId, String superParentId)
      throws OozieClientException {
    logger.info("Oozie workflow " + actionId + " was manually submitted");
    WorkflowJob flowDefWorkflow = oozieClient.getJobInfo(extractId(superParentId));
    flowDefIdUrl = flowDefWorkflow.getConsoleUrl();
    flowExecIdUrl = flowDefWorkflow.getConsoleUrl();
    flowDefId = superParentId;
    if (appNameUniqueness) {
      jobDefId = workflow.getAppName() + "-" + extractAction(actionId);
      flowDefName = flowDefWorkflow.getAppName();
    } else {
      jobDefId = workflow.getId();
    }
  }

  /** Fills in the flow/job definition fields for a workflow whose top-most ancestor is a coordinator. */
  private void coordinatedJobInfo(WorkflowJob workflow, String actionId, String superParentId)
      throws OozieClientException {
    logger.info("Oozie workflow " + actionId + " is scheduled with coordinator");
    String coordinatorId = extractId(superParentId);
    CoordinatorJob flowDefCoordinator = oozieClient.getCoordJobInfo(coordinatorId);
    flowDefIdUrl = flowDefCoordinator.getConsoleUrl();
    flowExecIdUrl = flowDefCoordinator.getConsoleUrl();
    flowDefId = coordinatorId;
    if (appNameUniqueness) {
      jobDefId = workflow.getAppName() + "-" + extractAction(actionId);
      flowDefName = flowDefCoordinator.getAppName();
    } else {
      // The depth suffix tells apart actions of the same name at different sub-workflow levels.
      jobDefId = coordinatorId + "-" + extractAction(actionId) + "-" + workflowDepth;
    }
  }

  /** Returns the part of {@code <id>@<action>} before the first {@code @}. */
  private String extractId(String idAndAction) {
    return idAndAction.split("@")[0];
  }

  /** Returns the part of {@code <id>@<action>} between the first and second {@code @}. */
  private String extractAction(String idAndAction) {
    return idAndAction.split("@")[1];
  }

  /**
   * Walks up the parent chain of {@code workflow} and returns the id of its top-most ancestor,
   * recording in {@code workflowDepth} how many workflow levels were climbed.
   *
   * <p>The walk stops as soon as a parent is a coordinator, returning that parent id as reported
   * by Oozie. If there is no coordinator in the chain, the id of the top-most workflow is
   * returned; for a workflow without any parent that is the workflow's own id, so callers never
   * receive {@code null} or an empty id.
   *
   * @param workflow the workflow to start from
   * @return the coordinator parent id, or the id of the top-most workflow
   * @throws OozieClientException if a parent workflow cannot be fetched
   */
  private String getSuperParentId(WorkflowJob workflow) throws OozieClientException {
    WorkflowJob current = workflow;
    workflowDepth = 0;

    String parentId = current.getParentId();
    while (parentId != null && !parentId.isEmpty()) {
      if (isCoordinatorJob(parentId)) {
        return parentId;
      }
      current = oozieClient.getJobInfo(parentId);
      parentId = current.getParentId();
      workflowDepth++;
    }

    return current.getId();
  }

  /** Returns whether the id part of {@code parentId} ends with {@code "C"}, which this class treats as a coordinator id. */
  private boolean isCoordinatorJob(String parentId) {
    return extractId(parentId).endsWith("C");
  }

  /**
   * Builds an Oozie client from the {@code oozie_api_url} and {@code oozie_auth_option} parameters.
   *
   * @throws RuntimeException if {@code oozie_api_url} is not configured
   */
  private OozieClient makeOozieClient(SchedulerConfigurationData schedulerConfData) {
    String oozieApiUrl = schedulerConfData.getParamMap().get(OOZIE_API_URL);
    String authOption = schedulerConfData.getParamMap().get(OOZIE_AUTH_OPTION);
    if (oozieApiUrl == null) {
      throw new RuntimeException("Missing " + OOZIE_API_URL + " param for Oozie Scheduler");
    }

    return new AuthOozieClient(oozieApiUrl, authOption);
  }

  /**
   * Resolves a URL for {@code id}: the configured template wins, then the URL reported by Oozie,
   * and as a last resort the bare id is returned (with a warning naming the missing parameter).
   */
  private String getUrl(String idUrl, String id, String urlTemplate, String propertyName) {
    if (urlTemplate != null) {
      return Utils.formatStringOrNull(urlTemplate, id);
    }
    if (idUrl != null) {
      return idUrl;
    }
    logger.warn("Missing " + propertyName + " param for Oozie Scheduler");
    return id;
  }

  @Override
  public String getSchedulerName() {
    return schedulerName;
  }

  /** Returns {@code true} when the scheduler name or any of the four job/flow ids is missing. */
  @Override
  public boolean isEmpty() {
    return schedulerName == null || jobDefId == null || jobExecId == null || flowDefId == null || flowExecId == null;
  }

  @Override
  public String getJobDefId() {
    return Utils.formatStringOrNull("%s", jobDefId);
  }

  @Override
  public String getJobExecId() {
    return Utils.formatStringOrNull("%s", jobExecId);
  }

  /** Returns the flow's app name when app-name uniqueness is enabled, otherwise the flow definition id. */
  @Override
  public String getFlowDefId() {
    return Utils.formatStringOrNull("%s", appNameUniqueness ? flowDefName : flowDefId);
  }

  @Override
  public String getFlowExecId() {
    return Utils.formatStringOrNull("%s", flowExecId);
  }

  @Override
  public String getJobDefUrl() {
    return getUrl(jobDefIdUrl, jobDefId, jobDefUrlTemplate, OOZIE_JOB_DEF_URL_TEMPLATE);
  }

  @Override
  public String getJobExecUrl() {
    return getUrl(jobExecIdUrl, jobExecId, jobExecUrlTemplate, OOZIE_JOB_EXEC_URL_TEMPLATE);
  }

  @Override
  public String getFlowDefUrl() {
    return getUrl(flowDefIdUrl, flowDefId, workflowDefUrlTemplate, OOZIE_WORKFLOW_DEF_URL_TEMPLATE);
  }

  @Override
  public String getFlowExecUrl() {
    return getUrl(flowExecIdUrl, flowExecId, workflowExecUrlTemplate, OOZIE_WORKFLOW_EXEC_URL_TEMPLATE);
  }

  @Override
  public int getWorkflowDepth() {
    return workflowDepth;
  }

  @Override
  public String getJobName() {
    return jobDefId;
  }
}
5 changes: 1 addition & 4 deletions app/com/linkedin/drelephant/util/InfoExtractor.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@
package com.linkedin.drelephant.util;

import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfiguration;
import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.schedulers.AirflowScheduler;
import com.linkedin.drelephant.schedulers.AzkabanScheduler;
import com.linkedin.drelephant.schedulers.Scheduler;
import com.linkedin.drelephant.spark.data.SparkApplicationData;

Expand Down Expand Up @@ -90,7 +87,7 @@ public static Scheduler getSchedulerInstance(String appId, Properties properties
} catch (IllegalAccessException e) {
throw new RuntimeException("Could not access constructor for class" + data.getClassName(), e);
} catch (RuntimeException e) {
throw new RuntimeException(data.getClassName() + " is not a valid Fetcher class.", e);
throw new RuntimeException(data.getClassName() + " is not a valid Scheduler class.", e);
} catch (InvocationTargetException e) {
throw new RuntimeException("Could not invoke class " + data.getClassName(), e);
} catch (NoSuchMethodException e) {
Expand Down
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public static Map<String, String> parseJavaOptions(String str) {
Map<String, String> options = new HashMap<String, String>();
String[] tokens = str.trim().split("\\s");
for (String token : tokens) {
if (token.isEmpty()) {
if (token.isEmpty() || token.startsWith("-X")) {
continue;
}
if (!token.startsWith("-D")) {
Expand Down
9 changes: 6 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ object Dependencies {
lazy val jacksonMapperAslVersion = "1.7.3"
lazy val jsoupVersion = "1.7.3"
lazy val mysqlConnectorVersion = "5.1.36"
lazy val oozieClientVersion = "4.2.0"

lazy val HADOOP_VERSION = "hadoopversion"
lazy val SPARK_VERSION = "sparkversion"
Expand Down Expand Up @@ -69,13 +70,15 @@ object Dependencies {
"org.apache.hadoop" % "hadoop-common" % hadoopVersion % Test,
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % "compileonly",
"org.apache.hadoop" % "hadoop-hdfs" % hadoopVersion % Test,
"org.codehaus.jackson" % "jackson-mapper-asl" % jacksonMapperAslVersion,
"org.jsoup" % "jsoup" % jsoupVersion,
"org.apache.oozie" % "oozie-client" % oozieClientVersion excludeAll(
ExclusionRule(organization = "org.apache.hadoop")
),
"io.dropwizard.metrics" % "metrics-core" % "3.1.2",
"io.dropwizard.metrics" % "metrics-healthchecks" % "3.1.2",
"org.mockito" % "mockito-core" % "1.10.19",
"org.mockito" % "mockito-core" % "1.10.19" exclude ("org.hamcrest", "hamcrest-core"),
"org.jmockit" % "jmockit" % "1.23" % Test
) :+ sparkExclusion
) :+ sparkExclusion

var dependencies = Seq(javaJdbc, javaEbean, cache)
dependencies ++= requiredDep
Expand Down
Loading

0 comments on commit c8a9af6

Please sign in to comment.