From 488433b69e7522440ef16a158b05df7c67fafb55 Mon Sep 17 00:00:00 2001
From: Corin Dwyer
Date: Fri, 11 Aug 2017 22:17:10 -0700
Subject: [PATCH 1/4] change autoscaler to be synchronously executed as part
 of the scheduling iteration while keeping the callback execution on a
 separate thread

change shortfall analysis implementation to be synchronously executed as
part of the autoscaler

add withAutoscaleDisabledVmDurationInSecs for users that want to disable
VMs for a fixed duration that is different from the cooldown value
---
 .../fenzo/AssignableVirtualMachine.java       |   4 +-
 .../java/com/netflix/fenzo/AutoScaler.java    | 243 ++++----
 .../com/netflix/fenzo/InternalVMCloner.java   |  58 +-
 .../java/com/netflix/fenzo/StateMonitor.java  |  20 +-
 .../java/com/netflix/fenzo/TaskScheduler.java |  24 +-
 .../netflix/fenzo/TaskSchedulingService.java  |  35 +-
 .../java/com/netflix/fenzo/VMCollection.java  |   6 +-
 .../fenzo/samples/SampleQbasedScheduling.java |   2 +-
 .../com/netflix/fenzo/AutoScalerTest.java     | 550 ++++++++++--------
 .../fenzo/ShortfallAutoscalerTest.java        |   2 +-
 .../fenzo/TaskSchedulingServiceTest.java      |   2 +-
 .../fenzo/queues/tiered/TieredQueueTest.java  |   2 +-
 12 files changed, 528 insertions(+), 420 deletions(-)

diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java
index 31fa074..0fd37a3 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java
@@ -617,7 +617,7 @@ Map getMaxScalars() {
         return result;
     }
 
-    Map getMaxResources() {
+    public Map getMaxResources() {
         double cpus=0.0;
         double memory=0.0;
         double network=0.0;
@@ -660,7 +660,7 @@ Map getMaxResources() {
      */
     TaskAssignmentResult tryRequest(TaskRequest request, VMTaskFitnessCalculator fitnessCalculator) {
         if(logger.isDebugEnabled())
-            logger.debug("Host {} task {}: #leases=", getHostname(), request.getId(), leasesMap.size());
+            logger.debug("Host {} task {}: #leases: {}", getHostname(), request.getId(), leasesMap.size());
         if(leasesMap.isEmpty())
             return null;
         if(exclusiveTaskId!=null) {
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java b/fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java
index 8f1cb78..67ef52c 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java
@@ -16,13 +16,6 @@
 
 package com.netflix.fenzo;
 
-import com.netflix.fenzo.functions.Action1;
-import com.netflix.fenzo.functions.Func1;
-import com.netflix.fenzo.queues.QueuableTask;
-import org.apache.mesos.Protos;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -32,78 +25,45 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
-class AutoScaler {
-
-    private static class HostAttributeGroup {
-        String name;
-        List idleHosts;
-        List idleInactiveHosts;
-        int shortFall;
-        AutoScaleRule rule;
-
-        private 
HostAttributeGroup(String name, AutoScaleRule rule) { - this.name = name; - this.rule = rule; - this.idleHosts = new ArrayList<>(); - this.idleInactiveHosts = new ArrayList<>(); - this.shortFall=0; - } - } - - private static class ScalingActivity { - private long scaleUpAt; - private long scaleUpRequestedAt; - private long scaleDownAt; - private long scaleDownRequestedAt; - private long inactiveScaleDownAt; - private long inactiveScaleDownRequestedAt; - private int shortfall; - private int scaledNumInstances; - private AutoScaleAction.Type type; +import com.netflix.fenzo.functions.Action1; +import com.netflix.fenzo.functions.Func1; +import com.netflix.fenzo.queues.QueuableTask; +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; - private ScalingActivity(long scaleUpAt, long scaleDownAt, int shortfall, int scaledNumInstances, AutoScaleAction.Type type) { - this.scaleUpAt = scaleUpAt; - scaleUpRequestedAt = 0L; - this.scaleDownAt = scaleDownAt; - scaleDownRequestedAt = 0L; - this.shortfall = shortfall; - this.scaledNumInstances = scaledNumInstances; - this.type = type; - } - } +class AutoScaler { private static final Logger logger = LoggerFactory.getLogger(AutoScaler.class); - private volatile Action1 callback=null; + final VMCollection vmCollection; private final String mapHostnameAttributeName; private final String scaleDownBalancedByAttributeName; - private ShortfallEvaluator shortfallEvaluator; private final ActiveVmGroups activeVmGroups; private final AutoScaleRules autoScaleRules; private final boolean disableShortfallEvaluation; private final String attributeName; private final AssignableVMs assignableVMs; - private long delayScaleUpBySecs =0L; - private long delayScaleDownBySecs =0L; - private volatile Func1> taskToClustersGetter = null; private final ExecutorService executor = new ThreadPoolExecutor(1, 1, Long.MAX_VALUE, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), new ThreadPoolExecutor.DiscardPolicy()); - private final AtomicBoolean isShutdown = new AtomicBoolean(); private final ConcurrentMap scalingActivityMap = new ConcurrentHashMap<>(); - final VMCollection vmCollection; private final ScaleDownConstraintExecutor scaleDownConstraintExecutor; + private volatile Action1 callback = null; + private ShortfallEvaluator shortfallEvaluator; + private long delayScaleUpBySecs = 0L; + private long delayScaleDownBySecs = 0L; + private long disabledVmDurationInSecs = 0L; + private volatile Func1> taskToClustersGetter = null; AutoScaler(final String attributeName, String mapHostnameAttributeName, String scaleDownBalancedByAttributeName, final List autoScaleRules, final AssignableVMs assignableVMs, @@ -134,7 +94,7 @@ Collection getRules() { } void replaceRule(AutoScaleRule rule) { - if(rule == null) + if (rule == null) throw new NullPointerException("Can't add null rule"); autoScaleRules.replaceRule(rule); } @@ -144,7 +104,7 @@ AutoScaleRule getRule(String name) { } void removeRule(String ruleName) { - if(ruleName != null) + if (ruleName != null) autoScaleRules.remRule(ruleName); } @@ -156,39 +116,52 @@ void setDelayScaleDownBySecs(long secs) { delayScaleDownBySecs = secs; } + void setDisabledVmDurationInSecs(long disabledVmDurationInSecs) { + this.disabledVmDurationInSecs = disabledVmDurationInSecs; + } + void setTaskToClustersGetter(Func1> getter) { this.taskToClustersGetter = getter; } void scheduleAutoscale(final AutoScalerInput autoScalerInput) { - if(isShutdown.get()) + if (isShutdown.get()) { return; + } + try { + 
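// compute scaling needs synchronously as part of the scheduling iteration;
+            // only the callback invocations collected below are handed off to the executor thread
+            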
shortfallEvaluator.setTaskToClustersGetter(taskToClustersGetter); + autoScaleRules.prepare(); + Map hostAttributeGroupMap = setupHostAttributeGroupMap(autoScaleRules, scalingActivityMap); + if (!disableShortfallEvaluation) { + Map shortfall = shortfallEvaluator.getShortfall(hostAttributeGroupMap.keySet(), autoScalerInput.getFailures(), autoScaleRules); + for (Map.Entry entry : shortfall.entrySet()) { + hostAttributeGroupMap.get(entry.getKey()).shortFall = entry.getValue() == null ? 0 : entry.getValue(); + } + } + populateIdleResources(autoScalerInput.getIdleResourcesList(), autoScalerInput.getIdleInactiveResourceList(), hostAttributeGroupMap); + List callbacks = new ArrayList<>(); + for (HostAttributeGroup hostAttributeGroup : hostAttributeGroupMap.values()) { + callbacks.addAll(processScalingNeeds(hostAttributeGroup, scalingActivityMap, assignableVMs)); + } + executor.submit(() -> { - if(isShutdown.get()) + if (isShutdown.get()) { return; - shortfallEvaluator.setTaskToClustersGetter(taskToClustersGetter); - autoScaleRules.prepare(); - Map hostAttributeGroupMap = setupHostAttributeGroupMap(autoScaleRules, scalingActivityMap); - if (!disableShortfallEvaluation) { - Map shortfall = shortfallEvaluator.getShortfall(hostAttributeGroupMap.keySet(), autoScalerInput.getFailures(), autoScaleRules); - for (Map.Entry entry : shortfall.entrySet()) { - hostAttributeGroupMap.get(entry.getKey()).shortFall = entry.getValue() == null ? 0 : entry.getValue(); - } } - populateIdleResources(autoScalerInput.getIdleResourcesList(), autoScalerInput.getIdleInactiveResourceList(), hostAttributeGroupMap); - for (HostAttributeGroup hostAttributeGroup : hostAttributeGroupMap.values()) { - processScalingNeeds(hostAttributeGroup, scalingActivityMap, assignableVMs); + //Since the agents are disabled synchronously for each attribute group, the agents will become enabled again + //if the sum duration of all callback calls takes longer than the cooldown period + for (Runnable callback : callbacks) { + callback.run(); } }); - } - catch (RejectedExecutionException e) { - logger.warn("Autoscaler execution request rejected: " + e.getMessage()); + } catch (Exception e) { + logger.error("Autoscaler failure: ", e); } } private boolean shouldScaleNow(boolean scaleUp, long now, ScalingActivity prevScalingActivity, AutoScaleRule rule) { - return scaleUp? + return scaleUp ? now > (Math.max(activeVmGroups.getLastSetAt(), prevScalingActivity.scaleUpAt) + rule.getCoolDownSecs() * 1000) : now > (Math.max(activeVmGroups.getLastSetAt(), Math.max(prevScalingActivity.scaleDownAt, prevScalingActivity.scaleUpAt)) + rule.getCoolDownSecs() * 1000); @@ -206,22 +179,22 @@ private boolean shouldScaleDownInactive(long now, ScalingActivity prevScalingAct return now > (Math.max(activeVmGroups.getLastSetAt(), prevScalingActivity.inactiveScaleDownAt) + rule.getCoolDownSecs() * 1000); } - private void processScalingNeeds(HostAttributeGroup hostAttributeGroup, ConcurrentMap scalingActivityMap, AssignableVMs assignableVMs) { + private List processScalingNeeds(HostAttributeGroup hostAttributeGroup, ConcurrentMap scalingActivityMap, AssignableVMs assignableVMs) { + List callbacks = new ArrayList<>(); AutoScaleRule rule = hostAttributeGroup.rule; long now = System.currentTimeMillis(); - ScalingActivity prevScalingActivity= scalingActivityMap.get(rule.getRuleName()); + ScalingActivity prevScalingActivity = scalingActivityMap.get(rule.getRuleName()); - int excess = hostAttributeGroup.shortFall>0? 
0 : hostAttributeGroup.idleHosts.size() - rule.getMaxIdleHostsToKeep(); + int excess = hostAttributeGroup.shortFall > 0 ? 0 : hostAttributeGroup.idleHosts.size() - rule.getMaxIdleHostsToKeep(); int inactiveIdleCount = hostAttributeGroup.idleInactiveHosts.size(); List allHostsToTerminate = new ArrayList<>(); - if(inactiveIdleCount > 0 && shouldScaleDownInactive(now, prevScalingActivity, rule)) { + if (inactiveIdleCount > 0 && shouldScaleDownInactive(now, prevScalingActivity, rule)) { ScalingActivity scalingActivity = scalingActivityMap.get(rule.getRuleName()); long lastReqstAge = (now - scalingActivity.inactiveScaleDownRequestedAt) / 1000L; - if(delayScaleDownBySecs>0L && lastReqstAge > 2 * delayScaleDownBySecs) { // reset the request at time + if (delayScaleDownBySecs > 0L && lastReqstAge > 2 * delayScaleDownBySecs) { // reset the request at time scalingActivity.inactiveScaleDownRequestedAt = now; - } - else if(delayScaleDownBySecs == 0L || lastReqstAge > delayScaleDownBySecs) { + } else if (delayScaleDownBySecs == 0L || lastReqstAge > delayScaleDownBySecs) { scalingActivity.inactiveScaleDownRequestedAt = 0L; scalingActivity.inactiveScaleDownAt = now; Map hostsToTerminate = getInactiveHostsToTerminate(hostAttributeGroup.idleInactiveHosts); @@ -238,10 +211,9 @@ else if(delayScaleDownBySecs == 0L || lastReqstAge > delayScaleDownBySecs) { if (excess > 0 && shouldScaleDown(now, prevScalingActivity, rule)) { ScalingActivity scalingActivity = scalingActivityMap.get(rule.getRuleName()); long lastReqstAge = (now - scalingActivity.scaleDownRequestedAt) / 1000L; - if(delayScaleDownBySecs>0L && lastReqstAge > 2 * delayScaleDownBySecs) { // reset the request at time + if (delayScaleDownBySecs > 0L && lastReqstAge > 2 * delayScaleDownBySecs) { // reset the request at time scalingActivity.scaleDownRequestedAt = now; - } - else if(delayScaleDownBySecs == 0L || lastReqstAge > delayScaleDownBySecs) { + } else if (delayScaleDownBySecs == 0L || lastReqstAge > delayScaleDownBySecs) { final int size = vmCollection.size(rule.getRuleName()); if (rule.getMinSize() > (size - excess)) excess = Math.max(0, size - rule.getMinSize()); @@ -255,24 +227,24 @@ else if(delayScaleDownBySecs == 0L || lastReqstAge > delayScaleDownBySecs) { StringBuilder sBuilder = new StringBuilder(); for (String host : hostsToTerminate.keySet()) { sBuilder.append(host).append(", "); - assignableVMs.disableUntil(host, now + rule.getCoolDownSecs() * 1000); + long disabledDurationInSecs = (disabledVmDurationInSecs > 0L) ? 
disabledVmDurationInSecs : rule.getCoolDownSecs(); + assignableVMs.disableUntil(host, now + disabledDurationInSecs * 1000); } logger.info("Scaling down " + rule.getRuleName() + " by " + hostsToTerminate.size() + " hosts (" + sBuilder.toString() + ")"); allHostsToTerminate.addAll(hostsToTerminate.values()); } } - } else if(hostAttributeGroup.shortFall>0 || (excess<=0 && shouldScaleUp(now, prevScalingActivity, rule))) { - if (hostAttributeGroup.shortFall>0 || rule.getMinIdleHostsToKeep() > hostAttributeGroup.idleHosts.size()) { + } else if (hostAttributeGroup.shortFall > 0 || (excess <= 0 && shouldScaleUp(now, prevScalingActivity, rule))) { + if (hostAttributeGroup.shortFall > 0 || rule.getMinIdleHostsToKeep() > hostAttributeGroup.idleHosts.size()) { // scale up to rule.getMaxIdleHostsToKeep() instead of just until rule.getMinIdleHostsToKeep() // but, if not shouldScaleUp(), then, scale up due to shortfall ScalingActivity scalingActivity = scalingActivityMap.get(rule.getRuleName()); long lastReqstAge = (now - scalingActivity.scaleUpRequestedAt) / 1000L; - if(delayScaleUpBySecs >0L && lastReqstAge > 2 * delayScaleUpBySecs) { // reset the request at time + if (delayScaleUpBySecs > 0L && lastReqstAge > 2 * delayScaleUpBySecs) { // reset the request at time scalingActivity.scaleUpRequestedAt = now; - } - else if(delayScaleUpBySecs ==0L || lastReqstAge > delayScaleUpBySecs) { - int shortage = (excess<=0 && shouldScaleUp(now, prevScalingActivity, rule))? + } else if (delayScaleUpBySecs == 0L || lastReqstAge > delayScaleUpBySecs) { + int shortage = (excess <= 0 && shouldScaleUp(now, prevScalingActivity, rule)) ? rule.getMaxIdleHostsToKeep() - hostAttributeGroup.idleHosts.size() : 0; shortage = Math.max(shortage, hostAttributeGroup.shortFall); @@ -287,15 +259,18 @@ else if(delayScaleUpBySecs ==0L || lastReqstAge > delayScaleUpBySecs) { scalingActivity.type = AutoScaleAction.Type.Up; logger.info("Scaling up " + rule.getRuleName() + " by " + shortage + " hosts"); - callback.call(new ScaleUpAction(rule.getRuleName(), shortage)); + int finalShortage = shortage; + callbacks.add(() -> callback.call(new ScaleUpAction(rule.getRuleName(), finalShortage))); } } } } // Run scale-down action after scale up (if any) - if(!allHostsToTerminate.isEmpty()) { - callback.call(new ScaleDownAction(rule.getRuleName(), allHostsToTerminate)); + if (!allHostsToTerminate.isEmpty()) { + callbacks.add(() -> callback.call(new ScaleDownAction(rule.getRuleName(), allHostsToTerminate))); } + + return callbacks; } private void populateIdleResources(List idleResources, @@ -312,7 +287,7 @@ private void populateIdleResources(List idleResources, } // Leases from inactive clusters - for(VirtualMachineLease l : idleInactiveResources) { + for (VirtualMachineLease l : idleInactiveResources) { getAttribute(l).ifPresent(attrValue -> { if (leasesMap.containsKey(attrValue)) { leasesMap.get(attrValue).idleInactiveHosts.add(l); @@ -341,13 +316,13 @@ private Map setupHostAttributeGroupMap(AutoScaleRule // make scaling activity happen after a fixed delayed time for the first time encountered (e.g., server start) private long getInitialCoolDown(long coolDownSecs) { - long initialCoolDownInPastSecs=120; + long initialCoolDownInPastSecs = 120; initialCoolDownInPastSecs = Math.min(coolDownSecs, initialCoolDownInPastSecs); - return System.currentTimeMillis()- coolDownSecs*1000 + initialCoolDownInPastSecs*1000; + return System.currentTimeMillis() - coolDownSecs * 1000 + initialCoolDownInPastSecs * 1000; } private Map getHostsToTerminate(List idleHosts, 
int excess) { - if(scaleDownConstraintExecutor == null) { + if (scaleDownConstraintExecutor == null) { return idleHosts.isEmpty() ? Collections.emptyMap() : getHostsToTerminateLegacy(idleHosts, excess); } else { return getHostsToTerminateUsingCriteria(idleHosts, excess); @@ -355,7 +330,7 @@ private Map getHostsToTerminate(List idleHo } private Map getInactiveHostsToTerminate(List inactiveIdleHosts) { - if(scaleDownConstraintExecutor == null) { + if (scaleDownConstraintExecutor == null) { return Collections.emptyMap(); } else { Map hostsMap = new HashMap<>(); @@ -375,10 +350,10 @@ private Map getHostsToTerminateUsingCriteria(List activeIds = idleHosts.stream().map(VirtualMachineLease::hostname).collect(Collectors.toSet()); int activeCounter = 0; Iterator it = result.iterator(); - while(it.hasNext()) { + while (it.hasNext()) { VirtualMachineLease leaseToRemove = it.next(); - if(activeIds.contains(leaseToRemove.hostname())) { - if(activeCounter < excess) { + if (activeIds.contains(leaseToRemove.hostname())) { + if (activeCounter < excess) { activeCounter++; } else { it.remove(); @@ -394,22 +369,22 @@ private Map getHostsToTerminateLegacy(List Map result = new HashMap<>(); final Map> hostsMap = new HashMap<>(); final String defaultAttributeName = "default"; - for(VirtualMachineLease host: hosts) { + for (VirtualMachineLease host : hosts) { final Protos.Attribute attribute = host.getAttributeMap().get(scaleDownBalancedByAttributeName); - String val = (attribute!=null && attribute.hasText())? attribute.getText().getValue() : defaultAttributeName; - if(hostsMap.get(val) == null) + String val = (attribute != null && attribute.hasText()) ? attribute.getText().getValue() : defaultAttributeName; + if (hostsMap.get(val) == null) hostsMap.put(val, new ArrayList()); hostsMap.get(val).add(host); } final List> lists = new ArrayList<>(); - for(List l: hostsMap.values()) + for (List l : hostsMap.values()) lists.add(l); - int taken=0; - while(taken takeFrom=null; - int max=0; - for(List l: lists) { - if(l.size()>max) { + int taken = 0; + while (taken < excess) { + List takeFrom = null; + int max = 0; + for (List l : lists) { + if (l.size() > max) { max = l.size(); takeFrom = l; } @@ -422,10 +397,10 @@ private Map getHostsToTerminateLegacy(List } private String getMappedHostname(VirtualMachineLease lease) { - if(mapHostnameAttributeName==null || mapHostnameAttributeName.isEmpty()) + if (mapHostnameAttributeName == null || mapHostnameAttributeName.isEmpty()) return lease.hostname(); Protos.Attribute attribute = lease.getAttributeMap().get(mapHostnameAttributeName); - if(attribute==null) { + if (attribute == null) { logger.error("Didn't find attribute " + mapHostnameAttributeName + " for host " + lease.hostname()); return lease.hostname(); } @@ -437,7 +412,45 @@ public void setCallback(Action1 callback) { } void shutdown() { - if(isShutdown.compareAndSet(false, true)) + if (isShutdown.compareAndSet(false, true)) executor.shutdown(); } -} + + private static class HostAttributeGroup { + String name; + List idleHosts; + List idleInactiveHosts; + int shortFall; + AutoScaleRule rule; + + private HostAttributeGroup(String name, AutoScaleRule rule) { + this.name = name; + this.rule = rule; + this.idleHosts = new ArrayList<>(); + this.idleInactiveHosts = new ArrayList<>(); + this.shortFall = 0; + } + } + + private static class ScalingActivity { + private long scaleUpAt; + private long scaleUpRequestedAt; + private long scaleDownAt; + private long scaleDownRequestedAt; + private long inactiveScaleDownAt; + private long 
inactiveScaleDownRequestedAt; + private int shortfall; + private int scaledNumInstances; + private AutoScaleAction.Type type; + + private ScalingActivity(long scaleUpAt, long scaleDownAt, int shortfall, int scaledNumInstances, AutoScaleAction.Type type) { + this.scaleUpAt = scaleUpAt; + scaleUpRequestedAt = 0L; + this.scaleDownAt = scaleDownAt; + scaleDownRequestedAt = 0L; + this.shortfall = shortfall; + this.scaledNumInstances = scaledNumInstances; + this.type = type; + } + } +} \ No newline at end of file diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/InternalVMCloner.java b/fenzo-core/src/main/java/com/netflix/fenzo/InternalVMCloner.java index 178c7d8..fa27451 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/InternalVMCloner.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/InternalVMCloner.java @@ -16,8 +16,6 @@ package com.netflix.fenzo; -import org.apache.mesos.Protos; - import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -25,11 +23,14 @@ import java.util.List; import java.util.Map; +import org.apache.mesos.Protos; + /* package */ class InternalVMCloner { /** * Creates a VM lease object representing the max of resources from the given collection of VMs. Attributes * are created by adding all unique attributes across the collection. + * * @param avms Collection of VMs to create the new lease object from. * @return VM lease object representing the max of resources from the given collection of VMs */ @@ -42,38 +43,45 @@ final Map scalars = new HashMap<>(); final Map attributeMap = new HashMap<>(); if (avms != null) { - for(AssignableVirtualMachine avm: avms) { - final Map resourceStatus = avm.getResourceStatus(); - Double[] values = resourceStatus.get(VMResource.CPU); - if (values != null && values.length == 2) - cpus = Math.max(cpus, values[0] + values[1]); - values = resourceStatus.get(VMResource.Memory); - if (values != null && values.length == 2) - mem = Math.max(mem, values[0] + values[1]); - values = resourceStatus.get(VMResource.Disk); - if (values != null && values.length == 2) - disk = Math.max(disk, values[0] + values[1]); - values = resourceStatus.get(VMResource.Network); - if (values != null && values.length == 2) - network = Math.max(network, values[0] + values[1]); - values = resourceStatus.get(VMResource.Ports); - if (values != null && values.length == 2) - ports = Math.max(ports, values[0] + values[1]); + for (AssignableVirtualMachine avm : avms) { + Map maxResources = avm.getMaxResources(); + Double value = maxResources.get(VMResource.CPU); + if (value != null) { + cpus = Math.max(cpus, value); + } + value = maxResources.get(VMResource.Memory); + if (value != null) { + mem = Math.max(mem, value); + } + value = maxResources.get(VMResource.Disk); + if (value != null) { + disk = Math.max(disk, value); + } + value = maxResources.get(VMResource.Network); + if (value != null) { + network = Math.max(network, value); + } + value = maxResources.get(VMResource.Ports); + if (value != null) { + ports = Math.max(ports, value); + } final Map maxScalars = avm.getMaxScalars(); if (maxScalars != null && !maxScalars.isEmpty()) { - for (String k: maxScalars.keySet()) + for (String k : maxScalars.keySet()) scalars.compute(k, (s, oldVal) -> { - if (oldVal == null) + if (oldVal == null) { oldVal = 0.0; + } Double aDouble = maxScalars.get(k); - if (aDouble == null) + if (aDouble == null) { aDouble = 0.0; + } return oldVal + aDouble; }); } final Map attrs = avm.getCurrTotalLease().getAttributeMap(); if (attrs != null && !attrs.isEmpty()) 
{
-                    for (Map.Entry e: attrs.entrySet())
+                    for (Map.Entry e : attrs.entrySet())
                         attributeMap.putIfAbsent(e.getKey(), e.getValue());
                 }
             }
@@ -83,7 +91,7 @@
         final double fdisk = disk;
         final double fnetwork = network;
         final List fports = Collections.singletonList(
-                new VirtualMachineLease.Range(100, 100 + (int)ports));
+                new VirtualMachineLease.Range(100, 100 + (int) ports));
         return new VirtualMachineLease() {
             @Override
             public String getId() {
@@ -156,7 +164,7 @@ public Map getScalarValues() {
         final List ports = new LinkedList<>();
         final List ranges = lease.portRanges();
         if (ranges != null && !ranges.isEmpty()) {
-            for (VirtualMachineLease.Range r: ranges)
+            for (VirtualMachineLease.Range r : ranges)
                 ports.add(new VirtualMachineLease.Range(r.getBeg(), r.getEnd()));
         }
         final double cpus = lease.cpuCores();
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/StateMonitor.java b/fenzo-core/src/main/java/com/netflix/fenzo/StateMonitor.java
index bcc0748..43082b1 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/StateMonitor.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/StateMonitor.java
@@ -16,27 +16,21 @@
 
 package com.netflix.fenzo;
 
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * A monitor to ensure scheduler state is not compromised by concurrent calls that are disallowed.
  */
 class StateMonitor {
-    private final AtomicBoolean lock;
-
-    StateMonitor() {
-        lock = new AtomicBoolean(false);
-    }
+    private final ReentrantLock lock = new ReentrantLock();
 
     AutoCloseable enter() {
-        if(!lock.compareAndSet(false, true))
+        if (!lock.tryLock()) {
             throw new IllegalStateException();
-        return new AutoCloseable() {
-            @Override
-            public void close() throws Exception {
-                if(!lock.compareAndSet(true, false))
-                    throw new IllegalStateException();
-            }
+        }
+        return () -> {
+            // release, rather than re-acquire, the lock on close; calling tryLock() here
+            // would succeed reentrantly and leave the monitor held forever
+            if (!lock.isHeldByCurrentThread())
+                throw new IllegalStateException();
+            lock.unlock();
         };
     }
 
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
index 9d9577a..5a97d12 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
@@ -89,6 +89,7 @@ public final static class Builder {
         private Action1 autoscalerCallback=null;
         private long delayAutoscaleUpBySecs=0L;
         private long delayAutoscaleDownBySecs=0L;
+        private long disabledVmDurationInSecs=0L;
         private List autoScaleRules=new ArrayList<>();
         private Func1 isFitnessGoodEnoughFunction = new Func1() {
             @Override
@@ -376,6 +377,23 @@ public Builder withDelayAutoscaleDownBySecs(long delayAutoscaleDownBySecs) {
             return this;
         }
 
+        /**
+         * How long to disable a VM when going through a scale down action. If this value is not set, then the
+         * value used will be the {@link AutoScaleRule#getCoolDownSecs()} value. If the supplied {@link AutoScaleAction}
+         * does not actually terminate the instance in this time frame, then the VM will become enabled again.
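+         * <p>
+         * For illustration only, a hypothetical builder setup (the rule and callback here are placeholders):
+         * <pre>{@code
+         * TaskScheduler scheduler = new TaskScheduler.Builder()
+         *         .withAutoScaleByAttributeName("MachineType")
+         *         .withAutoScaleRule(rule)
+         *         .withAutoscalerCallback(callback)
+         *         .withAutoscaleDisabledVmDurationInSecs(300L) // keep scaled-down VMs disabled for 5 minutes
+         *         .withLeaseRejectAction(lease -> {})
+         *         .build();
+         * }</pre>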
+         *
+         * @param disabledVmDurationInSecs Disable VMs about to be terminated for this many seconds.
+         * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler}
+         * @throws IllegalArgumentException if {@code disabledVmDurationInSecs} is not greater than 0.
+         * @see Autoscaling
+         */
+        public Builder withAutoscaleDisabledVmDurationInSecs(long disabledVmDurationInSecs) {
+            if (disabledVmDurationInSecs <= 0L)
+                throw new IllegalArgumentException("disabledVmDurationInSecs must be greater than 0: " + disabledVmDurationInSecs);
+            this.disabledVmDurationInSecs = disabledVmDurationInSecs;
+            return this;
+        }
+
         /**
          * Indicate that the cluster receives resource offers only once per VM (host). Normally, Mesos sends resource
          * offers multiple times, as resources free up on the host upon completion of various tasks. This method
@@ -466,6 +484,9 @@ private TaskScheduler(Builder builder) {
                 autoScaler.setDelayScaleDownBySecs(builder.delayAutoscaleDownBySecs);
             if(builder.delayAutoscaleUpBySecs > 0L)
                 autoScaler.setDelayScaleUpBySecs(builder.delayAutoscaleUpBySecs);
+            if (builder.disabledVmDurationInSecs > 0L) {
+                autoScaler.setDisabledVmDurationInSecs(builder.disabledVmDurationInSecs);
+            }
         }
         else {
             autoScaler=null;
@@ -682,8 +703,7 @@ public Assignable next() {
                                              TaskIterator taskIterator, List newLeases) throws IllegalStateException {
         checkIfShutdown();
-        try (AutoCloseable
-                     ac = stateMonitor.enter()) {
+        try (AutoCloseable ac = stateMonitor.enter()) {
             long start = System.currentTimeMillis();
             final SchedulingResult schedulingResult = doSchedule(taskIterator, newLeases);
             if((lastVMPurgeAt + purgeVMsIntervalSecs*1000) < System.currentTimeMillis()) {
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java
index 7c4fdd5..b3676cb 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java
@@ -99,7 +99,6 @@ public RemoveTaskRequest(String taskId, QAttributes qAttributes, String hostname
     private final BlockingQueue leaseBlockingQueue = new LinkedBlockingQueue<>();
     private final BlockingQueue> addRunningTasksQueue = new LinkedBlockingQueue<>();
     private final BlockingQueue removeTasksQueue = new LinkedBlockingQueue<>();
-    private final BlockingQueue>> pseudoSchedulingRequestQ = new LinkedBlockingQueue<>();
     private final BlockingQueue>>> taskMapRequest = new LinkedBlockingQueue<>(10);
     private final BlockingQueue>>> resStatusRequest = new LinkedBlockingQueue<>(10);
     private final BlockingQueue>> vmCurrStateRequest = new LinkedBlockingQueue<>(10);
@@ -152,9 +151,7 @@ public boolean isShutdown() {
     }
 
     /* package */ Map requestPsuedoScheduling(final InternalTaskQueue pTaskQueue, Map groupCounts) {
-        final AtomicReference> pseudoSchedResult = new AtomicReference<>(Collections.emptyMap());
-        final CountDownLatch latch = new CountDownLatch(1);
-        pseudoSchedulingRequestQ.offer((newLeases) -> {
+        Map psuedoSchedulingResult;
             try {
                 logger.debug("Creating pseudo hosts");
                 final Map> pseudoHosts = taskScheduler.createPseudoHosts(groupCounts);
@@ -185,8 +182,8 @@ public boolean isShutdown() {
                 }
                 // temporarily replace usage tracker in taskTracker to the pseudoQ and then put back the original one
                 taskScheduler.getTaskTracker().setUsageTrackedQueue(pTaskQueue.getUsageTracker());
-                logger.debug("Scheduling with pseudoQ and " + newLeases.size() + " new leases");
-                final SchedulingResult schedulingResult = taskScheduler.scheduleOnce(pTaskQueue, newLeases);
+                logger.debug("Scheduling with pseudoQ");
+                final SchedulingResult schedulingResult = taskScheduler.scheduleOnce(pTaskQueue, Collections.emptyList());
                 final Map resultMap = schedulingResult.getResultMap();
                 Map result = new HashMap<>();
                 if 
(!resultMap.isEmpty()) { @@ -212,8 +209,7 @@ else if(pHostsAdded > 0) { if (tars == null || tars.isEmpty()) logger.debug("No pseudo assignment failures for task " + entry.getKey()); else { - StringBuilder b = new StringBuilder("Pseudo assignment failures for task ").append(entry.getKey()) - .append(": "); + StringBuilder b = new StringBuilder("Pseudo assignment failures for task ").append(entry.getKey()).append(": "); for (TaskAssignmentResult r: tars) { b.append("HOST: ").append(r.getHostname()).append(":"); final List afs = r.getFailures(); @@ -227,7 +223,7 @@ else if(pHostsAdded > 0) { } } } - pseudoSchedResult.set(result); + psuedoSchedulingResult = result; } catch (Exception e) { logger.error("Error in pseudo scheduling", e); @@ -243,17 +239,7 @@ else if(pHostsAdded > 0) { logger.error("Error in pseudo scheduling", e); throw e; } - finally { - latch.countDown(); - } - }); - try { - if (latch.await(Math.max(5000, maxSchedIterDelay * 3), TimeUnit.SECONDS))// arbitrary long sleep, latch should return earlier than that - return pseudoSchedResult.get(); - } catch (InterruptedException e) { - logger.warn("Timeout waiting for psuedo scheduling result"); - } - return Collections.emptyMap(); + return psuedoSchedulingResult; } private void scheduleOnce() { @@ -271,18 +257,13 @@ private void scheduleOnce() { addPendingRunningTasks(); removeTasks(); final boolean newLeaseExists = leaseBlockingQueue.peek() != null; - final Action1> pseudoSchedAction = pseudoSchedulingRequestQ.poll(); - if ( qModified || newLeaseExists || doNextIteration() || pseudoSchedAction != null) { + if (qModified || newLeaseExists || doNextIteration()) { taskScheduler.setTaskToClusterAutoScalerMapGetter(taskToClusterAutoScalerMapGetter); lastSchedIterationAt.set(System.currentTimeMillis()); if (preHook != null) preHook.call(); List currentLeases = new ArrayList<>(); leaseBlockingQueue.drainTo(currentLeases); - if (pseudoSchedAction != null) { - pseudoSchedAction.call(currentLeases); - currentLeases.clear(); - } final SchedulingResult schedulingResult = taskScheduler.scheduleOnce(taskQueue, currentLeases); // mark end of scheduling iteration before assigning tasks. taskQueue.getUsageTracker().reset(); @@ -510,7 +491,7 @@ public Builder withSchedulingResultCallback(Action1 callback) * @param taskQ The task queue from which to get tasks for assignment of resoruces. * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskSchedulingService}. 
*/ - public Builder withTaskQuue(TaskQueue taskQ) { + public Builder withTaskQueue(TaskQueue taskQ) { if (!(taskQ instanceof InternalTaskQueue)) throw new IllegalArgumentException("Argument is not a valid implementation of task queue"); taskQueue = (InternalTaskQueue) taskQ; diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/VMCollection.java b/fenzo-core/src/main/java/com/netflix/fenzo/VMCollection.java index 47ae846..2b80104 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/VMCollection.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/VMCollection.java @@ -78,8 +78,10 @@ Map> clonePseudoVMsForGroups(Map groupCoun result.put(g, hostnames); final ConcurrentMap avmsMap = vms.get(g); if (avmsMap != null) { - final List vmsList = avmsMap.values().stream() - .filter(avm -> vmFilter.test(avm.getCurrTotalLease())).collect(Collectors.toList()); + final List vmsList = avmsMap.values() + .stream() + .filter(avm -> vmFilter.test(avm.getCurrTotalLease())) + .collect(Collectors.toList()); if (vmsList != null && !vmsList.isEmpty()) { // NOTE: a shortcoming here is that the attributes of VMs across a group may not be homogeneous. // By creating one lease object and cloning from it, we pick one combination of the attributes diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/samples/SampleQbasedScheduling.java b/fenzo-core/src/main/java/com/netflix/fenzo/samples/SampleQbasedScheduling.java index 2c57000..71aabdb 100644 --- a/fenzo-core/src/main/java/com/netflix/fenzo/samples/SampleQbasedScheduling.java +++ b/fenzo-core/src/main/java/com/netflix/fenzo/samples/SampleQbasedScheduling.java @@ -168,7 +168,7 @@ public void call() { System.out.println("Starting scheduling iteration " + schedCounter.incrementAndGet()); } }) - .withTaskQuue(queue) + .withTaskQueue(queue) .withTaskScheduler(taskScheduler) // TaskSchedulingService will call us back when there are task assignments. 
Handle them by launching // tasks using MesosDriver diff --git a/fenzo-core/src/test/java/com/netflix/fenzo/AutoScalerTest.java b/fenzo-core/src/test/java/com/netflix/fenzo/AutoScalerTest.java index 1128562..11ce63e 100644 --- a/fenzo-core/src/test/java/com/netflix/fenzo/AutoScalerTest.java +++ b/fenzo-core/src/test/java/com/netflix/fenzo/AutoScalerTest.java @@ -16,10 +16,25 @@ package com.netflix.fenzo; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import com.netflix.fenzo.functions.Action1; +import com.netflix.fenzo.functions.Func1; import com.netflix.fenzo.plugins.BinPackingFitnessCalculators; import com.netflix.fenzo.plugins.HostAttrValueConstraint; import com.netflix.fenzo.queues.QAttributes; -import com.netflix.fenzo.queues.QueuableTask; import com.netflix.fenzo.queues.TaskQueue; import com.netflix.fenzo.queues.TaskQueues; import com.netflix.fenzo.queues.tiered.QueuableTaskProvider; @@ -28,30 +43,47 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.netflix.fenzo.functions.Action1; -import com.netflix.fenzo.functions.Func1; - -import java.util.*; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; public class AutoScalerTest { + static final Action1 noOpLeaseReject = lease -> { + }; static String hostAttrName = "MachineType"; - final int minIdle=5; - final int maxIdle=10; - final long coolDownSecs=2; - final String hostAttrVal1="4coreServers"; - final String hostAttrVal2="8coreServers"; - int cpus1=4; - int memory1=40000; - int cpus2=8; - int memory2=800; // make this less than memory1/cpus1 to ensure jobs don't get on these - final AutoScaleRule rule1 = AutoScaleRuleProvider.createRule(hostAttrVal1, minIdle, maxIdle, coolDownSecs, cpus1/2, memory1/2); - final AutoScaleRule rule2 = AutoScaleRuleProvider.createRule(hostAttrVal2, minIdle, maxIdle, coolDownSecs, cpus2/2, memory2/2); + final int minIdle = 5; + final int maxIdle = 10; + final long coolDownSecs = 2; + final String hostAttrVal1 = "4coreServers"; + final String hostAttrVal2 = "8coreServers"; + int cpus1 = 4; + int memory1 = 40000; + final AutoScaleRule rule1 = AutoScaleRuleProvider.createRule(hostAttrVal1, minIdle, maxIdle, coolDownSecs, cpus1 / 2, memory1 / 2); + int cpus2 = 8; + int memory2 = 800; // make this less than memory1/cpus1 to ensure jobs don't get on these + final AutoScaleRule rule2 = AutoScaleRuleProvider.createRule(hostAttrVal2, minIdle, maxIdle, coolDownSecs, cpus2 / 2, memory2 / 2); + + /* package */ + static TaskScheduler getScheduler(final Action1 leaseRejectCalback, final Action1 callback, + long delayScaleUpBySecs, long delayScaleDownByDecs, + AutoScaleRule... 
rules) { + TaskScheduler.Builder builder = new TaskScheduler.Builder() + .withAutoScaleByAttributeName(hostAttrName); + for (AutoScaleRule rule : rules) + builder.withAutoScaleRule(rule); + if (callback != null) + builder.withAutoScalerCallback(callback); + return builder + .withDelayAutoscaleDownBySecs(delayScaleDownByDecs) + .withDelayAutoscaleUpBySecs(delayScaleUpBySecs) + .withFitnessCalculator(BinPackingFitnessCalculators.cpuMemBinPacker) + .withLeaseOfferExpirySecs(3600) + .withLeaseRejectAction(lease -> { + if (leaseRejectCalback == null) + Assert.fail("Unexpected to reject lease " + lease.hostname()); + else + leaseRejectCalback.call(lease); + }) + .build(); + } @Before public void setUp() throws Exception { @@ -63,40 +95,18 @@ public void tearDown() throws Exception { } - static final Action1 noOpLeaseReject = lease -> {}; - private TaskScheduler getScheduler(AutoScaleRule... rules) { return getScheduler(null, rules); } + private TaskScheduler getScheduler(final Action1 leaseRejectCalback, AutoScaleRule... rules) { return getScheduler(leaseRejectCalback, null, 0, 0, rules); } + private TaskScheduler getScheduler(final Action1 leaseRejectCalback, final Action1 callback, AutoScaleRule... rules) { return getScheduler(leaseRejectCalback, callback, 0, 0, rules); } - /* package */ static TaskScheduler getScheduler(final Action1 leaseRejectCalback, final Action1 callback, - long delayScaleUpBySecs, long delayScaleDownByDecs, - AutoScaleRule... rules) { - TaskScheduler.Builder builder = new TaskScheduler.Builder() - .withAutoScaleByAttributeName(hostAttrName); - for(AutoScaleRule rule: rules) - builder.withAutoScaleRule(rule); - if(callback != null) - builder.withAutoScalerCallback(callback); - return builder - .withDelayAutoscaleDownBySecs(delayScaleDownByDecs) - .withDelayAutoscaleUpBySecs(delayScaleUpBySecs) - .withFitnessCalculator(BinPackingFitnessCalculators.cpuMemBinPacker) - .withLeaseOfferExpirySecs(3600) - .withLeaseRejectAction(lease -> { - if(leaseRejectCalback == null) - Assert.fail("Unexpected to reject lease " + lease.hostname()); - else - leaseRejectCalback.call(lease); - }) - .build(); - } // Test autoscale up on a simple rule // - Setup an auto scale rule @@ -111,20 +121,20 @@ public void scaleUpTest1() throws Exception { scheduler.setAutoscalerCallback(new Action1() { @Override public void call(AutoScaleAction action) { - if(action instanceof ScaleUpAction) { - int needed = ((ScaleUpAction)action).getScaleUpCount(); + if (action instanceof ScaleUpAction) { + int needed = ((ScaleUpAction) action).getScaleUpCount(); scaleUpRequest.set(needed); latch.countDown(); } } }); List requests = new ArrayList<>(); - int i=0; + int i = 0; do { Thread.sleep(1000); scheduler.scheduleOnce(requests, leases); - } while (i++<(coolDownSecs+2) && latch.getCount()>0); - if(latch.getCount()>0) + } while (i++ < (coolDownSecs + 2) && latch.getCount() > 0); + if (latch.getCount() > 0) Assert.fail("Timed out scale up action"); else Assert.assertEquals(maxIdle, scaleUpRequest.get()); @@ -145,7 +155,7 @@ public void scaleUpTest2() throws Exception { final CountDownLatch latch = new CountDownLatch(1); final AtomicBoolean addVMs = new AtomicBoolean(false); final List requests = new ArrayList<>(); - for(int i=0; i() { @Override @@ -158,30 +168,31 @@ public void call(AutoScaleAction action) { } } }); - int i=0; - boolean added=false; + int i = 0; + boolean added = false; do { Thread.sleep(1000); - if(!added && addVMs.get()) { + if (!added && addVMs.get()) { 
leases.addAll(LeaseProvider.getLeases(maxIdle, cpus1, memory1, 1, 10)); - added=true; + added = true; } SchedulingResult schedulingResult = scheduler.scheduleOnce(requests, leases); - Map resultMap = schedulingResult.getResultMap(); - if(added) { - int count=0; - for(VMAssignmentResult result: resultMap.values()) + Map resultMap = schedulingResult.getResultMap(); + if (added) { + int count = 0; + for (VMAssignmentResult result : resultMap.values()) count += result.getTasksAssigned().size(); Assert.assertEquals(requests.size(), count); requests.clear(); leases.clear(); } - } while (i++<(2*coolDownSecs+2) && latch.getCount()>0); - Assert.assertTrue("Second scale up action didn't arrive on time", latch.getCount()==0); + } while (i++ < (2 * coolDownSecs + 2) && latch.getCount() > 0); + Assert.assertTrue("Second scale up action didn't arrive on time", latch.getCount() == 0); } /** * Tests that the rule applies only to the host types specified and not to the other host type. + * * @throws Exception upon any error */ @Test @@ -195,19 +206,20 @@ public void testOneRuleTwoTypes() throws Exception { }, rule2); final List requests = new ArrayList<>(); final List leases = new ArrayList<>(); - for(int i=0; i0); - if(latch.getCount()<1) + } while (i++ < coolDownSecs + 2 && latch.getCount() > 0); + if (latch.getCount() < 1) Assert.fail("Should not have gotten scale up action for " + rule1.getRuleName()); } /** * Tests that of the two AutoScale rules setup, scale up action is called only on the one that is actually short. + * * @throws Exception */ @Test @@ -232,22 +244,23 @@ public void call(AutoScaleAction action) { .setType(Protos.Value.Type.TEXT) .setText(Protos.Value.Text.newBuilder().setValue(hostAttrVal2)).build(); attributes.put(hostAttrName, attribute); - for(int l=0; l0); - if(latch.getCount()<1) + } while (i++ < coolDownSecs + 2 && latch.getCount() > 0); + if (latch.getCount() < 1) Assert.fail("Scale up action received for " + rule2.getRuleName() + " rule, was expecting only on " + rule1.getRuleName()); } /** * Tests simple scale down action on host type that has excess capacity + * * @throws Exception */ @Test @@ -255,7 +268,7 @@ public void testSimpleScaleDownAction() throws Exception { final AtomicInteger scaleDownCount = new AtomicInteger(); TaskScheduler scheduler = getScheduler(noOpLeaseReject, rule1); final List requests = new ArrayList<>(); - for(int c=0; c leases = new ArrayList<>(); final CountDownLatch latch = new CountDownLatch(1); @@ -275,28 +288,29 @@ public void call(AutoScaleAction action) { .setType(Protos.Value.Type.TEXT) .setText(Protos.Value.Text.newBuilder().setValue(hostAttrVal1)).build(); attributes.put(hostAttrName, attribute); - int excess=3; - for(int l=0; l0); + } while (i++ < coolDownSecs + 2 && latch.getCount() > 0); Assert.assertEquals(0, latch.getCount()); // expect scale down count to be excess-1 since we used up 1 host - Assert.assertEquals(excess-1, scaleDownCount.get()); + Assert.assertEquals(excess - 1, scaleDownCount.get()); } /** * Tests that of the two rules, scale down is called only on the one that is in excess + * * @throws Exception */ @Test @@ -316,8 +330,8 @@ public void call(AutoScaleAction action) { } }); // use up servers covered by rule1 - for(int r=0; r ports = new ArrayList<>(); ports.add(new VirtualMachineLease.Range(1, 10)); Map attributes1 = new HashMap<>(); @@ -330,22 +344,22 @@ public void call(AutoScaleAction action) { .setType(Protos.Value.Type.TEXT) .setText(Protos.Value.Text.newBuilder().setValue(hostAttrVal2)).build(); 
attributes2.put(hostAttrName, attribute2); - for(int l=0; l0); - if(latch.getCount()<1) + } while (i++ < coolDownSecs + 2 && latch.getCount() > 0); + if (latch.getCount() < 1) Assert.fail("Scale down action received for " + wrongScaleDownRulename + " rule, was expecting only on " + rule1.getRuleName()); } @@ -356,19 +370,19 @@ public void call(AutoScaleAction action) { // hosts for each zone. @Test public void testScaleDownBalanced() throws Exception { - final String zoneAttrName="Zone"; - final int mxIdl=12; + final String zoneAttrName = "Zone"; + final int mxIdl = 12; final CountDownLatch latch = new CountDownLatch(1); final int[] zoneCounts = {0, 0, 0}; final List requests = new ArrayList<>(); // add enough jobs to fill two machines of zone 0 List hardConstraints = new ArrayList<>(); hardConstraints.add(ConstraintsProvider.getHostAttributeHardConstraint(zoneAttrName, "" + 1)); - for(int j=0; j leases = new ArrayList<>(); - final AutoScaleRule rule = AutoScaleRuleProvider.createRule(hostAttrVal1, 3, mxIdl, coolDownSecs, cpus1/2, memory1/2); + final AutoScaleRule rule = AutoScaleRuleProvider.createRule(hostAttrVal1, 3, mxIdl, coolDownSecs, cpus1 / 2, memory1 / 2); final TaskScheduler scheduler = new TaskScheduler.Builder() .withAutoScaleByAttributeName(hostAttrName) .withAutoScaleDownBalancedByAttributeName(zoneAttrName) @@ -404,21 +418,21 @@ public void call(AutoScaleAction action) { Protos.Attribute attr = Protos.Attribute.newBuilder().setName(hostAttrName) .setType(Protos.Value.Type.TEXT) .setText(Protos.Value.Text.newBuilder().setValue(hostAttrVal1)).build(); - for(int i=0; i<3; i++) { + for (int i = 0; i < 3; i++) { attributes.add(new HashMap()); Protos.Attribute attribute = Protos.Attribute.newBuilder().setName(zoneAttrName) .setType(Protos.Value.Type.TEXT) - .setText(Protos.Value.Text.newBuilder().setValue(""+i)).build(); + .setText(Protos.Value.Text.newBuilder().setValue("" + i)).build(); attributes.get(i).put(zoneAttrName, attribute); attributes.get(i).put(hostAttrName, attr); } - for(int l=0; l> scaleDownHostsRef = new AtomicReference<>(); final List requests = new ArrayList<>(); final List leases = new ArrayList<>(); - for(int j=0; j ports = new ArrayList<>(); ports.add(new VirtualMachineLease.Range(1, 10)); Map attributes1 = new HashMap<>(); @@ -463,9 +478,9 @@ public void testScaledDownHostOffer() throws Exception { .setType(Protos.Value.Type.TEXT) .setText(Protos.Value.Text.newBuilder().setValue(hostAttrVal1)).build(); attributes1.put(hostAttrName, attribute); - int excess=3; - for(int l=0; l() { @Override @@ -476,12 +491,12 @@ public void call(AutoScaleAction action) { } } }); - int i=0; - boolean first=true; - while (i++0) { + int i = 0; + boolean first = true; + while (i++ < coolDownSecs + 2 && latch.getCount() > 0) { scheduler.scheduleOnce(requests, leases); - if(first) { - first=false; + if (first) { + first = false; requests.clear(); leases.clear(); } @@ -490,37 +505,37 @@ public void call(AutoScaleAction action) { Assert.assertEquals(0, latch.getCount()); final List vmCurrentStates = scheduler.getVmCurrentStates(); long now = System.currentTimeMillis(); - for (VirtualMachineCurrentState s: vmCurrentStates) { + for (VirtualMachineCurrentState s : vmCurrentStates) { if (s.getDisabledUntil() > 0) System.out.println("********** " + s.getHostname() + " disabled for " + (s.getDisabledUntil() - now) + " mSecs"); } // remove any existing leases in scheduler // now generate offers for hosts that were scale down and ensure they don't get used - for(String hostname: 
scaleDownHostsRef.get()) { + for (String hostname : scaleDownHostsRef.get()) { leases.add(LeaseProvider.getLeaseOffer(hostname, cpus1, memory1, ports, attributes1)); } // now try to fill all machines minus one that we filled before - for(int j=0; j<(maxIdle+excess-1)*cpus1; j++) - requests.add(TaskRequestProvider.getTaskRequest(1, memory1/cpus1, 1)); - i=0; - first=true; + for (int j = 0; j < (maxIdle + excess - 1) * cpus1; j++) + requests.add(TaskRequestProvider.getTaskRequest(1, memory1 / cpus1, 1)); + i = 0; + first = true; do { SchedulingResult schedulingResult = scheduler.scheduleOnce(requests, leases); - if(!schedulingResult.getResultMap().isEmpty()) { - for(Map.Entry entry: schedulingResult.getResultMap().entrySet()) { + if (!schedulingResult.getResultMap().isEmpty()) { + for (Map.Entry entry : schedulingResult.getResultMap().entrySet()) { Assert.assertFalse("Did not expect scaled down host " + entry.getKey() + " to be assigned again", isInCollection(entry.getKey(), scaleDownHostsRef.get())); - for(int j=0; j requests = new ArrayList<>(); final List leases = new ArrayList<>(); - for(int i=0; i()); @@ -582,9 +597,9 @@ public void testAddingNewRule() throws Exception { attributes2.put(hostAttrName, attribute2); List ports = new ArrayList<>(); ports.add(new VirtualMachineLease.Range(1, 10)); - for(int l=0; l resultMap = schedulingResult.getResultMap(); @@ -628,7 +643,7 @@ public void call(AutoScaleAction action) { if (r.isSuccessful()) { successes++; scheduler.getTaskAssigner().call(r.getRequest(), entry.getKey()); - switch ((int)r.getRequest().getCPUs()) { + switch ((int) r.getRequest().getCPUs()) { case 1: Assert.assertTrue("Expecting assignment on small host", entry.getKey().startsWith("smallhost")); break; @@ -646,11 +661,11 @@ public void call(AutoScaleAction action) { } Thread.sleep(1000); } - if(!latchSmallHosts.await(5, TimeUnit.SECONDS)) + if (!latchSmallHosts.await(5, TimeUnit.SECONDS)) Assert.fail("Small hosts scale up not triggered"); - Assert.assertTrue("Small hosts to add>0", hostsToAdd.get()>0); - for(int i=0; i0", hostsToAdd.get() > 0); + for (int i = 0; i < hostsToAdd.get(); i++) + leases.add(LeaseProvider.getLeaseOffer("smallhost" + 100 + i, cpus1, memory1, ports, attributes1)); final CountDownLatch latchBigHosts = new CountDownLatch(1); scheduler.addOrReplaceAutoScaleRule(AutoScaleRuleProvider.createRule(hostAttrVal2, minIdle, maxIdle, coolDownSecs, 1, 1000)); scheduler.setAutoscalerCallback(new Action1() { @@ -669,14 +684,14 @@ public void call(AutoScaleAction action) { } } }); - for(int i=0; i ports = new ArrayList<>(); ports.add(new VirtualMachineLease.Range(1, 10)); - for(int l=0; l resultMap = schedulingResult.getResultMap(); @@ -728,7 +743,7 @@ public void call(AutoScaleAction action) { } Thread.sleep(1000); } - if(!latch.await(2, TimeUnit.SECONDS)) + if (!latch.await(2, TimeUnit.SECONDS)) Assert.fail("Didn't get scale up action"); scheduler.removeAutoScaleRule(hostAttrVal1); scheduler.addOrReplaceAutoScaleRule(AutoScaleRuleProvider.createRule(hostAttrVal2, minIdle, maxIdle, coolDownSecs, 1, 1000)); @@ -736,7 +751,7 @@ public void call(AutoScaleAction action) { scheduler.setAutoscalerCallback(new Action1() { @Override public void call(AutoScaleAction action) { - if(action.getType() == AutoScaleAction.Type.Up) { + if (action.getType() == AutoScaleAction.Type.Up) { switch (action.getRuleName()) { case hostAttrVal1: Assert.fail("Shouldn't have gotten autoscale action"); @@ -748,7 +763,7 @@ public void call(AutoScaleAction action) { } } }); - for(int i=0; i 
hostList) { - for(String h: hostList) - if(h.equals(host)) + for (String h : hostList) + if (h.equals(host)) return true; return false; } @@ -800,31 +815,30 @@ public void call(AutoScaleAction autoScaleAction) { .setText(Protos.Value.Text.newBuilder().setValue(hostAttrVal1)).build(); attributes.put(hostAttrName, attribute); final List leases = new ArrayList<>(); - for(int l=0; l0) + for (int l = 0; l < minIdle; l++) + leases.add(LeaseProvider.getLeaseOffer("host" + l, cpus1, memory1, ports, attributes)); + for (int i = 0; i < coolDownSecs + 1; i++) { + if (i > 0) leases.clear(); final SchedulingResult result = scheduler.scheduleOnce(Collections.emptyList(), leases); Thread.sleep(1000); } Assert.assertFalse("Unexpected to scale DOWN", scaleDownReceived.get()); Assert.assertFalse("Unexpected to scale UP", scaleUpReceived.get()); - for(int o=0; o tasks = new ArrayList<>(); - if(i==0) { + if (i == 0) { tasks.add(TaskRequestProvider.getTaskRequest(1, 1000, 1)); } final SchedulingResult result = scheduler.scheduleOnce(tasks, leases); final Map resultMap = result.getResultMap(); - if(!tasks.isEmpty()) { + if (!tasks.isEmpty()) { Assert.assertTrue(resultMap.size() == 1); leases.add(LeaseProvider.getConsumedLease(resultMap.values().iterator().next())); - } - else + } else leases.clear(); Thread.sleep(1000); } @@ -865,7 +879,7 @@ public void call(AutoScaleAction autoScaleAction) { } } }; - long scaleDownDelay=3; + long scaleDownDelay = 3; TaskScheduler scheduler = getScheduler(noOpLeaseReject, callback, 0, scaleDownDelay, rule1); List ports = new ArrayList<>(); ports.add(new VirtualMachineLease.Range(1, 10)); @@ -875,10 +889,10 @@ public void call(AutoScaleAction autoScaleAction) { .setText(Protos.Value.Text.newBuilder().setValue(hostAttrVal1)).build(); attributes.put(hostAttrName, attribute); final List leases = new ArrayList<>(); - for(int l=0; l tasks = new ArrayList<>(); - for(int t=0; t<2; t++) + for (int t = 0; t < 2; t++) tasks.add(TaskRequestProvider.getTaskRequest(cpus1, memory1, 1)); final SchedulingResult result = scheduler.scheduleOnce(tasks, leases); final Map resultMap = result.getResultMap(); @@ -889,22 +903,21 @@ public void call(AutoScaleAction autoScaleAction) { // mark completion of 1 task; add back one of the leases leases.add(resultMap.values().iterator().next().getLeasesUsed().iterator().next()); // run scheduler for (scaleDownDelay-1) secs and ensure we didn't get scale down request - for(int i=0; iemptyList(), leases); leases.clear(); Thread.sleep(1000); @@ -929,9 +942,9 @@ public void call(AutoScaleAction autoScaleAction) { // Test that with a rule that has min size defined, we don't try to scale down below the min size, even though there are too many idle @Test public void testRuleWithMinSize() throws Exception { - final int minSize=10; - final int extra=2; - long cooldown=2; + final int minSize = 10; + final int extra = 2; + long cooldown = 2; AutoScaleRule rule = AutoScaleRuleProvider.createWithMinSize("cluster1", 2, 5, cooldown, 1.0, 1000, minSize); Map attributes = new HashMap<>(); Protos.Attribute attribute = Protos.Attribute.newBuilder().setName(hostAttrName) @@ -941,16 +954,16 @@ public void testRuleWithMinSize() throws Exception { ports.add(new VirtualMachineLease.Range(1, 10)); attributes.put(hostAttrName, attribute); final List leases = new ArrayList<>(); - for(int l=0; l callback = autoScaleAction -> { if (autoScaleAction instanceof ScaleDownAction) { - scaleDown.addAndGet(((ScaleDownAction)autoScaleAction).getHosts().size()); + 
scaleDown.addAndGet(((ScaleDownAction) autoScaleAction).getHosts().size()); } }; final TaskScheduler scheduler = getScheduler(noOpLeaseReject, callback, rule); - for (int i=0; i attributes = new HashMap<>(); Protos.Attribute attribute = Protos.Attribute.newBuilder().setName(hostAttrName) @@ -972,16 +985,16 @@ public void testRuleWithMinSize2() throws Exception { ports.add(new VirtualMachineLease.Range(1, 10)); attributes.put(hostAttrName, attribute); final List leases = new ArrayList<>(); - for(int l=0; l callback = autoScaleAction -> { if (autoScaleAction instanceof ScaleDownAction) { - scaleDown.addAndGet(((ScaleDownAction)autoScaleAction).getHosts().size()); + scaleDown.addAndGet(((ScaleDownAction) autoScaleAction).getHosts().size()); } }; final TaskScheduler scheduler = getScheduler(noOpLeaseReject, callback, rule); - for (int i=0; i attributes = new HashMap<>(); Protos.Attribute attribute = Protos.Attribute.newBuilder().setName(hostAttrName) @@ -1004,8 +1017,8 @@ public void testRuleWithMaxSize() throws Exception { ports.add(new VirtualMachineLease.Range(1, 10)); attributes.put(hostAttrName, attribute); final List leases = new ArrayList<>(); - for(int l=0; l callback = autoScaleAction -> { if (autoScaleAction instanceof ScaleUpAction) { @@ -1016,23 +1029,23 @@ public void testRuleWithMaxSize() throws Exception { final TaskScheduler scheduler = getScheduler(noOpLeaseReject, callback, rule); // create enough tasks to fill up the Vms List tasks = new ArrayList<>(); - for (int l=0; l0); - for(VMAssignmentResult r: result.getResultMap().values()) { - for (TaskAssignmentResult task: r.getTasksAssigned()) { + int assigned = 0; + Assert.assertTrue(result.getResultMap().size() > 0); + for (VMAssignmentResult r : result.getResultMap().values()) { + for (TaskAssignmentResult task : r.getTasksAssigned()) { assigned++; scheduler.getTaskAssigner().call(task.getRequest(), r.getHostname()); } } - Assert.assertEquals(maxSize-leaveIdle, assigned); + Assert.assertEquals(maxSize - leaveIdle, assigned); tasks.clear(); } Thread.sleep(1000); @@ -1043,8 +1056,8 @@ public void testRuleWithMaxSize() throws Exception { // Test that with a rule that has the max size defined, we don't try to scale up at all if total size > maxSize, even if there's no idle left @Test public void testRuleWithMaxSize2() throws Exception { - final int maxSize=10; - final long cooldown=2; + final int maxSize = 10; + final long cooldown = 2; final AutoScaleRule rule = AutoScaleRuleProvider.createWithMaxSize("cluster1", 2, 5, cooldown, 1, 1000, maxSize); Map attributes = new HashMap<>(); Protos.Attribute attribute = Protos.Attribute.newBuilder().setName(hostAttrName) @@ -1054,8 +1067,8 @@ public void testRuleWithMaxSize2() throws Exception { ports.add(new VirtualMachineLease.Range(1, 10)); attributes.put(hostAttrName, attribute); final List leases = new ArrayList<>(); - for(int l=0; l callback = autoScaleAction -> { if (autoScaleAction instanceof ScaleUpAction) { @@ -1066,23 +1079,23 @@ public void testRuleWithMaxSize2() throws Exception { final TaskScheduler scheduler = getScheduler(noOpLeaseReject, callback, rule); // create enough tasks to fill up the Vms List tasks = new ArrayList<>(); - for (int l=0; l0); - for(VMAssignmentResult r: result.getResultMap().values()) { - for (TaskAssignmentResult task: r.getTasksAssigned()) { + int assigned = 0; + Assert.assertTrue(result.getResultMap().size() > 0); + for (VMAssignmentResult r : result.getResultMap().values()) { + for (TaskAssignmentResult task : r.getTasksAssigned()) { assigned++; 
                    scheduler.getTaskAssigner().call(task.getRequest(), r.getHostname());
                 }
             }
-            Assert.assertEquals(maxSize+2, assigned);
+            Assert.assertEquals(maxSize + 2, assigned);
             tasks.clear();
         }
         Thread.sleep(1000);
@@ -1095,9 +1108,9 @@ public void testRuleWithMaxSize2() throws Exception {
     // aggressive scale up.
     @Test
     public void testTask2ClustersGetterAggressiveScaleUp() throws Exception {
-        final long cooldownMillis=3000;
-        final AutoScaleRule rule1 = AutoScaleRuleProvider.createRule("cluster1", minIdle, maxIdle, cooldownMillis/1000L, 1, 1000);
-        final AutoScaleRule rule2 = AutoScaleRuleProvider.createRule("cluster2", minIdle, maxIdle, cooldownMillis/1000L, 1, 1000);
+        final long cooldownMillis = 3000;
+        final AutoScaleRule rule1 = AutoScaleRuleProvider.createRule("cluster1", minIdle, maxIdle, cooldownMillis / 1000L, 1, 1000);
+        final AutoScaleRule rule2 = AutoScaleRuleProvider.createRule("cluster2", minIdle, maxIdle, cooldownMillis / 1000L, 1, 1000);
         Map<String, Protos.Attribute> attributes1 = new HashMap<>();
         Protos.Attribute attribute1 = Protos.Attribute.newBuilder().setName(hostAttrName)
                 .setType(Protos.Value.Type.TEXT)
@@ -1111,8 +1124,8 @@ public void testTask2ClustersGetterAggressiveScaleUp() throws Exception {
                 .setText(Protos.Value.Text.newBuilder().setValue("cluster2")).build();
         attributes2.put(hostAttrName, attribute1);
         final List<VirtualMachineLease> leases = new ArrayList<>();
-        for(int l=0; l<minIdle; l++)
-            leases.add(LeaseProvider.getLeaseOffer("host"+l, cpus1, memory1, ports, attributes1));
+        for (int l = 0; l < minIdle; l++)
+            leases.add(LeaseProvider.getLeaseOffer("host" + l, cpus1, memory1, ports, attributes1));
         final Map<String, Integer> scaleUpRequests = new HashMap<>();
         final CountDownLatch initialScaleUpLatch = new CountDownLatch(2);
         final AtomicReference<CountDownLatch> scaleUpLatchRef = new AtomicReference<>();
@@ -1135,7 +1148,7 @@ public void testTask2ClustersGetterAggressiveScaleUp() throws Exception {
         final TaskSchedulingService schedulingService = new TaskSchedulingService.Builder()
                 .withMaxDelayMillis(100)
                 .withLoopIntervalMillis(20)
-                .withTaskQuue(queue)
+                .withTaskQueue(queue)
                 .withTaskScheduler(scheduler)
                 .withSchedulingResultCallback(schedulingResult -> {
                     final List<Exception> elist = schedulingResult.getExceptions();
@@ -1143,7 +1156,7 @@ public void testTask2ClustersGetterAggressiveScaleUp() throws Exception {
                         exceptions.set(elist);
                     final Map<String, VMAssignmentResult> resultMap = schedulingResult.getResultMap();
                     if (resultMap != null && !resultMap.isEmpty()) {
-                        for (VMAssignmentResult vmar: resultMap.values()) {
+                        for (VMAssignmentResult vmar : resultMap.values()) {
                             vmar.getTasksAssigned().forEach(t -> latch.countDown());
                         }
                     }
@@ -1152,7 +1165,7 @@ public void testTask2ClustersGetterAggressiveScaleUp() throws Exception {
         schedulingService.setTaskToClusterAutoScalerMapGetter(task -> Collections.singletonList("cluster1"));
         schedulingService.start();
         schedulingService.addLeases(leases);
-        for (int i=0; i
+        Map<String, Protos.Attribute> attributes1 = new HashMap<>();
+        Protos.Attribute attribute1 = Protos.Attribute.newBuilder().setName(hostAttrName)
+                .setType(Protos.Value.Type.TEXT)
+                .setText(Protos.Value.Text.newBuilder().setValue(CLUSTER1)).build();
+        List<VirtualMachineLease.Range> ports = new ArrayList<>();
+        ports.add(new VirtualMachineLease.Range(1, 10));
+        attributes1.put(hostAttrName, attribute1);
+
+        Map<String, Protos.Attribute> attributes2 = new HashMap<>();
+        Protos.Attribute attribute2 = Protos.Attribute.newBuilder().setName(hostAttrName)
+                .setType(Protos.Value.Type.TEXT)
+                .setText(Protos.Value.Text.newBuilder().setValue(CLUSTER2)).build();
+        attributes2.put(hostAttrName, attribute2);
+        final List<VirtualMachineLease> leases = new ArrayList<>();
+
+        for (int l = 0; l < numberOfHostsPerCluster; l++) {
+            leases.add(LeaseProvider.getLeaseOffer("cluster1Host" + l, cpus1, memory1, ports, attributes1));
+            leases.add(LeaseProvider.getLeaseOffer("cluster2Host" + l, cpus1, memory1, ports, attributes2));
+        }
+
+        // create task scheduler with a callback action that blocks for 100ms
+        Set<String> hostsToScaleDown = new HashSet<>();
+        Action1<AutoScaleAction> callback = action -> {
+            try {
+                if (action instanceof ScaleDownAction) {
+                    ScaleDownAction scaleDownAction = (ScaleDownAction) action;
+                    hostsToScaleDown.addAll(scaleDownAction.getHosts());
+                    countDownLatch.countDown();
+                    Thread.sleep(100);
+                }
+
+            } catch (InterruptedException ignored) {
+            }
+        };
+        final TaskScheduler scheduler = getScheduler(noOpLeaseReject, callback, cluster1Rule, cluster2Rule);
+
+        // run a scheduling iteration and wait until the callback is called
+        scheduler.scheduleOnce(Collections.emptyList(), leases);
+        while (countDownLatch.getCount() != 1) {
+            Thread.sleep(10);
+            scheduler.scheduleOnce(Collections.emptyList(), Collections.emptyList());
+        }
+
+        // create maxIdle tasks that each fill up a machine and run scheduling iteration
+        final List<TaskRequest> requests = new ArrayList<>();
+        for (int i = 0; i < (numberOfHostsPerCluster * 1.5); i++) {
+            requests.add(TaskRequestProvider.getTaskRequest(cpus1, memory1, 0));
+        }
+
+        SchedulingResult schedulingResult = scheduler.scheduleOnce(requests, Collections.emptyList());
+
+        // verify that half of the tasks failed placement as half of the instances should have been disabled by the autoscaler
+        int expectedFailures = numberOfHostsPerCluster / 2;
+        Map<TaskRequest, List<TaskAssignmentResult>> failures = schedulingResult.getFailures();
+        Assert.assertEquals(expectedFailures + " tasks should have failed placement", expectedFailures, failures.size());
+
+        // verify that none of the placements happened on a down scaled host
+        Map<String, VMAssignmentResult> resultMap = schedulingResult.getResultMap();
+        resultMap.values().forEach(result -> Assert.assertFalse(hostsToScaleDown.contains(result.getHostname())));
+    }
 }
diff --git a/fenzo-core/src/test/java/com/netflix/fenzo/ShortfallAutoscalerTest.java b/fenzo-core/src/test/java/com/netflix/fenzo/ShortfallAutoscalerTest.java
index 6d9703d..6541407 100644
--- a/fenzo-core/src/test/java/com/netflix/fenzo/ShortfallAutoscalerTest.java
+++ b/fenzo-core/src/test/java/com/netflix/fenzo/ShortfallAutoscalerTest.java
@@ -97,7 +97,7 @@ private TaskSchedulingService getSchedulingService(TaskQueue queue, Action0 preH
                 .withMaxDelayMillis(500)
                 .withPreSchedulingLoopHook(preHook)
                 .withSchedulingResultCallback(resultCallback)
-                .withTaskQuue(queue)
+                .withTaskQueue(queue)
                 .withTaskScheduler(scheduler)
                 .withOptimizingShortfallEvaluator()
                 .build();
diff --git a/fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java b/fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java
index 0c9a1e3..03bb909 100644
--- a/fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java
+++ b/fenzo-core/src/test/java/com/netflix/fenzo/TaskSchedulingServiceTest.java
@@ -48,7 +48,7 @@ private TaskSchedulingService getSchedulingService(TaskQueue queue, TaskSchedule
     private TaskSchedulingService getSchedulingService(TaskQueue queue, TaskScheduler scheduler, long loopMillis, long maxDelayMillis, Action1<SchedulingResult> resultCallback) {
         return new TaskSchedulingService.Builder()
-                .withTaskQuue(queue)
+                .withTaskQueue(queue)
                 .withLoopIntervalMillis(loopMillis)
                 .withMaxDelayMillis(maxDelayMillis)
                 .withPreSchedulingLoopHook(new Action0() {
diff --git a/fenzo-core/src/test/java/com/netflix/fenzo/queues/tiered/TieredQueueTest.java b/fenzo-core/src/test/java/com/netflix/fenzo/queues/tiered/TieredQueueTest.java
index 4225e68..3532d12 100644
--- a/fenzo-core/src/test/java/com/netflix/fenzo/queues/tiered/TieredQueueTest.java
+++ b/fenzo-core/src/test/java/com/netflix/fenzo/queues/tiered/TieredQueueTest.java
@@ -362,7 +362,7 @@ private Map> getTierAllocsForBuckets(Map resultCallback) {
         return new TaskSchedulingService.Builder()
-                .withTaskQuue(queue)
+                .withTaskQueue(queue)
                 .withLoopIntervalMillis(100L)
                 .withPreSchedulingLoopHook(new Action0() {
                     @Override
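The tests above all drive the autoscaler through an Action1<AutoScaleAction> callback. For orientation, a minimal handler shaped like the ones used in these tests (a sketch only; the provisioning and termination bodies are placeholders, not part of this patch):

    Action1<AutoScaleAction> callback = action -> {
        if (action instanceof ScaleUpAction) {
            // provision additional instances for the rule's cluster
        } else if (action instanceof ScaleDownAction) {
            // terminate ((ScaleDownAction) action).getHosts();
            // Fenzo keeps those hosts disabled in the meantime, as the tests verify
        }
    };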
From 7a0687c6f5738d82109f80b50ace4af002eec8de Mon Sep 17 00:00:00 2001
From: Corin Dwyer
Date: Mon, 14 Aug 2017 13:20:05 -0700
Subject: [PATCH 2/4] fix review comments

revert StateMonitor to the previous implementation
create a new pseudo scheduling method that ignores the lock
---
 .../java/com/netflix/fenzo/AssignableVMs.java |  2 +-
 .../java/com/netflix/fenzo/AutoScaler.java    |  4 +-
 .../fenzo/OptimizingShortfallEvaluator.java   |  4 +-
 .../java/com/netflix/fenzo/StateMonitor.java  | 20 ++++++---
 .../java/com/netflix/fenzo/TaskScheduler.java | 44 +++++++++++++++----
 .../netflix/fenzo/TaskSchedulingService.java  | 15 +++----
 .../java/com/netflix/fenzo/VMCollection.java  |  4 +-
 7 files changed, 62 insertions(+), 31 deletions(-)

diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java
index eb26d2b..ccaa8aa 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVMs.java
@@ -117,7 +117,7 @@ Map<String, List<String>> createPseudoHosts(Map<String, Integer> groupCounts, Fu
         );
     }
 
-    void removePsuedoHosts(Map<String, List<String>> hostsMap) {
+    void removePseudoHosts(Map<String, List<String>> hostsMap) {
         if (hostsMap != null && !hostsMap.isEmpty()) {
             for (Map.Entry<String, List<String>> entry: hostsMap.entrySet()) {
                 for (String h: entry.getValue()) {
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java b/fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java
index 67ef52c..d6daea7 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/AutoScaler.java
@@ -124,7 +124,7 @@ void setTaskToClustersGetter(Func1<QueuableTask, List<String>> getter) {
         this.taskToClustersGetter = getter;
     }
 
-    void scheduleAutoscale(final AutoScalerInput autoScalerInput) {
+    void doAutoscale(final AutoScalerInput autoScalerInput) {
         if (isShutdown.get()) {
             return;
         }
@@ -227,7 +227,7 @@ private List processScalingNeeds(HostAttributeGroup hostAttributeGroup
         StringBuilder sBuilder = new StringBuilder();
         for (String host : hostsToTerminate.keySet()) {
             sBuilder.append(host).append(", ");
-            long disabledDurationInSecs = (disabledVmDurationInSecs > 0L) ? disabledVmDurationInSecs : rule.getCoolDownSecs();
+            long disabledDurationInSecs = Math.max(disabledVmDurationInSecs, rule.getCoolDownSecs());
             assignableVMs.disableUntil(host, now + disabledDurationInSecs * 1000);
         }
         logger.info("Scaling down " + rule.getRuleName() + " by "
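The Math.max change above means a configured disable duration can lengthen, but never shorten, the cooldown-based window. A quick worked sketch of the hunk's effect (the numbers are illustrative):

    long disabledDurationInSecs = Math.max(disabledVmDurationInSecs, rule.getCoolDownSecs());
    // coolDownSecs=300, disabledVmDurationInSecs=0 (unset) -> VM disabled for 300s
    // coolDownSecs=300, disabledVmDurationInSecs=600       -> VM disabled for 600s
    // coolDownSecs=300, disabledVmDurationInSecs=120       -> 300s; the old ternary would have used 120s
    assignableVMs.disableUntil(host, now + disabledDurationInSecs * 1000);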
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/OptimizingShortfallEvaluator.java b/fenzo-core/src/main/java/com/netflix/fenzo/OptimizingShortfallEvaluator.java
index cdb58f4..123fac6 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/OptimizingShortfallEvaluator.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/OptimizingShortfallEvaluator.java
@@ -42,7 +42,7 @@
  * The pseudo scheduling run performs an entire scheduling iteration using the cloned queue and pseudo VMs in addition
  * to any new VM leases that have been added since previous scheduling iteration. This will invoke any and all task
  * constraints as well as fitness function setup in the scheduler. The scheduling result is used to determine the
- * number of VMs in each group and then the results are discarded. As expected, the psuedo scheduling run has no impact
+ * number of VMs in each group and then the results are discarded. As expected, the pseudo scheduling run has no impact
  * on the real scheduling assignments made.
 * <p>
 * Tasks for which scale up is requested by this evaluator are remembered and not requested again until certain delay.
@@ -63,7 +63,7 @@ public Map<String, Integer> getShortfall(Set<String> vmGroupNames, Set<QueuableTask> shortfallTasks) {
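The javadoc above notes that tasks which already triggered a scale-up request are remembered and skipped until a delay elapses. A minimal sketch of that bookkeeping idea (the field and method names here are hypothetical, not the evaluator's actual members):

    private final ConcurrentMap<String, Long> recentlyRequested = new ConcurrentHashMap<>();

    private boolean shouldRequestScaleUp(String taskId, long delayMillis) {
        final long now = System.currentTimeMillis();
        final Long last = recentlyRequested.get(taskId);
        if (last != null && (now - last) < delayMillis)
            return false; // already covered by a recent scale-up request; skip for now
        recentlyRequested.put(taskId, now);
        return true;
    }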
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/StateMonitor.java b/fenzo-core/src/main/java/com/netflix/fenzo/StateMonitor.java
index 43082b1..bcc0748 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/StateMonitor.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/StateMonitor.java
@@ -16,21 +16,27 @@
 
 package com.netflix.fenzo;
 
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * A monitor to ensure scheduler state is not compromised by concurrent calls that are disallowed.
  */
 class StateMonitor {
-    private final ReentrantLock lock = new ReentrantLock();
+    private final AtomicBoolean lock;
+
+    StateMonitor() {
+        lock = new AtomicBoolean(false);
+    }
 
     AutoCloseable enter() {
-        if (!lock.tryLock()) {
+        if(!lock.compareAndSet(false, true))
             throw new IllegalStateException();
-        }
-        return () -> {
-            if (!lock.tryLock())
-                throw new IllegalStateException();
+        return new AutoCloseable() {
+            @Override
+            public void close() throws Exception {
+                if(!lock.compareAndSet(true, false))
+                    throw new IllegalStateException();
+            }
         };
     }
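The revert above matters because ReentrantLock is reentrant: tryLock() succeeds again on the owning thread, so the removed close() lambda re-acquired the lock instead of releasing it and could never detect an illegal nested enter(). The AtomicBoolean version fails fast in both directions. Usage stays the same (a calling-code sketch, matching the try-with-resources pattern in TaskScheduler below):

    try (AutoCloseable ac = stateMonitor.enter()) {
        // one scheduling iteration; any overlapping enter() now throws IllegalStateException
    } catch (Exception e) {
        // enter() threw: another iteration was already in progress
    }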
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
index 5a97d12..e7053e7 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
@@ -378,18 +378,22 @@ public Builder withDelayAutoscaleDownBySecs(long delayAutoscaleDownBySecs) {
         }
 
         /**
-         * How long to disable a VM when going through a scale down action. If this value is not set then the
-         * value used will be the {@link AutoScaleRule#getCoolDownSecs()} value. If the supplied {@link AutoScaleAction}
-         * does not actually terminate the instance in this time frame then VM will become enabled.
-
+         * How long to disable a VM when going through a scale down action. Note that the value used will be the max
+         * between this value and the {@link AutoScaleRule#getCoolDownSecs()} value and that this value should be
+         * greater than the {@link AutoScaleRule#getCoolDownSecs()} value. If the supplied {@link AutoScaleAction}
+         * does not actually terminate the instance in this time frame then the VM will become enabled. This option is useful
+         * when you want to increase the disabled time of a VM because the implementation of the {@link AutoScaleAction} may
+         * take longer than the cooldown period.
+         *
          * @param disabledVmDurationInSecs Disable VMs about to be terminated by this many seconds.
         * @return this same {@code Builder}, suitable for further chaining or to build the {@link TaskScheduler}
        * @throws IllegalArgumentException if {@code disabledVmDurationInSecs} is not greater than 0.
    * @see Autoscaling
     */
     public Builder withAutoscaleDisabledVmDurationInSecs(long disabledVmDurationInSecs) {
-        if(disabledVmDurationInSecs > 0L)
+        if(disabledVmDurationInSecs <= 0L) {
             throw new IllegalArgumentException("disabledVmDurationInSecs must be greater than 0: " + disabledVmDurationInSecs);
+        }
         this.disabledVmDurationInSecs = disabledVmDurationInSecs;
         return this;
     }
@@ -728,6 +732,28 @@ public Assignable next() {
         }
     }
 
+    /**
+     * Variant of {@link #scheduleOnce(List, List)} that should be only used to schedule a pseudo iteration as it
+     * ignores the StateMonitor lock.
+     * @param taskIterator Iterator for tasks to assign resources to.
+     * @return a {@link SchedulingResult} object that contains a task assignment results map and other summaries
+     */
+    /* package */ SchedulingResult pseudoScheduleOnce(TaskIterator taskIterator) throws Exception {
+        long start = System.currentTimeMillis();
+        final SchedulingResult schedulingResult = doSchedule(taskIterator, Collections.emptyList());
+        if((lastVMPurgeAt + purgeVMsIntervalSecs*1000) < System.currentTimeMillis()) {
+            lastVMPurgeAt = System.currentTimeMillis();
+            logger.info("Purging inactive VMs");
+            assignableVMs.purgeInactiveVMs( // explicitly exclude VMs that have assignments
+                    schedulingResult.getResultMap() == null?
+                            Collections.emptySet() :
+                            new HashSet<>(schedulingResult.getResultMap().keySet())
+            );
+        }
+        schedulingResult.setRuntime(System.currentTimeMillis() - start);
+        return schedulingResult;
+    }
+
     private SchedulingResult doSchedule(
             TaskIterator taskIterator,
             List<VirtualMachineLease> newLeases) throws Exception {
@@ -878,7 +904,7 @@ public EvalResult call() throws Exception {
             rejectedCount.addAndGet(assignableVMs.removeLimitedLeases(expirableLeases));
             final AutoScalerInput autoScalerInput = new AutoScalerInput(idleResourcesList, idleInactiveAVMs, failedTasksForAutoScaler);
             if (autoScaler != null)
-                autoScaler.scheduleAutoscale(autoScalerInput);
+                autoScaler.doAutoscale(autoScalerInput);
         }
         schedulingResult.setLeasesAdded(newLeases.size());
         schedulingResult.setLeasesRejected(rejectedCount.get());
@@ -892,11 +918,11 @@ public EvalResult call() throws Exception {
         return assignableVMs.createPseudoHosts(groupCounts, autoScaler == null? name -> null : autoScaler::getRule);
     }
 
-    /* package */ void removePsuedoHosts(Map<String, List<String>> hostsMap) {
-        assignableVMs.removePsuedoHosts(hostsMap);
+    /* package */ void removePseudoHosts(Map<String, List<String>> hostsMap) {
+        assignableVMs.removePseudoHosts(hostsMap);
     }
 
-    /* package */ void removePsuedoAssignments() {
+    /* package */ void removePseudoAssignments() {
         taskTracker.clearAssignedTasks(); // this should suffice for pseudo assignments
     }
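With the inverted validation fixed above, wiring the option into a scheduler looks roughly like this (a configuration sketch; the rule and callback are assumed to exist, and the surrounding builder calls are the usual Fenzo ones, not introduced by this patch):

    TaskScheduler scheduler = new TaskScheduler.Builder()
            .withAutoScaleByAttributeName("cluster")
            .withAutoScaleRule(rule)                        // say rule.getCoolDownSecs() == 300
            .withAutoscalerCallback(callback)
            .withAutoscaleDisabledVmDurationInSecs(600)     // hosts picked for scale down stay disabled for 600s
            .withLeaseRejectAction(lease -> { /* decline the offer */ })
            .build();

    // withAutoscaleDisabledVmDurationInSecs(0) now throws IllegalArgumentException, as documented.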
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java
index b3676cb..29ccd3c 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskSchedulingService.java
@@ -150,8 +150,8 @@ public boolean isShutdown() {
         return taskQueue;
     }
 
-    /* package */ Map<String, Integer> requestPsuedoScheduling(final InternalTaskQueue pTaskQueue, Map<String, Integer> groupCounts) {
-        Map<String, Integer> psuedoSchedulingResult;
+    /* package */ Map<String, Integer> requestPseudoScheduling(final InternalTaskQueue pTaskQueue, Map<String, Integer> groupCounts) {
+        Map<String, Integer> pseudoSchedulingResult = new HashMap<>();
         try {
             logger.debug("Creating pseudo hosts");
             final Map<String, List<String>> pseudoHosts = taskScheduler.createPseudoHosts(groupCounts);
@@ -183,7 +183,7 @@ public boolean isShutdown() {
                 // temporarily replace usage tracker in taskTracker to the pseudoQ and then put back the original one
                 taskScheduler.getTaskTracker().setUsageTrackedQueue(pTaskQueue.getUsageTracker());
                 logger.debug("Scheduling with pseudoQ");
-                final SchedulingResult schedulingResult = taskScheduler.scheduleOnce(pTaskQueue, Collections.emptyList());
+                final SchedulingResult schedulingResult = taskScheduler.pseudoScheduleOnce(pTaskQueue);
                 final Map<String, VMAssignmentResult> resultMap = schedulingResult.getResultMap();
                 Map<String, Integer> result = new HashMap<>();
                 if (!resultMap.isEmpty()) {
@@ -223,23 +223,22 @@ else if(pHostsAdded > 0) {
                     }
                 }
             }
-            psuedoSchedulingResult = result;
+            pseudoSchedulingResult = result;
         } catch (Exception e) {
             logger.error("Error in pseudo scheduling", e);
             throw e;
         } finally {
-            taskScheduler.removePsuedoHosts(pseudoHosts);
-            taskScheduler.removePsuedoAssignments();
+            taskScheduler.removePseudoHosts(pseudoHosts);
+            taskScheduler.removePseudoAssignments();
             taskScheduler.getTaskTracker().setUsageTrackedQueue(taskQueue.getUsageTracker());
         }
         } catch (Exception e) {
             logger.error("Error in pseudo scheduling", e);
-            throw e;
         }
-        return psuedoSchedulingResult;
+        return pseudoSchedulingResult;
     }
 
     private void scheduleOnce() {
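For readers following the nested try blocks above, the flow of requestPseudoScheduling after this patch, in rough outline (comments only, condensed from the hunks shown):

    // 1. taskScheduler.createPseudoHosts(groupCounts)        -- clone idle VMs into pseudo hosts per group
    // 2. setUsageTrackedQueue(pTaskQueue.getUsageTracker())  -- point the task tracker at the cloned queue
    // 3. taskScheduler.pseudoScheduleOnce(pTaskQueue)        -- a full iteration, without the StateMonitor lock
    // 4. inspect assignments that landed on pseudo hosts     -- yields the per-group scale-up estimate
    // 5. finally: removePseudoHosts / removePseudoAssignments / restore the real usage tracker
    // Note: the outer catch now logs without rethrowing, so callers get the (possibly empty) result map.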
diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/VMCollection.java b/fenzo-core/src/main/java/com/netflix/fenzo/VMCollection.java
index 2b80104..8ab52f5 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/VMCollection.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/VMCollection.java
@@ -59,10 +59,10 @@ Collection getGroups() {
     }
 
     /**
-     * Create n psuedo VMs for each group by cloning a VM in each group.
+     * Create n pseudo VMs for each group by cloning a VM in each group.
      * @param groupCounts Map with keys contain group names and values containing number of agents to clone
      * @param ruleGetter Getter function for autoscale rules
-     * @return Collection of psuedo host names added.
+     * @return Collection of pseudo host names added.
      */
     Map<String, List<String>> clonePseudoVMsForGroups(Map<String, Integer> groupCounts,
                                                       Func1<String, AutoScaleRule> ruleGetter,

From 780e9fd9c4278a12c8584fef5657a4432e3651bc Mon Sep 17 00:00:00 2001
From: Corin Dwyer
Date: Mon, 14 Aug 2017 13:33:29 -0700
Subject: [PATCH 3/4] change getMaxResources to package scope

---
 .../main/java/com/netflix/fenzo/AssignableVirtualMachine.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java
index 0fd37a3..d39fb8d 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/AssignableVirtualMachine.java
@@ -617,7 +617,7 @@ Map getMaxScalars() {
         return result;
     }
 
-    public Map getMaxResources() {
+    Map getMaxResources() {
         double cpus=0.0;
         double memory=0.0;
         double network=0.0;

From 65a60d2b7d08fae0d4034d1278e43fa645cb7aa2 Mon Sep 17 00:00:00 2001
From: Corin Dwyer
Date: Mon, 14 Aug 2017 13:45:04 -0700
Subject: [PATCH 4/4] extract method to get rid of duplicate code

---
 .../java/com/netflix/fenzo/TaskScheduler.java | 21 +++++++------------
 1 file changed, 7 insertions(+), 14 deletions(-)

diff --git a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
index e7053e7..4b75712 100644
--- a/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
+++ b/fenzo-core/src/main/java/com/netflix/fenzo/TaskScheduler.java
@@ -708,19 +708,7 @@ public Assignable next() {
             List<VirtualMachineLease> newLeases) throws IllegalStateException {
         checkIfShutdown();
         try (AutoCloseable ac = stateMonitor.enter()) {
-            long start = System.currentTimeMillis();
-            final SchedulingResult schedulingResult = doSchedule(taskIterator, newLeases);
-            if((lastVMPurgeAt + purgeVMsIntervalSecs*1000) < System.currentTimeMillis()) {
-                lastVMPurgeAt = System.currentTimeMillis();
-                logger.info("Purging inactive VMs");
-                assignableVMs.purgeInactiveVMs( // explicitly exclude VMs that have assignments
-                        schedulingResult.getResultMap() == null?
-                                Collections.emptySet() :
-                                new HashSet<>(schedulingResult.getResultMap().keySet())
-                );
-            }
-            schedulingResult.setRuntime(System.currentTimeMillis() - start);
-            return schedulingResult;
+            return doScheduling(taskIterator, newLeases);
         } catch (Exception e) {
             logger.error("Error with scheduling run: " + e.getMessage(), e);
             if(e instanceof IllegalStateException)
@@ -739,8 +727,13 @@ public Assignable next() {
      * @return a {@link SchedulingResult} object that contains a task assignment results map and other summaries
      */
     /* package */ SchedulingResult pseudoScheduleOnce(TaskIterator taskIterator) throws Exception {
+        return doScheduling(taskIterator, Collections.emptyList());
+    }
+
+    private SchedulingResult doScheduling(TaskIterator taskIterator,
+                                          List<VirtualMachineLease> newLeases) throws Exception {
         long start = System.currentTimeMillis();
-        final SchedulingResult schedulingResult = doSchedule(taskIterator, Collections.emptyList());
+        final SchedulingResult schedulingResult = doSchedule(taskIterator, newLeases);
         if((lastVMPurgeAt + purgeVMsIntervalSecs*1000) < System.currentTimeMillis()) {
             lastVMPurgeAt = System.currentTimeMillis();
             logger.info("Purging inactive VMs");
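Taken together, the extraction leaves both entry points as thin wrappers around doScheduling: scheduleOnce() takes the StateMonitor lock first, while pseudoScheduleOnce() deliberately does not. A condensed view of the result (shapes as in the hunks above; the catch body is abbreviated here):

    SchedulingResult scheduleOnce(TaskIterator taskIterator, List<VirtualMachineLease> newLeases) {
        checkIfShutdown();
        try (AutoCloseable ac = stateMonitor.enter()) {      // real iterations are serialized
            return doScheduling(taskIterator, newLeases);
        } catch (Exception e) {
            throw new IllegalStateException(e);              // abbreviated; see the hunk above
        }
    }

    /* package */ SchedulingResult pseudoScheduleOnce(TaskIterator taskIterator) throws Exception {
        return doScheduling(taskIterator, Collections.emptyList());  // pseudo iterations skip the lock
    }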