Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #158 from corindwyer/master
Browse files Browse the repository at this point in the history
Execute autoscaler synchronously on scheduler thread
  • Loading branch information
corindwyer authored Aug 14, 2017
2 parents e669631 + 65a60d2 commit 9617b09
Show file tree
Hide file tree
Showing 13 changed files with 565 additions and 433 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ Map<String, List<String>> createPseudoHosts(Map<String, Integer> groupCounts, Fu
);
}

void removePsuedoHosts(Map<String, List<String>> hostsMap) {
void removePseudoHosts(Map<String, List<String>> hostsMap) {
if (hostsMap != null && !hostsMap.isEmpty()) {
for (Map.Entry<String, List<String>> entry: hostsMap.entrySet()) {
for (String h: entry.getValue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ Map<VMResource, Double> getMaxResources() {
*/
TaskAssignmentResult tryRequest(TaskRequest request, VMTaskFitnessCalculator fitnessCalculator) {
if(logger.isDebugEnabled())
logger.debug("Host {} task {}: #leases=", getHostname(), request.getId(), leasesMap.size());
logger.debug("Host {} task {}: #leases: {}", getHostname(), request.getId(), leasesMap.size());
if(leasesMap.isEmpty())
return null;
if(exclusiveTaskId!=null) {
Expand Down
245 changes: 129 additions & 116 deletions fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java

Large diffs are not rendered by default.

58 changes: 33 additions & 25 deletions fenzo-core/src/main/java/com/netflix/fenzo/InternalVMCloner.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@

package com.netflix.fenzo;

import org.apache.mesos.Protos;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.apache.mesos.Protos;

/* package */ class InternalVMCloner {

/**
* Creates a VM lease object representing the max of resources from the given collection of VMs. Attributes
* are created by adding all unique attributes across the collection.
*
* @param avms Collection of VMs to create the new lease object from.
* @return VM lease object representing the max of resources from the given collection of VMs
*/
Expand All @@ -42,38 +43,45 @@
final Map<String, Double> scalars = new HashMap<>();
final Map<String, Protos.Attribute> attributeMap = new HashMap<>();
if (avms != null) {
for(AssignableVirtualMachine avm: avms) {
final Map<VMResource, Double[]> resourceStatus = avm.getResourceStatus();
Double[] values = resourceStatus.get(VMResource.CPU);
if (values != null && values.length == 2)
cpus = Math.max(cpus, values[0] + values[1]);
values = resourceStatus.get(VMResource.Memory);
if (values != null && values.length == 2)
mem = Math.max(mem, values[0] + values[1]);
values = resourceStatus.get(VMResource.Disk);
if (values != null && values.length == 2)
disk = Math.max(disk, values[0] + values[1]);
values = resourceStatus.get(VMResource.Network);
if (values != null && values.length == 2)
network = Math.max(network, values[0] + values[1]);
values = resourceStatus.get(VMResource.Ports);
if (values != null && values.length == 2)
ports = Math.max(ports, values[0] + values[1]);
for (AssignableVirtualMachine avm : avms) {
Map<VMResource, Double> maxResources = avm.getMaxResources();
Double value = maxResources.get(VMResource.CPU);
if (value != null) {
cpus = Math.max(cpus, value);
}
value = maxResources.get(VMResource.Memory);
if (value != null) {
mem = Math.max(mem, value);
}
value = maxResources.get(VMResource.Disk);
if (value != null) {
disk = Math.max(disk, value);
}
value = maxResources.get(VMResource.Network);
if (value != null) {
network = Math.max(network, value);
}
value = maxResources.get(VMResource.Ports);
if (value != null) {
ports = Math.max(ports, value);
}
final Map<String, Double> maxScalars = avm.getMaxScalars();
if (maxScalars != null && !maxScalars.isEmpty()) {
for (String k: maxScalars.keySet())
for (String k : maxScalars.keySet())
scalars.compute(k, (s, oldVal) -> {
if (oldVal == null)
if (oldVal == null) {
oldVal = 0.0;
}
Double aDouble = maxScalars.get(k);
if (aDouble == null)
if (aDouble == null) {
aDouble = 0.0;
}
return oldVal + aDouble;
});
}
final Map<String, Protos.Attribute> attrs = avm.getCurrTotalLease().getAttributeMap();
if (attrs != null && !attrs.isEmpty()) {
for (Map.Entry<String, Protos.Attribute> e: attrs.entrySet())
for (Map.Entry<String, Protos.Attribute> e : attrs.entrySet())
attributeMap.putIfAbsent(e.getKey(), e.getValue());
}
}
Expand All @@ -83,7 +91,7 @@
final double fdisk = disk;
final double fnetwork = network;
final List<VirtualMachineLease.Range> fports = Collections.singletonList(
new VirtualMachineLease.Range(100, 100 + (int)ports));
new VirtualMachineLease.Range(100, 100 + (int) ports));
return new VirtualMachineLease() {
@Override
public String getId() {
Expand Down Expand Up @@ -156,7 +164,7 @@ public Map<String, Double> getScalarValues() {
final List<VirtualMachineLease.Range> ports = new LinkedList<>();
final List<VirtualMachineLease.Range> ranges = lease.portRanges();
if (ranges != null && !ranges.isEmpty()) {
for (VirtualMachineLease.Range r: ranges)
for (VirtualMachineLease.Range r : ranges)
ports.add(new VirtualMachineLease.Range(r.getBeg(), r.getEnd()));
}
final double cpus = lease.cpuCores();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* The pseudo scheduling run performs an entire scheduling iteration using the cloned queue and pseudo VMs in addition
* to any new VM leases that have been added since previous scheduling iteration. This will invoke any and all task
* constraints as well as fitness function setup in the scheduler. The scheduling result is used to determine the
* number of VMs in each group and then the results are discarded. As expected, the psuedo scheduling run has no impact
* number of VMs in each group and then the results are discarded. As expected, the pseudo scheduling run has no impact
* on the real scheduling assignments made.
* <P>
* Tasks for which scale up is requested by this evaluator are remembered and not requested again until certain delay.
Expand All @@ -63,7 +63,7 @@ public Map<String, Integer> getShortfall(Set<String> vmGroupNames, Set<TaskReque
return Collections.emptyMap();

final InternalTaskQueue taskQueue = createAndFillAlternateQueue(filteredTasks);
return schedulingService.requestPsuedoScheduling(taskQueue, shortfallTasksPerGroup);
return schedulingService.requestPseudoScheduling(taskQueue, shortfallTasksPerGroup);
}

private InternalTaskQueue createAndFillAlternateQueue(List<TaskRequest> shortfallTasks) {
Expand Down
77 changes: 58 additions & 19 deletions fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public final static class Builder {
private Action1<AutoScaleAction> autoscalerCallback=null;
private long delayAutoscaleUpBySecs=0L;
private long delayAutoscaleDownBySecs=0L;
private long disabledVmDurationInSecs =0L;
private List<AutoScaleRule> autoScaleRules=new ArrayList<>();
private Func1<Double, Boolean> isFitnessGoodEnoughFunction = new Func1<Double, Boolean>() {
@Override
Expand Down Expand Up @@ -376,6 +377,27 @@ public Builder withDelayAutoscaleDownBySecs(long delayAutoscaleDownBySecs) {
return this;
}

/**
* How long to disable a VM when going through a scale down action. Note that the value used will be the max
* between this value and the {@link AutoScaleRule#getCoolDownSecs()} value and that this value should be
* greater than the {@link AutoScaleRule#getCoolDownSecs()} value. If the supplied {@link AutoScaleAction}
* does not actually terminate the instance in this time frame then the VM will become enabled. This option is useful
* when you want to increase the disabled time of a VM because the implementation of the {@link AutoScaleAction} may
* take longer than the cooldown period.
*
* @param disabledVmDurationInSecs Disable VMs about to be terminated by this many seconds.
* @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler}
* @throws IllegalArgumentException if {@code disabledVmDurationInSecs} is not greater than 0.
* @see <a href="https://github.com/Netflix/Fenzo/wiki/Autoscaling">Autoscaling</a>
*/
public Builder withAutoscaleDisabledVmDurationInSecs(long disabledVmDurationInSecs) {
if(disabledVmDurationInSecs <= 0L) {
throw new IllegalArgumentException("disabledVmDurationInSecs must be greater than 0: " + disabledVmDurationInSecs);
}
this.disabledVmDurationInSecs = disabledVmDurationInSecs;
return this;
}

/**
* Indicate that the cluster receives resource offers only once per VM (host). Normally, Mesos sends resource
* offers multiple times, as resources free up on the host upon completion of various tasks. This method
Expand Down Expand Up @@ -466,6 +488,9 @@ private TaskScheduler(Builder builder) {
autoScaler.setDelayScaleDownBySecs(builder.delayAutoscaleDownBySecs);
if(builder.delayAutoscaleUpBySecs > 0L)
autoScaler.setDelayScaleUpBySecs(builder.delayAutoscaleUpBySecs);
if (builder.disabledVmDurationInSecs > 0L) {
autoScaler.setDisabledVmDurationInSecs(builder.disabledVmDurationInSecs);
}
}
else {
autoScaler=null;
Expand Down Expand Up @@ -682,21 +707,8 @@ public Assignable<TaskRequest> next() {
TaskIterator taskIterator,
List<VirtualMachineLease> newLeases) throws IllegalStateException {
checkIfShutdown();
try (AutoCloseable
ac = stateMonitor.enter()) {
long start = System.currentTimeMillis();
final SchedulingResult schedulingResult = doSchedule(taskIterator, newLeases);
if((lastVMPurgeAt + purgeVMsIntervalSecs*1000) < System.currentTimeMillis()) {
lastVMPurgeAt = System.currentTimeMillis();
logger.info("Purging inactive VMs");
assignableVMs.purgeInactiveVMs( // explicitly exclude VMs that have assignments
schedulingResult.getResultMap() == null?
Collections.emptySet() :
new HashSet<>(schedulingResult.getResultMap().keySet())
);
}
schedulingResult.setRuntime(System.currentTimeMillis() - start);
return schedulingResult;
try (AutoCloseable ac = stateMonitor.enter()) {
return doScheduling(taskIterator, newLeases);
} catch (Exception e) {
logger.error("Error with scheduling run: " + e.getMessage(), e);
if(e instanceof IllegalStateException)
Expand All @@ -708,6 +720,33 @@ public Assignable<TaskRequest> next() {
}
}

/**
* Variant of {@link #scheduleOnce(List, List)} that should be only used to schedule a pseudo iteration as it
* ignores the StateMonitor lock.
* @param taskIterator Iterator for tasks to assign resources to.
* @return a {@link SchedulingResult} object that contains a task assignment results map and other summaries
*/
/* package */ SchedulingResult pseudoScheduleOnce(TaskIterator taskIterator) throws Exception {
return doScheduling(taskIterator, Collections.emptyList());
}

private SchedulingResult doScheduling(TaskIterator taskIterator,
List<VirtualMachineLease> newLeases) throws Exception {
long start = System.currentTimeMillis();
final SchedulingResult schedulingResult = doSchedule(taskIterator, newLeases);
if((lastVMPurgeAt + purgeVMsIntervalSecs*1000) < System.currentTimeMillis()) {
lastVMPurgeAt = System.currentTimeMillis();
logger.info("Purging inactive VMs");
assignableVMs.purgeInactiveVMs( // explicitly exclude VMs that have assignments
schedulingResult.getResultMap() == null?
Collections.emptySet() :
new HashSet<>(schedulingResult.getResultMap().keySet())
);
}
schedulingResult.setRuntime(System.currentTimeMillis() - start);
return schedulingResult;
}

private SchedulingResult doSchedule(
TaskIterator taskIterator,
List<VirtualMachineLease> newLeases) throws Exception {
Expand Down Expand Up @@ -858,7 +897,7 @@ public EvalResult call() throws Exception {
rejectedCount.addAndGet(assignableVMs.removeLimitedLeases(expirableLeases));
final AutoScalerInput autoScalerInput = new AutoScalerInput(idleResourcesList, idleInactiveAVMs, failedTasksForAutoScaler);
if (autoScaler != null)
autoScaler.scheduleAutoscale(autoScalerInput);
autoScaler.doAutoscale(autoScalerInput);
}
schedulingResult.setLeasesAdded(newLeases.size());
schedulingResult.setLeasesRejected(rejectedCount.get());
Expand All @@ -872,11 +911,11 @@ public EvalResult call() throws Exception {
return assignableVMs.createPseudoHosts(groupCounts, autoScaler == null? name -> null : autoScaler::getRule);
}

/* package */ void removePsuedoHosts(Map<String, List<String>> hostsMap) {
assignableVMs.removePsuedoHosts(hostsMap);
/* package */ void removePseudoHosts(Map<String, List<String>> hostsMap) {
assignableVMs.removePseudoHosts(hostsMap);
}

/* package */ void removePsuedoAssignments() {
/* package */ void removePseudoAssignments() {
taskTracker.clearAssignedTasks(); // this should suffice for pseudo assignments
}

Expand Down
Loading

0 comments on commit 9617b09

Please sign in to comment.