Show exceptions for failed workflows (linkedin#188)
nntnag17 authored Jan 24, 2017
1 parent 07529ed commit 29844c9
Showing 44 changed files with 2,707 additions and 54 deletions.
8 changes: 8 additions & 0 deletions app-conf/SchedulerConf.xml
@@ -28,6 +28,14 @@
<scheduler>
<name>azkaban</name>
<classname>com.linkedin.drelephant.schedulers.AzkabanScheduler</classname>
<params>
<!--<exception_enabled>true</exception_enabled>-->
<!--<exception_class>com.linkedin.drelephant.exceptions.azkaban.AzkabanWorkflowClient</exception_class>-->
<!--<private_key>/home/key/private_key</private_key>-->
<!--<username>elephant</username>-->
<!--<password></password>-->
</params>

</scheduler>

<scheduler>
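For reference, here is a sketch of the azkaban block with exception fetching enabled. It reuses the placeholder values from the commented template above; per the login logic in ExceptionFinder below, either private_key or password must be supplied along with username:

<scheduler>
  <name>azkaban</name>
  <classname>com.linkedin.drelephant.schedulers.AzkabanScheduler</classname>
  <params>
    <exception_enabled>true</exception_enabled>
    <exception_class>com.linkedin.drelephant.exceptions.azkaban.AzkabanWorkflowClient</exception_class>
    <!-- Authenticate with either a private key or a password, not both -->
    <private_key>/home/key/private_key</private_key>
    <username>elephant</username>
  </params>
</scheduler>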
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/ElephantRunner.java
@@ -94,7 +94,7 @@ private void loadAnalyticJobGenerator() {
public void run() {
logger.info("Dr.elephant has started");
try {
- _hadoopSecurity = new HadoopSecurity();
+ _hadoopSecurity = HadoopSecurity.getInstance();
_hadoopSecurity.doAs(new PrivilegedAction<Void>() {
@Override
public Void run() {
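The switch from new HadoopSecurity() to HadoopSecurity.getInstance() implies that HadoopSecurity now exposes a shared instance, so ElephantRunner and the new ExceptionFinder (below) reuse the same Kerberos login. The HadoopSecurity change itself is not among the files shown here; a minimal sketch of such an accessor, purely as an assumption, could look like:

import java.io.IOException;

public class HadoopSecurity {
  private static HadoopSecurity _instance = null;

  // Lazily create and cache a single instance; synchronized to stay safe
  // when called from multiple threads.
  public static synchronized HadoopSecurity getInstance() throws IOException {
    if (_instance == null) {
      _instance = new HadoopSecurity();
    }
    return _instance;
  }

  private HadoopSecurity() throws IOException {
    // hypothetical: keytab/UGI login state would be initialized here
  }
}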
111 changes: 111 additions & 0 deletions app/com/linkedin/drelephant/exceptions/EventException.java
@@ -0,0 +1,111 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.exceptions;

import org.apache.log4j.Logger;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;


/**
* This class represents an exception in the exception chain (a list of exceptions)
*/

public class EventException {
private final Logger logger = Logger.getLogger(EventException.class);


/**
* Example string: '\tat org.testng.Assert.fail(Assert.java:89)'
* matches: ['org.testng.Assert', 'fail', 'Assert.java:89']
*/
private static final Pattern stackTraceLinePattern =
Pattern.compile("^[\\\\t \\t]*at (.+)\\.(.+(?=\\())\\((.*)\\)");

/**
* Example string: 'java.lang.AssertionError: Failure 1 expected:<true> but was:<false>'
* matches: ['java.lang.AssertionError', 'Failure 1 expected:<true> but was:<false>']
*/
private static final Pattern exceptionDetailsPattern = Pattern.compile("^([^() :]*): (.*)");

// Matches each newline-terminated line of the raw exception string
private static final Pattern separateLinesPattern = Pattern.compile(".*\\n");
private String _type;
private int _index;
private String _message;
private List<StackTraceFrame> _stackTrace;

public EventException(int index, String rawEventException) {
this._index = index;
processRawString(rawEventException);
}

/**
* Returns the message in EventException
* @return message in event exception
*/
public String getMessage() {
return _message;
}

/**
* Processes a raw exception string and sets the fields of this EventException object
* @param rawEventException exception in a string form
*/
private void processRawString(String rawEventException) {
int frameIndex = 0;
List<StackTraceFrame> stackTrace = new ArrayList<StackTraceFrame>();
List<String> lines = stringToListOfLines(rawEventException);

for (String line : lines) {
Matcher exceptionDetailsMatcher = exceptionDetailsPattern.matcher(line);
if (exceptionDetailsMatcher.find()) {
this._type = exceptionDetailsMatcher.group(1);
this._message = exceptionDetailsMatcher.group(2);
} else {
Matcher stackTraceLineMatcher = stackTraceLinePattern.matcher(line);
if (stackTraceLineMatcher.find()) {
String source = stackTraceLineMatcher.group(1);
String call = stackTraceLineMatcher.group(2);
String fileDetails = stackTraceLineMatcher.group(3);
StackTraceFrame stackTraceFrame = new StackTraceFrame(frameIndex, source, call, fileDetails);
stackTrace.add(stackTraceFrame);
frameIndex += 1;
}
}
}
this._stackTrace = stackTrace;
}

/**
* Takes an exception in string form and converts it into a list of strings, where each string corresponds to a
* line in the exception
* @param rawEventException exception in a string form
* @return list of lines in the exception
*/
private List<String> stringToListOfLines(String rawEventException) {
Matcher separateLinesMatcher = separateLinesPattern.matcher(rawEventException);
List<String> lines = new ArrayList<String>();
while (separateLinesMatcher.find()) {
lines.add(separateLinesMatcher.group());
}
return lines;
}
}
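For illustration, a small hypothetical snippet showing what the parser above extracts from a raw exception string (using the example strings from the pattern comments):

// Hypothetical usage, e.g. from a test in the same package:
String raw = "java.lang.AssertionError: Failure 1 expected:<true> but was:<false>\n"
    + "\tat org.testng.Assert.fail(Assert.java:89)\n";
EventException eventException = new EventException(0, raw);
// The first line matches exceptionDetailsPattern and sets the type and message;
// the "\tat ..." line matches stackTraceLinePattern and becomes a StackTraceFrame.
System.out.println(eventException.getMessage());
// prints: Failure 1 expected:<true> but was:<false>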
227 changes: 227 additions & 0 deletions app/com/linkedin/drelephant/exceptions/ExceptionFinder.java
@@ -0,0 +1,227 @@
/*
* Copyright 2016 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.linkedin.drelephant.exceptions;

import com.linkedin.drelephant.configurations.scheduler.SchedulerConfigurationData;
import com.linkedin.drelephant.security.HadoopSecurity;
import com.linkedin.drelephant.util.InfoExtractor;
import java.io.File;
import java.io.IOException;
import java.security.PrivilegedAction;
import javax.naming.AuthenticationException;
import org.apache.log4j.Logger;

import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;


/**
* ExceptionFinder finds the exceptions in a workflow, along with the level (flow, job, MR job or MR task) at which
* each exception occurred. It takes the url of the workflow and the scheduler as parameters.
*/
public class ExceptionFinder {
private final Logger logger = Logger.getLogger(ExceptionFinder.class);
private HadoopException _exception;
private WorkflowClient _workflowClient;
private MRClient _mrClient;

private static final String USERNAME = "username";
private static final String PRIVATE_KEY = "private_key";
private static final String PASSWORD = "password";
private static final int SAMPLE_SIZE = 3;

/**
* Constructor for ExceptionFinder class
* @param url The url of the workflow to analyze
* @param scheduler The scheduler where the workflow was run.
* @throws URISyntaxException
* @throws MalformedURLException
*/
public ExceptionFinder(String url, String scheduler)
throws URISyntaxException, MalformedURLException, AuthenticationException, IOException {

// create a new MRClient
_mrClient = new MRClient();

// create a new workflow client
_workflowClient = InfoExtractor.getWorkflowClientInstance(scheduler, url);

// get the schedulerData
SchedulerConfigurationData schedulerData = InfoExtractor.getSchedulerData(scheduler);


if (schedulerData == null) {
throw new RuntimeException(String.format("Cannot find scheduler %s", scheduler));
}

if (!schedulerData.getParamMap().containsKey(USERNAME)) {
throw new RuntimeException("Cannot find username for login");
}

String username = schedulerData.getParamMap().get(USERNAME);

if (schedulerData.getParamMap().containsKey(PRIVATE_KEY)) {
_workflowClient.login(username, new File(schedulerData.getParamMap().get(PRIVATE_KEY)));
} else if (schedulerData.getParamMap().containsKey(PASSWORD)) {
_workflowClient.login(username, schedulerData.getParamMap().get(PASSWORD));
} else {
throw new RuntimeException("Neither private key nor password was specified");
}
_exception = analyzeFlow(url);
}

/**
* Analyzes a flow and returns a HadoopException object which captures all the exceptions in the flow.
* @param execUrl the execution URL of the flow
* @return HadoopException object which captures all the exceptions in the given Flow
*/
private HadoopException analyzeFlow(final String execUrl) throws AuthenticationException, IOException {
HadoopSecurity _hadoopSecurity = HadoopSecurity.getInstance();

return _hadoopSecurity.doAs(new PrivilegedAction<HadoopException>() {
@Override
public HadoopException run() {
HadoopException flowLevelException = new HadoopException();
List<HadoopException> childExceptions = new ArrayList<HadoopException>();
Map<String, String> jobIdStatus = _workflowClient.getJobsFromFlow();

// Find exceptions in all the unsuccessful jobs of the workflow
for (String unsuccessfulJobId : jobIdStatus.keySet()) {
if (jobIdStatus.get(unsuccessfulJobId).equalsIgnoreCase("failed")) {
HadoopException jobLevelException = analyzeJob(unsuccessfulJobId);
childExceptions.add(jobLevelException);
}
}

flowLevelException.setType(HadoopException.HadoopExceptionType.FLOW);
flowLevelException.setId(execUrl);
flowLevelException.setLoggingEvent(null); // No flow level exception
flowLevelException.setChildExceptions(childExceptions);
return flowLevelException;
}
});
}

/**
* Given a failed job, this method analyzes the job and returns a HadoopException object which captures all the exceptions in the given job.
* @param jobId The job execution id/url, specific to the scheduler
* @return HadoopException object which captures all the exceptions in the given job
*/
private HadoopException analyzeJob(String jobId) {
HadoopException jobLevelException = new HadoopException();
List<HadoopException> childExceptions = new ArrayList<HadoopException>();

_workflowClient.analyzeJob(jobId);

// get the set of all the yarn jobs from workflowClient
Set<String> yarnJobIds = _workflowClient.getYarnApplicationsFromJob(jobId);

for (String mrJobId : yarnJobIds) {
// TODO: Check whether the MR job logs are present in the job history server
String rawMRJobLog = _mrClient.getMRJobLog(mrJobId);
if (rawMRJobLog != null && !rawMRJobLog.isEmpty()) { // null when the log is not found; empty for successful MR jobs
// TODO: rawMRJobLog is empty for successful MR jobs, but this is not a reliable way to tell whether a job failed
// or succeeded; use the state field in the REST API instead
HadoopException mrJobLevelException = analyzeMRJob(mrJobId, rawMRJobLog);
childExceptions.add(mrJobLevelException);
}
}

if (_workflowClient.getJobState(jobId) == JobState.MRFAIL) {
jobLevelException.setType(HadoopException.HadoopExceptionType.MR);
jobLevelException.setLoggingEvent(_workflowClient.getJobException(jobId));
// LoggingEvent is set only for the case where MR logs could not be found in the job history server and
// childExceptions is empty
jobLevelException.setChildExceptions(childExceptions);
} else if (_workflowClient.getJobState(jobId) == JobState.SCHEDULERFAIL) {
jobLevelException.setType(HadoopException.HadoopExceptionType.SCHEDULER);
jobLevelException.setLoggingEvent(_workflowClient.getJobException(jobId));
jobLevelException.setChildExceptions(null);
} else if (_workflowClient.getJobState(jobId) == JobState.SCRIPTFAIL) {
jobLevelException.setType(HadoopException.HadoopExceptionType.SCRIPT);
jobLevelException.setLoggingEvent(_workflowClient.getJobException(jobId));
jobLevelException.setChildExceptions(null);
} else if (_workflowClient.getJobState(jobId) == JobState.KILLED) {
jobLevelException.setType(HadoopException.HadoopExceptionType.KILL);
jobLevelException.setLoggingEvent(null);
jobLevelException.setChildExceptions(null);
}
jobLevelException.setId(jobId);
return jobLevelException;
}

/**
* Given a failed MR job id and the diagnostics of the job, this method analyzes the job and returns a HadoopException
* object which captures all the exceptions in the given MR job.
* @param mrJobId Mapreduce job id
* @param rawMRJoblog Diagnostics of the mapreduce job as a string
* @return HadoopException object which captures all the exceptions in the given Mapreduce job
*/
private HadoopException analyzeMRJob(String mrJobId, String rawMRJoblog) {
// This method is called only for unsuccessful MR jobs
HadoopException mrJobLevelException = new HadoopException();
List<HadoopException> childExceptions = new ArrayList<HadoopException>();
MRJobLogAnalyzer analyzedLog = new MRJobLogAnalyzer(rawMRJoblog);
Set<String> failedMRTaskIds = analyzedLog.getFailedSubEvents();

// Sample at most SAMPLE_SIZE failed tasks
int samplingSize = SAMPLE_SIZE;
for (String failedMRTaskId : failedMRTaskIds) {
if (samplingSize <= 0) {
break;
}
String rawMRTaskLog = _mrClient.getMRTaskLog(mrJobId, failedMRTaskId);
HadoopException mrTaskLevelException = analyzeMRTask(failedMRTaskId, rawMRTaskLog);
childExceptions.add(mrTaskLevelException);

samplingSize--;
}

mrJobLevelException.setChildExceptions(childExceptions);
mrJobLevelException.setLoggingEvent(analyzedLog.getException());
mrJobLevelException.setType(HadoopException.HadoopExceptionType.MRJOB);
mrJobLevelException.setId(mrJobId);
return mrJobLevelException;
}

/**
* Given a failed MR task id and the diagnostics of the task, this method analyzes it and returns a HadoopException object which captures all the exceptions in the given MR task.
* @param mrTaskId The task id of the map reduce job
* @param rawMRTaskLog Raw map-reduce log
* @return HadoopException object which captures all the exceptions in the given Mapreduce task
*/
private HadoopException analyzeMRTask(String mrTaskId, String rawMRTaskLog) {
HadoopException mrTaskLevelException = new HadoopException();
MRTaskLogAnalyzer analyzedLog = new MRTaskLogAnalyzer(rawMRTaskLog);
mrTaskLevelException.setLoggingEvent(analyzedLog.getException());
mrTaskLevelException.setType(HadoopException.HadoopExceptionType.MRTASK);
mrTaskLevelException.setId(mrTaskId);
mrTaskLevelException.setChildExceptions(null);
return mrTaskLevelException;
}

/**
* Returns the root HadoopException object capturing the flow's exception tree
* @return the root HadoopException object
*/
public HadoopException getExceptions() {
return this._exception;
}
}
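A hypothetical caller could wire this together as follows. The execution url is a placeholder, and getType(), getId() and getChildExceptions() are assumed getters on HadoopException mirroring the setters used above:

// Inside a method that declares the checked exceptions thrown by the constructor:
ExceptionFinder finder =
    new ExceptionFinder("https://azkaban.example.com/executor?execid=12345", "azkaban");
HadoopException flowException = finder.getExceptions();
// Walk one level of the flow -> job -> MR job -> MR task exception tree.
for (HadoopException jobException : flowException.getChildExceptions()) {
  System.out.println(jobException.getType() + " : " + jobException.getId());
}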
