Skip to content

Commit

Permalink
Adds an option to fetch recently finished apps from RM (linkedin#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
shkhrgpt authored and akshayrai committed Feb 28, 2017
1 parent da7983c commit 0d668ab
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 1 deletion.
8 changes: 8 additions & 0 deletions app-conf/GeneralConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,12 @@
<value>true</value>
<description>If this property is "false", search will only make exact matches</description>
</property>
<!--
Initial window in MS to indicate how much older apps to fetch from RM.
-->
<!--
<property>
<name>drelephant.analysis.fetch.initial.windowMillis</name>
<value>3600000</value>
</property> -->
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids";
private static final String RM_NODE_STATE_URL = "http://%s/ws/v1/cluster/info";
private static final String FETCH_INITIAL_WINDOW_MS = "drelephant.analysis.fetch.initial.windowMillis";

private static Configuration configuration;

// We provide one minute job fetch delay due to the job sending lag from AM/NM to JobHistoryServer HDFS
Expand All @@ -58,6 +60,7 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {

private String _resourceManagerAddress;
private long _lastTime = 0;
private long _fetchStartTime = 0;
private long _currentTime = 0;
private long _tokenUpdatedTime = 0;
private AuthenticatedURL.Token _token;
Expand Down Expand Up @@ -109,6 +112,12 @@ public void updateResourceManagerAddresses() {
public void configure(Configuration configuration)
throws IOException {
this.configuration = configuration;
String initialFetchWindowString = configuration.get(FETCH_INITIAL_WINDOW_MS);
if (initialFetchWindowString != null) {
long initialFetchWindow = Long.getLong(initialFetchWindowString);
_lastTime = System.currentTimeMillis() - FETCH_DELAY - initialFetchWindow;
_fetchStartTime = _lastTime;
}
updateResourceManagerAddresses();
}

Expand Down Expand Up @@ -212,7 +221,7 @@ private List<AnalyticJob> readApps(URL url) throws IOException, AuthenticationEx

// When called first time after launch, hit the DB and avoid duplicated analytic jobs that have been analyzed
// before.
if (_lastTime > 0 || (_lastTime == 0 && AppResult.find.byId(appId) == null)) {
if (_lastTime > _fetchStartTime || (_lastTime == _fetchStartTime && AppResult.find.byId(appId) == null)) {
String user = app.get("user").getValueAsText();
String name = app.get("name").getValueAsText();
String queueName = app.get("queue").getValueAsText();
Expand Down

0 comments on commit 0d668ab

Please sign in to comment.