Skip to content

Commit

Permalink
Separate configuration logic from RM address update logic (linkedin#38)
Browse files Browse the repository at this point in the history
  • Loading branch information
tglstory authored and akshayrai committed May 6, 2016
1 parent 68d74d3 commit af70800
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 8 deletions.
1 change: 1 addition & 0 deletions app/com/linkedin/drelephant/ElephantRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public Void run() {
}

while (_running.get() && !Thread.currentThread().isInterrupted()) {
_analyticJobGenerator.updateResourceManagerAddresses();
lastRun = System.currentTimeMillis();

logger.info("Fetching analytic job list...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public interface AnalyticJobGenerator {
public void configure(Configuration configuration)
throws IOException;

/**
* Configures the resource manager addresses considering HA
*/
public void updateResourceManagerAddresses();

/**
* Provides a list of AnalyticJobs that should be calculated
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {
private static final String IS_RM_HA_ENABLED = "yarn.resourcemanager.ha.enabled";
private static final String RESOURCE_MANAGER_IDS = "yarn.resourcemanager.ha.rm-ids";
private static final String RM_NODE_STATE_URL = "http://%s/ws/v1/cluster/info";
private static Configuration configuration;

// We provide one minute job fetch delay due to the job sending lag from AM/NM to JobHistoryServer HDFS
private static final long FETCH_DELAY = 60000;
Expand All @@ -64,9 +65,7 @@ public class AnalyticJobGeneratorHadoop2 implements AnalyticJobGenerator {

private final Queue<AnalyticJob> _retryQueue = new ConcurrentLinkedQueue<AnalyticJob>();

@Override
public void configure(Configuration configuration)
throws IOException {
public void updateResourceManagerAddresses() {
if (Boolean.valueOf(configuration.get(IS_RM_HA_ENABLED))) {
String resourceManagers = configuration.get(RESOURCE_MANAGER_IDS);
if (resourceManagers != null) {
Expand Down Expand Up @@ -94,19 +93,27 @@ public void configure(Configuration configuration)
catch (AuthenticationException e) {
logger.error("Error fetching resource manager state " + e.getMessage());
}
catch (IOException e) {
logger.error("Error fetching Json for resource manager status " + e.getMessage());
}
}
}
else {
} else {
_resourceManagerAddress = configuration.get(RESOURCE_MANAGER_ADDRESS);
}

if (_resourceManagerAddress == null) {
throw new RuntimeException(
"Cannot get YARN resource manager address from Hadoop Configuration property: [" + RESOURCE_MANAGER_ADDRESS
+ "].");
"Cannot get YARN resource manager address from Hadoop Configuration property: [" + RESOURCE_MANAGER_ADDRESS
+ "].");
}
}

@Override
public void configure(Configuration configuration)
throws IOException {
this.configuration = configuration;
updateResourceManagerAddresses();
}

/**
* Fetch all the succeeded and failed applications/analytic jobs from the resource manager.
*
Expand Down

0 comments on commit af70800

Please sign in to comment.