Skip to content

Commit

Permalink
Backfill jobs which were lost while Dr. Elephant was down (linkedin#390)
Browse files Browse the repository at this point in the history
Squashed commit of the following:
    * Backfill - Rename 7.sql to 6.sql
    * Backfill jobs which are lost while Dr.Elephant was down (linkedin#385)
  • Loading branch information
varunsaxena committed Dec 14, 2018
1 parent 90b6453 commit faedb59
Show file tree
Hide file tree
Showing 38 changed files with 2,865 additions and 103 deletions.
3 changes: 2 additions & 1 deletion app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@

<!--
This is an experimental fetcher for Spark applications which uses SHS REST API to get application metrics
and WebHDFS to get application properties from eventlogs.
and WebHDFS to get application properties from event logs. Please note that this fetcher also supports backfill,
but the backfill implementation relies upon SHS REST APIs which are only available since Spark 2.3.
<fetcher>
<applicationtype>spark</applicationtype>
Expand Down
26 changes: 26 additions & 0 deletions app-conf/GeneralConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,30 @@
<name>drelephant.analysis.fetch.initial.windowMillis</name>
<value>3600000</value>
</property> -->
<property>
<name>drelephant.analysis.backfill.enabled</name>
<value>false</value>
<description>Enables or disables backfilling. Note that merely setting this flag does not
guarantee backfilling: the configured fetcher(s) must also implement backfill support.
</description>
</property>
<property>
<name>drelephant.analysis.backfill.retry.interval</name>
<value>60000</value>
<description>Time interval, in milliseconds, to wait before retrying backfill if it failed
during an earlier attempt.</description>
</property>
<property>
<name>drelephant.analysis.prioritization-on-job-query.max-wait-interval</name>
<value>60000</value>
<description>Maximum time, in milliseconds, to wait for job information to appear in the DB
before returning a successful result for a job that was queried via the job REST endpoint
with the prioritize query param set</description>
</property>
<property>
<name>drelephant.analysis.submit-backfill-job.with.low-priority</name>
<value>true</value>
<description>If "true", a backfill job will be submitted to the executor for analysis
with LOW priority. If "false", it will be submitted with NORMAL priority.</description>
</property>
</configuration>
21 changes: 11 additions & 10 deletions app/Global.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import play.GlobalSettings;
import play.Logger;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.logging.Level;
Expand All @@ -32,25 +31,27 @@
*/
public class Global extends GlobalSettings {

DrElephant _drElephant;
Thread _drElephantThread;

public void onStart(Application app) {
  Logger.info("Starting Application...");

  fixJavaKerberos();

  // Dr. Elephant is a singleton Runnable; run it on a dedicated thread so the
  // Play lifecycle thread is not blocked. The legacy path that constructed
  // DrElephant directly (and caught IOException) is gone — getInstance()
  // returns the eagerly-created singleton.
  _drElephantThread = new Thread(DrElephant.getInstance());
  _drElephantThread.start();
}

public void onStop(Application app) {
Logger.info("Stopping application...");
if (_drElephant != null) {
_drElephant.kill();
if (_drElephantThread != null) {
DrElephant.getInstance().kill();
_drElephantThread.interrupt();
try {
_drElephantThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

Expand Down
13 changes: 11 additions & 2 deletions app/com/linkedin/drelephant/DrElephant.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@
/**
* The main class which starts Dr. Elephant
*/
public class DrElephant extends Thread {
public class DrElephant implements Runnable {
public static final String AUTO_TUNING_ENABLED = "autotuning.enabled";
private static final Logger logger = Logger.getLogger(DrElephant.class);
private static final DrElephant INSTANCE = new DrElephant();

private ElephantRunner _elephant;
private AutoTuner _autoTuner;
private Thread _autoTunerThread;

private Boolean autoTuningEnabled;

public DrElephant() throws IOException {
private DrElephant() {
HDFSContext.load();
Configuration configuration = ElephantContext.instance().getAutoTuningConf();
autoTuningEnabled = configuration.getBoolean(AUTO_TUNING_ENABLED, false);
Expand All @@ -49,6 +50,14 @@ public DrElephant() throws IOException {
}
}

/**
 * Returns the process-wide Dr. Elephant singleton.
 *
 * @return the shared {@code DrElephant} instance
 */
public static DrElephant getInstance() {
return INSTANCE;
}

/**
 * Returns the {@link ElephantRunner} owned by this instance.
 *
 * NOTE(review): presumably {@code _elephant} is assigned during construction or in
 * {@code run()}; from this view it may be {@code null} if accessed too early — confirm.
 *
 * @return the elephant runner, possibly {@code null} before initialization completes
 */
public ElephantRunner getElephant() {
return _elephant;
}

@Override
public void run() {
if (_autoTunerThread != null) {
Expand Down
9 changes: 9 additions & 0 deletions app/com/linkedin/drelephant/ElephantContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import com.linkedin.drelephant.analysis.ApplicationType;
import com.linkedin.drelephant.analysis.ElephantBackfillFetcher;
import com.linkedin.drelephant.analysis.ElephantFetcher;
import com.linkedin.drelephant.analysis.HadoopApplicationData;
import com.linkedin.drelephant.analysis.HadoopMetricsAggregator;
Expand Down Expand Up @@ -380,6 +381,14 @@ public ElephantFetcher getFetcherForApplicationType(ApplicationType type) {
return _typeToFetcher.get(type);
}

/**
 * Looks up the fetcher registered for the given application type and returns it
 * as a backfill-capable fetcher, or {@code null} when the registered fetcher
 * does not support backfill (or no fetcher is registered at all).
 *
 * @param type the application type whose fetcher is wanted
 * @return the backfill fetcher for {@code type}, or {@code null}
 */
public ElephantBackfillFetcher getBackfillFetcherForApplicationType(ApplicationType type) {
  ElephantFetcher candidate = _typeToFetcher.get(type);
  return (candidate instanceof ElephantBackfillFetcher) ? (ElephantBackfillFetcher) candidate : null;
}

/**
 * Returns the metrics aggregator registered for the given application type.
 *
 * @param type the application type to look up
 * @return the registered aggregator, or {@code null} if none is mapped
 */
public HadoopMetricsAggregator getAggregatorForApplicationType(ApplicationType type) {
  HadoopMetricsAggregator aggregator = _typeToAggregator.get(type);
  return aggregator;
}
Expand Down
Loading

0 comments on commit faedb59

Please sign in to comment.