[autoscaler] Make placement groups bypass max launch limit (ray-proje…
Ameer Haj Ali authored Dec 29, 2020
1 parent 5a4e50c commit 44483f4
Showing 2 changed files with 139 additions and 22 deletions.
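
The gist of the change, before reading the diffs: the node counts required by pending placement groups are now handed to _get_concurrent_resource_demand_to_launch, where they raise the per-node-type launch bound the same way adjusted_min_workers already did. A minimal standalone sketch of that bound, assuming the variables mean what their names in the diff suggest (illustrative only, not the file's exact code):

# Sketch of the per-node-type concurrency bound after this commit.
# The inputs mirror variables visible in the diff below; the function
# itself is illustrative and not part of the Ray codebase.
def launch_upper_bound(max_allowed_pending_nodes: int,
                       total_pending_nodes: int,
                       adjusted_min_workers: int,
                       placement_group_nodes: int) -> int:
    # Placement group nodes, like min_workers/request_resources() hints,
    # are allowed on top of the normal pending-launch cap.
    return max(max_allowed_pending_nodes - total_pending_nodes,
               adjusted_min_workers + placement_group_nodes)

# e.g. only 5 more pending launches would normally be allowed, but a
# placement group needing 40 nodes of this type bypasses that cap.
assert launch_upper_bound(5, 0, 0, 40) == 40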
45 changes: 34 additions & 11 deletions python/ray/autoscaler/_private/resource_demand_scheduler.py
@@ -159,11 +159,15 @@ def get_nodes_to_launch(
node_resources, node_type_counts, self.node_types,
self.max_workers, self.head_node_type, ensure_min_cluster_size)

# Step 3: add nodes for strict spread groups
logger.debug(f"Placement group demands: {pending_placement_groups}")
# Step 3: get resource demands of placement groups and return the
# groups that should be strictly spread.
logger.info(f"Placement group demands: {pending_placement_groups}")
placement_group_demand_vector, strict_spreads = \
placement_groups_to_resource_demands(pending_placement_groups)
resource_demands.extend(placement_group_demand_vector)
# Place placement groups demand vector at the beginning of the resource
# demands vector to make it consistent (results in the same types of
# nodes to add) with pg_demands_nodes_max_launch_limit calculated later
resource_demands = placement_group_demand_vector + resource_demands

if self.is_legacy_yaml() and \
not self.node_types[NODE_TYPE_LEGACY_WORKER]["resources"]:
@@ -179,18 +183,32 @@ def get_nodes_to_launch(
return self._legacy_worker_node_to_launch(
nodes, launching_nodes, node_resources,
resource_demands + request_resources_demands)
placement_group_nodes_to_add, node_resources, node_type_counts = \

spread_pg_nodes_to_add, node_resources, node_type_counts = \
self.reserve_and_allocate_spread(
strict_spreads, node_resources, node_type_counts)

# Calculate the nodes to add for bypassing max launch limit for
# placement groups and spreads.
unfulfilled_placement_groups_demands, _ = get_bin_pack_residual(
node_resources, placement_group_demand_vector)
# Add 1 to account for the head node.
max_to_add = self.max_workers + 1 - sum(node_type_counts.values())
pg_demands_nodes_max_launch_limit = get_nodes_for(
self.node_types, node_type_counts, self.head_node_type, max_to_add,
unfulfilled_placement_groups_demands)
placement_groups_nodes_max_limit = {
node_type: spread_pg_nodes_to_add.get(node_type, 0) +
pg_demands_nodes_max_launch_limit.get(node_type, 0)
for node_type in self.node_types
}

# Step 4/5: add nodes for pending tasks, actors, and non-strict spread
# groups
unfulfilled, _ = get_bin_pack_residual(node_resources,
resource_demands)
logger.debug("Resource demands: {}".format(resource_demands))
logger.debug("Unfulfilled demands: {}".format(unfulfilled))
# Add 1 to account for the head node.
max_to_add = self.max_workers + 1 - sum(node_type_counts.values())
nodes_to_add_based_on_demand = get_nodes_for(
self.node_types, node_type_counts, self.head_node_type, max_to_add,
unfulfilled)
@@ -201,15 +219,16 @@ def get_nodes_to_launch(

for node_type in self.node_types:
nodes_to_add = (adjusted_min_workers.get(
node_type, 0) + placement_group_nodes_to_add.get(node_type, 0)
+ nodes_to_add_based_on_demand.get(node_type, 0))
node_type, 0) + spread_pg_nodes_to_add.get(node_type, 0) +
nodes_to_add_based_on_demand.get(node_type, 0))
if nodes_to_add > 0:
total_nodes_to_add[node_type] = nodes_to_add

# Limit the number of concurrent launches
total_nodes_to_add = self._get_concurrent_resource_demand_to_launch(
total_nodes_to_add, unused_resources_by_ip.keys(), nodes,
launching_nodes, adjusted_min_workers)
launching_nodes, adjusted_min_workers,
placement_groups_nodes_max_limit)

logger.debug("Node requests: {}".format(total_nodes_to_add))
return total_nodes_to_add
@@ -288,6 +307,7 @@ def _get_concurrent_resource_demand_to_launch(
non_terminated_nodes: List[NodeID],
pending_launches_nodes: Dict[NodeType, int],
adjusted_min_workers: Dict[NodeType, int],
placement_group_nodes: Dict[NodeType, int],
) -> Dict[NodeType, int]:
"""Updates the max concurrent resources to launch for each node type.
@@ -311,6 +331,8 @@ def _get_concurrent_resource_demand_to_launch(
min_workers and request_resources(). This overrides the launch
limits since the user is hinting to immediately scale up to
this size.
placement_group_nodes: Nodes to launch for placement groups.
This overrides the launch concurrency limits.
Returns:
Dict[NodeType, int]: Maximum number of nodes to launch for each
node type.
@@ -333,8 +355,9 @@ def _get_concurrent_resource_demand_to_launch(
max_allowed_pending_nodes - total_pending_nodes,

# Allow more nodes if this is to respect min_workers or
# request_resources().
adjusted_min_workers.get(node_type, 0))
# request_resources() or placement groups.
adjusted_min_workers.get(node_type, 0) +
placement_group_nodes.get(node_type, 0))

if upper_bound > 0:
updated_nodes_to_launch[node_type] = min(
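
To summarize the new logic in get_nodes_to_launch above: the strict-spread reservation (spread_pg_nodes_to_add) and the nodes needed for the remaining placement group demands (pg_demands_nodes_max_launch_limit) are summed per node type into placement_groups_nodes_max_limit, which is then passed to the concurrency limiter. A simplified, self-contained paraphrase (the helper name and the example numbers are illustrative, not taken from the file):

from typing import Dict

# Illustrative paraphrase of how placement_groups_nodes_max_limit is built:
# strict-spread reservations plus nodes needed for the unfulfilled
# placement group demands, per node type.
def pg_nodes_max_launch_limit(
        node_types: Dict[str, dict],
        spread_pg_nodes_to_add: Dict[str, int],
        pg_demand_nodes: Dict[str, int]) -> Dict[str, int]:
    return {
        node_type: spread_pg_nodes_to_add.get(node_type, 0) +
        pg_demand_nodes.get(node_type, 0)
        for node_type in node_types
    }

# Example: 2 nodes reserved for a STRICT_SPREAD group plus 3 more nodes
# needed for PACK bundles that did not fit on existing nodes.
print(pg_nodes_max_launch_limit(
    {"p2.8xlarge": {}, "m4.large": {}},
    {"p2.8xlarge": 2},
    {"p2.8xlarge": 3}))  # -> {'p2.8xlarge': 5, 'm4.large': 0}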
116 changes: 105 additions & 11 deletions python/ray/tests/test_resource_demand_scheduler.py
@@ -766,7 +766,8 @@ def test_get_concurrent_resource_demand_to_launch():

# Sanity check.
updated_to_launch = \
scheduler._get_concurrent_resource_demand_to_launch({}, [], [], {}, {})
scheduler._get_concurrent_resource_demand_to_launch(
{}, [], [], {}, {}, {})
assert updated_to_launch == {}

provider.create_node({}, {
@@ -785,11 +786,38 @@ def test_get_concurrent_resource_demand_to_launch():
connected_nodes = [] # All the non_terminated_nodes are not connected yet.
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes, {})
pending_launches_nodes, {}, {})
# Note: we have 2 pending/launching gpus, 3 pending/launching cpus,
# 0 running gpu, and 0 running cpus.
assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 2}

# Test min_workers bypass max launch limit.
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch,
connected_nodes,
non_terminated_nodes,
pending_launches_nodes,
adjusted_min_workers={"m4.large": 40},
placement_group_nodes={})
assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40}
# Test placement groups bypass max launch limit.
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch,
connected_nodes,
non_terminated_nodes,
pending_launches_nodes, {},
placement_group_nodes={"m4.large": 40})
assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40}
# Test combining min_workers and placement groups bypass max launch limit.
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch,
connected_nodes,
non_terminated_nodes,
pending_launches_nodes,
adjusted_min_workers={"m4.large": 25},
placement_group_nodes={"m4.large": 15})
assert updated_to_launch == {"p2.8xlarge": 3, "m4.large": 40}

# This starts the min workers only, so we have no more pending workers.
# The workers here are either running (connected) or in
# pending_launches_nodes (i.e., launching).
@@ -798,7 +826,7 @@ def test_get_concurrent_resource_demand_to_launch():
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes, {})
pending_launches_nodes, {}, {})
# Note that here we have 1 launching gpu, 1 launching cpu,
# 1 running gpu, and 2 running cpus.
assert updated_to_launch == {"p2.8xlarge": 4, "m4.large": 4}
@@ -819,7 +847,7 @@ def test_get_concurrent_resource_demand_to_launch():
pending_launches_nodes = {} # No pending launches
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes, {})
pending_launches_nodes, {}, {})
# Note: we have 5 pending cpus. So we are not allowed to start any.
# Still only 2 running cpus.
assert updated_to_launch == {}
@@ -830,7 +858,7 @@ def test_get_concurrent_resource_demand_to_launch():
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes, {})
pending_launches_nodes, {}, {})
# Note: that here we have 7 running cpus and nothing pending/launching.
assert updated_to_launch == {"m4.large": 7}

@@ -846,7 +874,7 @@ def test_get_concurrent_resource_demand_to_launch():
pending_launches_nodes = {"m4.large": 1}
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes, {})
pending_launches_nodes, {}, {})
# Note: we have 8 pending/launching cpus and only 7 running.
# So we should not launch anything (8 < 7).
assert updated_to_launch == {}
@@ -857,24 +885,90 @@ def test_get_concurrent_resource_demand_to_launch():
]
updated_to_launch = scheduler._get_concurrent_resource_demand_to_launch(
to_launch, connected_nodes, non_terminated_nodes,
pending_launches_nodes, {})
pending_launches_nodes, {}, {})
# Note: that here we have 14 running cpus and 1 launching.
assert updated_to_launch == {"m4.large": 13}


def test_get_nodes_to_launch_max_launch_concurrency_placement_groups():
provider = MockProvider()
new_types = copy.deepcopy(TYPES_A)
new_types["p2.8xlarge"]["min_workers"] = 10
new_types["p2.8xlarge"]["max_workers"] = 40

scheduler = ResourceDemandScheduler(
provider, new_types, 50, head_node_type=None)

pending_placement_groups = [
PlacementGroupTableData(
state=PlacementGroupTableData.RESCHEDULING,
strategy=PlacementStrategy.PACK,
bundles=([Bundle(unit_resources={"GPU": 8})] * 25))
]
# placement groups should bypass max launch limit.
# Note that 25 = max(placement group resources=25, min_workers=10).
to_launch = scheduler.get_nodes_to_launch([], {}, [], {},
pending_placement_groups, {})
assert to_launch == {"p2.8xlarge": 25}

pending_placement_groups = [
# Requires 25 p2.8xlarge nodes.
PlacementGroupTableData(
state=PlacementGroupTableData.RESCHEDULING,
strategy=PlacementStrategy.STRICT_SPREAD,
bundles=([Bundle(unit_resources={"GPU": 2})] * 25)),
# Requires 5 additional nodes (total 30).
PlacementGroupTableData(
state=PlacementGroupTableData.RESCHEDULING,
strategy=PlacementStrategy.PACK,
bundles=([Bundle(unit_resources={"GPU": 6})] * 30))
]

to_launch = scheduler.get_nodes_to_launch([], {}, [], {},
pending_placement_groups, {})
# Test that combining spreads and normal placement group demands bypasses
# launch limit.
assert to_launch == {"p2.8xlarge": 30}

pending_placement_groups = [
# Requires 25 p2.8xlarge nodes.
PlacementGroupTableData(
state=PlacementGroupTableData.RESCHEDULING,
strategy=PlacementStrategy.STRICT_SPREAD,
bundles=([Bundle(unit_resources={"GPU": 2})] * 25)),
# Requires 35 additional nodes (total 60).
PlacementGroupTableData(
state=PlacementGroupTableData.RESCHEDULING,
strategy=PlacementStrategy.PACK,
bundles=([Bundle(unit_resources={"GPU": 6})] * 60))
]

to_launch = scheduler.get_nodes_to_launch([], {}, [], {},
pending_placement_groups, {})
# make sure it still respects max_workers of p2.8xlarge.
assert to_launch == {"p2.8xlarge": 40}

scheduler.node_types["p2.8xlarge"]["max_workers"] = 60
to_launch = scheduler.get_nodes_to_launch([], {}, [], {},
pending_placement_groups, {})
# make sure it still respects global max_workers constraint.
# 50 + 1 is global max_workers + head node.
assert to_launch == {"p2.8xlarge": 51}


def test_get_nodes_to_launch_max_launch_concurrency():
provider = MockProvider()
new_types = copy.deepcopy(TYPES_A)
new_types["p2.8xlarge"]["min_workers"] = 4
new_types["p2.8xlarge"]["min_workers"] = 10
new_types["p2.8xlarge"]["max_workers"] = 40

scheduler = ResourceDemandScheduler(
provider, new_types, 30, head_node_type=None)

to_launch = scheduler.get_nodes_to_launch([], {}, [], {}, [], {})
# Respects min_workers despite concurrency limitation.
assert to_launch == {"p2.8xlarge": 4}

# Respects min_workers despite max launch limit.
assert to_launch == {"p2.8xlarge": 10}
scheduler.node_types["p2.8xlarge"]["min_workers"] = 4
provider.create_node({}, {
TAG_RAY_USER_NODE_TYPE: "p2.8xlarge",
TAG_RAY_NODE_STATUS: STATUS_UNINITIALIZED
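
The assertions in test_get_nodes_to_launch_max_launch_concurrency_placement_groups above reduce to one ceiling: placement groups may exceed the launch concurrency cap, but never the per-type max_workers or the global max_workers plus the head node. A small hypothetical helper (not part of the test file) reproducing that arithmetic:

# Hypothetical helper; mirrors the values asserted in the test above.
def expected_pg_launch(nodes_needed: int, type_max_workers: int,
                       global_max_workers: int) -> int:
    # +1 accounts for the head node, as in get_nodes_to_launch.
    return min(nodes_needed, type_max_workers, global_max_workers + 1)

assert expected_pg_launch(25, 40, 50) == 25  # PACK group needing 25 nodes
assert expected_pg_launch(60, 40, 50) == 40  # capped by p2.8xlarge max_workers
assert expected_pg_launch(60, 60, 50) == 51  # capped by global max_workers + head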
