Skip to content

Commit

Permalink
YARN-3632. Ordering policy should be allowed to reorder an applicatio…
Browse files Browse the repository at this point in the history
…n when demand changes. Contributed by Craig Welch
  • Loading branch information
jian-he committed May 26, 2015
1 parent 500a1d9 commit 10732d5
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 9 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ Release 2.8.0 - UNRELEASED

YARN-3707. RM Web UI queue filter doesn't work. (Wangda Tan via jianhe)

YARN-3632. Ordering policy should be allowed to reorder an application when
demand changes. (Craig Welch via jianhe)

Release 2.7.1 - UNRELEASED

INCOMPATIBLE CHANGES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,14 @@ public long getNewContainerId() {
*
* @param requests resources to be acquired
* @param recoverPreemptedRequest recover Resource Request on preemption
* @return true if any resource was updated, false else
*/
synchronized public void updateResourceRequests(
synchronized public boolean updateResourceRequests(
List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
QueueMetrics metrics = queue.getMetrics();

boolean anyResourcesUpdated = false;

// Update resource requests
for (ResourceRequest request : requests) {
Priority priority = request.getPriority();
Expand All @@ -146,6 +149,7 @@ synchronized public void updateResourceRequests(
+ request);
}
updatePendingResources = true;
anyResourcesUpdated = true;

// Premature optimization?
// Assumes that we won't see more than one priority request updated
Expand Down Expand Up @@ -209,6 +213,7 @@ synchronized public void updateResourceRequests(
}
}
}
return anyResourcesUpdated;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,12 @@ public Queue getQueue() {
return queue;
}

public synchronized void updateResourceRequests(
public synchronized boolean updateResourceRequests(
List<ResourceRequest> requests) {
if (!isStopped) {
appSchedulingInfo.updateResourceRequests(requests, false);
return appSchedulingInfo.updateResourceRequests(requests, false);
}
return false;
}

public synchronized void recoverResourceRequests(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,10 @@ ask, getResourceCalculator(), getClusterResource(),
// Release containers
releaseContainers(release, application);

Allocation allocation;

LeafQueue updateDemandForQueue = null;

synchronized (application) {

// make sure we aren't stopping/removing the application
Expand All @@ -915,8 +919,10 @@ ask, getResourceCalculator(), getClusterResource(),
application.showRequests();

// Update application requests
application.updateResourceRequests(ask);

if (application.updateResourceRequests(ask)) {
updateDemandForQueue = (LeafQueue) application.getQueue();
}

LOG.debug("allocate: post-update");
application.showRequests();
}
Expand All @@ -929,9 +935,16 @@ ask, getResourceCalculator(), getClusterResource(),

application.updateBlacklist(blacklistAdditions, blacklistRemovals);

return application.getAllocation(getResourceCalculator(),
allocation = application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
}

if (updateDemandForQueue != null) {
updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
}

return allocation;

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti

protected TreeSet<S> schedulableEntities;
protected Comparator<SchedulableEntity> comparator;
protected Map<String, S> entitiesToReorder = new HashMap<String, S>();

public AbstractComparatorOrderingPolicy() { }

Expand All @@ -47,11 +48,13 @@ public Collection<S> getSchedulableEntities() {

@Override
public Iterator<S> getAssignmentIterator() {
reorderScheduleEntities();
return schedulableEntities.iterator();
}

@Override
public Iterator<S> getPreemptionIterator() {
reorderScheduleEntities();
return schedulableEntities.descendingIterator();
}

Expand All @@ -68,6 +71,22 @@ protected void reorderSchedulableEntity(S schedulableEntity) {
schedulableEntities.add(schedulableEntity);
}

protected void reorderScheduleEntities() {
synchronized (entitiesToReorder) {
for (Map.Entry<String, S> entry :
entitiesToReorder.entrySet()) {
reorderSchedulableEntity(entry.getValue());
}
entitiesToReorder.clear();
}
}

protected void entityRequiresReordering(S schedulableEntity) {
synchronized (entitiesToReorder) {
entitiesToReorder.put(schedulableEntity.getId(), schedulableEntity);
}
}

@VisibleForTesting
public Comparator<SchedulableEntity> getComparator() {
return comparator;
Expand All @@ -80,6 +99,9 @@ public void addSchedulableEntity(S s) {

@Override
public boolean removeSchedulableEntity(S s) {
synchronized (entitiesToReorder) {
entitiesToReorder.remove(s.getId());
}
return schedulableEntities.remove(s);
}

Expand All @@ -104,6 +126,9 @@ public abstract void containerAllocated(S schedulableEntity,
public abstract void containerReleased(S schedulableEntity,
RMContainer r);

@Override
public abstract void demandUpdated(S schedulableEntity);

@Override
public abstract String getInfo();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,22 @@ public void configure(Map<String, String> conf) {
@Override
public void containerAllocated(S schedulableEntity,
RMContainer r) {
reorderSchedulableEntity(schedulableEntity);
entityRequiresReordering(schedulableEntity);
}

@Override
public void containerReleased(S schedulableEntity,
RMContainer r) {
reorderSchedulableEntity(schedulableEntity);
entityRequiresReordering(schedulableEntity);
}

@Override
public void demandUpdated(S schedulableEntity) {
if (sizeBasedWeight) {
entityRequiresReordering(schedulableEntity);
}
}

@Override
public String getInfo() {
String sbw = sizeBasedWeight ? " with sizeBasedWeight" : "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ public void containerAllocated(S schedulableEntity,
public void containerReleased(S schedulableEntity,
RMContainer r) {
}


@Override
public void demandUpdated(S schedulableEntity) {
}

@Override
public String getInfo() {
return "FifoOrderingPolicy";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public void containerAllocated(S schedulableEntity,
public void containerReleased(S schedulableEntity,
RMContainer r);

/**
* Demand Updated for the passed schedulableEntity, reorder if needed.
*/
void demandUpdated(S schedulableEntity);

/**
* Display information regarding configuration & status
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
Expand Down Expand Up @@ -126,6 +128,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
Expand Down Expand Up @@ -676,6 +679,118 @@ public void testBlackListNodes() throws Exception {
rm.stop();
}

@Test
public void testAllocateReorder() throws Exception {

//Confirm that allocation (resource request) alone will trigger a change in
//application ordering where appropriate

Configuration conf = new Configuration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();

LeafQueue q = (LeafQueue) cs.getQueue("default");
Assert.assertNotNull(q);

FairOrderingPolicy fop = new FairOrderingPolicy();
fop.setSizeBasedWeight(true);
q.setOrderingPolicy(fop);

String host = "127.0.0.1";
RMNode node =
MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
cs.handle(new NodeAddedSchedulerEvent(node));

//add app begin
ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
appId1, 1);

RMAppAttemptMetrics attemptMetric1 =
new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
RMAppImpl app1 = mock(RMAppImpl.class);
when(app1.getApplicationId()).thenReturn(appId1);
RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
when(app1.getCurrentAppAttempt()).thenReturn(attempt1);

rm.getRMContext().getRMApps().put(appId1, app1);

SchedulerEvent addAppEvent1 =
new AppAddedSchedulerEvent(appId1, "default", "user");
cs.handle(addAppEvent1);
SchedulerEvent addAttemptEvent1 =
new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
cs.handle(addAttemptEvent1);
//add app end

//add app begin
ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2);
ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
appId2, 1);

RMAppAttemptMetrics attemptMetric2 =
new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext());
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);

rm.getRMContext().getRMApps().put(appId2, app2);

SchedulerEvent addAppEvent2 =
new AppAddedSchedulerEvent(appId2, "default", "user");
cs.handle(addAppEvent2);
SchedulerEvent addAttemptEvent2 =
new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
cs.handle(addAttemptEvent2);
//add app end

RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);

Priority priority = TestUtils.createMockPriority(1);
ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);

//This will allocate for app1
cs.allocate(appAttemptId1,
Collections.<ResourceRequest>singletonList(r1),
Collections.<ContainerId>emptyList(),
null, null);

//And this will result in container assignment for app1
CapacityScheduler.schedule(cs);

//Verify that app1 is still first in assignment order
//This happens because app2 has no demand/a magnitude of NaN, which
//results in app1 and app2 being equal in the fairness comparison and
//failling back to fifo (start) ordering
assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
appId1.toString());

//Now, allocate for app2 (this would be the first/AM allocation)
ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
cs.allocate(appAttemptId2,
Collections.<ResourceRequest>singletonList(r2),
Collections.<ContainerId>emptyList(),
null, null);

//In this case we do not perform container assignment because we want to
//verify re-ordering based on the allocation alone

//Now, the first app for assignment is app2
assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
appId2.toString());

rm.stop();
}

@Test
public void testResourceOverCommit() throws Exception {
Configuration conf = new Configuration();
Expand Down

0 comments on commit 10732d5

Please sign in to comment.