autoscaler: count head node, don't kill below target (fixes ray-project#2317) (ray-project#2320)

Specifically, subtracts 1 from the target number of workers, taking into
account that the head node has some computational resources.

Do not kill an idle node if it would drop us below the target number of
nodes (in which case we just immediately relaunch).
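A minimal sketch of that arithmetic (illustrative numbers only, not code from this commit; they mirror the "1.0 nodes used => target nodes = 2 => target workers = 1" comment in the new test below):

import numpy as np

# Example values: the head node reports 2 CPUs, all in use (roughly one
# node's worth of load), and the cluster config sets
# target_utilization_fraction to 0.5, as in the tests below.
cur_used = 1.0     # load_metrics.approx_workers_used()
target_frac = 0.5  # config["target_utilization_fraction"]

ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))  # ceil(1.0 / 0.5) == 2
ideal_num_workers = ideal_num_nodes - 1                        # minus the head node == 1

print(ideal_num_nodes, ideal_num_workers)  # 2 1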
AdamGleave authored and ericl committed Jun 28, 2018
1 parent b4dff9f commit 89460b8
Showing 2 changed files with 55 additions and 17 deletions.
14 changes: 8 additions & 6 deletions python/ray/autoscaler/autoscaler.py
@@ -359,6 +359,7 @@ def _update(self):
         print(self.debug_string(nodes))
         self.load_metrics.prune_active_ips(
             [self.provider.internal_ip(node_id) for node_id in nodes])
+        target_workers = self.target_num_workers()

         # Terminate any idle or out of date nodes
         last_used = self.load_metrics.last_used_time_by_ip
@@ -367,7 +368,7 @@ def _update(self):
         for node_id in nodes:
             node_ip = self.provider.internal_ip(node_id)
             if node_ip in last_used and last_used[node_ip] < horizon and \
-                    len(nodes) - num_terminated > self.config["min_workers"]:
+                    len(nodes) - num_terminated > target_workers:
                 num_terminated += 1
                 print("StandardAutoscaler: Terminating idle node: "
                       "{}".format(node_id))
@@ -394,12 +395,12 @@ def _update(self):
             print(self.debug_string(nodes))

         # Launch new nodes if needed
-        target_num = self.target_num_workers()
-        num_nodes = len(nodes) + num_pending
-        if num_nodes < target_num:
+        num_workers = len(nodes) + num_pending
+        if num_workers < target_workers:
             max_allowed = min(self.max_launch_batch,
                               self.max_concurrent_launches - num_pending)
-            self.launch_new_node(min(max_allowed, target_num - num_nodes))
+            num_launches = min(max_allowed, target_workers - num_workers)
+            self.launch_new_node(num_launches)
         print(self.debug_string())

         # Process any completed updates
@@ -453,7 +454,8 @@ def reload_config(self, errors_fatal=False):
     def target_num_workers(self):
         target_frac = self.config["target_utilization_fraction"]
         cur_used = self.load_metrics.approx_workers_used()
-        ideal_num_workers = int(np.ceil(cur_used / float(target_frac)))
+        ideal_num_nodes = int(np.ceil(cur_used / float(target_frac)))
+        ideal_num_workers = ideal_num_nodes - 1  # subtract 1 for head node
         return min(self.config["max_workers"],
                    max(self.config["min_workers"], ideal_num_workers))

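Taken together, a single target_workers value now gates both the scale-down and the scale-up paths of _update(). A condensed, self-contained sketch of that control flow (stand-in names and plain lists, not the real autoscaler or node-provider objects):

def plan_update(nodes, idle_nodes, num_pending, target_workers,
                max_launch_batch, max_concurrent_launches):
    """Stand-in for _update(): pick idle nodes to kill and how many to launch."""
    # Never terminate an idle node if doing so would drop us below
    # target_workers (previously the floor was config["min_workers"]).
    to_terminate = []
    for node_id in nodes:
        if node_id in idle_nodes and len(nodes) - len(to_terminate) > target_workers:
            to_terminate.append(node_id)
    remaining = [n for n in nodes if n not in to_terminate]

    # Launch new nodes only if we are still below the target, respecting the
    # per-batch and concurrency limits.
    num_workers = len(remaining) + num_pending
    num_launches = 0
    if num_workers < target_workers:
        max_allowed = min(max_launch_batch, max_concurrent_launches - num_pending)
        num_launches = min(max_allowed, target_workers - num_workers)
    return to_terminate, num_launches

# With 3 workers, 2 of them idle, and a target of 2, only one node is killed
# and nothing new is launched:
print(plan_update(["n1", "n2", "n3"], {"n2", "n3"}, 0, 2, 5, 10))
# (['n2'], 0)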
58 changes: 47 additions & 11 deletions test/autoscaler_test.py
@@ -11,6 +11,7 @@
 import copy

 import ray
+import ray.services as services
 from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics, \
     fillout_defaults, validate_config
 from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS
@@ -572,7 +573,7 @@ def testConfiguresOutdatedNodes(self):

     def testScaleUpBasedOnLoad(self):
         config = SMALL_CLUSTER.copy()
-        config["min_workers"] = 2
+        config["min_workers"] = 1
         config["max_workers"] = 10
         config["target_utilization_fraction"] = 0.5
         config_path = self.write_config(config)
@@ -582,38 +583,73 @@ def testScaleUpBasedOnLoad(self):
             config_path, lm, max_failures=0, update_interval_s=0)
         self.assertEqual(len(self.provider.nodes({})), 0)
         autoscaler.update()
-        self.waitForNodes(2)
+        self.waitForNodes(1)
         autoscaler.update()
         self.assertEqual(autoscaler.num_launches_pending.value, 0)
-        self.assertEqual(len(self.provider.nodes({})), 2)
+        self.assertEqual(len(self.provider.nodes({})), 1)

         # Scales up as nodes are reported as used
-        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0})
-        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0})
+        local_ip = services.get_node_ip_address()
+        lm.update(local_ip, {"CPU": 2}, {"CPU": 0})  # head
+        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0})  # worker 1
         autoscaler.update()
-        self.waitForNodes(4)
-        lm.update("172.0.0.2", {"CPU": 2}, {"CPU": 0})
+        self.waitForNodes(3)
+        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0})
         autoscaler.update()
-        self.waitForNodes(6)
+        self.waitForNodes(5)

         # Holds steady when load is removed
         lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2})
         lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2})
         autoscaler.update()
         self.assertEqual(autoscaler.num_launches_pending.value, 0)
-        self.assertEqual(len(self.provider.nodes({})), 6)
+        self.assertEqual(len(self.provider.nodes({})), 5)

         # Scales down as nodes become unused
         lm.last_used_time_by_ip["172.0.0.0"] = 0
         lm.last_used_time_by_ip["172.0.0.1"] = 0
         autoscaler.update()
         self.assertEqual(autoscaler.num_launches_pending.value, 0)
-        self.assertEqual(len(self.provider.nodes({})), 4)
+        self.assertEqual(len(self.provider.nodes({})), 3)
         lm.last_used_time_by_ip["172.0.0.2"] = 0
         lm.last_used_time_by_ip["172.0.0.3"] = 0
         autoscaler.update()
         self.assertEqual(autoscaler.num_launches_pending.value, 0)
-        self.assertEqual(len(self.provider.nodes({})), 2)
+        self.assertEqual(len(self.provider.nodes({})), 1)

+    def testDontScaleBelowTarget(self):
+        config = SMALL_CLUSTER.copy()
+        config["min_workers"] = 0
+        config["max_workers"] = 2
+        config["target_utilization_fraction"] = 0.5
+        config_path = self.write_config(config)
+        self.provider = MockProvider()
+        lm = LoadMetrics()
+        autoscaler = StandardAutoscaler(
+            config_path, lm, max_failures=0, update_interval_s=0)
+        self.assertEqual(len(self.provider.nodes({})), 0)
+        autoscaler.update()
+        self.assertEqual(autoscaler.num_launches_pending.value, 0)
+        self.assertEqual(len(self.provider.nodes({})), 0)
+
+        # Scales up as nodes are reported as used
+        local_ip = services.get_node_ip_address()
+        lm.update(local_ip, {"CPU": 2}, {"CPU": 0})  # head
+        # 1.0 nodes used => target nodes = 2 => target workers = 1
+        autoscaler.update()
+        self.waitForNodes(1)
+
+        # Make new node idle, and never used.
+        # Should hold steady as target is still 2.
+        lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0})
+        lm.last_used_time_by_ip["172.0.0.0"] = 0
+        autoscaler.update()
+        self.assertEqual(len(self.provider.nodes({})), 1)
+
+        # Reduce load on head => target nodes = 1 => target workers = 0
+        lm.update(local_ip, {"CPU": 2}, {"CPU": 1})
+        autoscaler.update()
+        self.assertEqual(len(self.provider.nodes({})), 0)
+
     def testRecoverUnhealthyWorkers(self):
         config_path = self.write_config(SMALL_CLUSTER)
