[Train] Sort Local Train Workers by GPU id (ray-project#40953)
Signed-off-by: woshiyyya <[email protected]>
woshiyyya authored Nov 17, 2023
1 parent d3e99ee commit 0e7a481
Showing 3 changed files with 120 additions and 6 deletions.
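In short: group_workers_by_ip becomes sort_workers_by_ip_and_gpu_id, which still groups workers by node IP but additionally orders the workers on each node by their lowest assigned GPU id. A minimal sketch of the idea, using plain tuples in place of Ray's Worker/WorkerMetadata objects and sorted-IP iteration in place of the real grouping logic (which places the driver node first via _first_ip):

from collections import defaultdict

# Simplified stand-ins for workers: (name, node_ip, gpu_ids).
# The tuple layout is illustrative, not Ray's real types.
workers = [
    ("worker_0", "1", ["1"]),
    ("worker_1", "0", ["0"]),
    ("worker_2", "1", ["0"]),
    ("worker_3", "0", ["1"]),
]

# Step 1: group workers by node IP.
ip_to_workers = defaultdict(list)
for name, ip, gpu_ids in workers:
    ip_to_workers[ip].append((name, ip, gpu_ids))

# Step 2: sort each group by the lowest GPU id, then flatten.
sorted_workers = []
for ip in sorted(ip_to_workers):
    group = sorted(ip_to_workers[ip], key=lambda w: min(int(g) for g in w[2]))
    sorted_workers.extend(group)

print([name for name, _, _ in sorted_workers])
# ['worker_1', 'worker_3', 'worker_2', 'worker_0'], matching the docstring example below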
2 changes: 1 addition & 1 deletion python/ray/train/_internal/backend_executor.py
@@ -150,7 +150,7 @@ def start(
         # TODO remove passing in trial_driver_ip.
 
         trial_driver_ip = self._trial_info.driver_ip if self._trial_info else None
-        self.worker_group.group_workers_by_ip(trial_driver_ip)
+        self.worker_group.sort_workers_by_ip_and_gpu_id(trial_driver_ip)
 
         try:
             if initialization_hook:
40 changes: 38 additions & 2 deletions python/ray/train/_internal/worker_group.py
@@ -362,11 +362,29 @@ def add_workers(self, num_workers: int):
         for i in range(len(new_actors)):
             self.workers.append(Worker(actor=new_actors[i], metadata=metadata[i]))
 
-    def group_workers_by_ip(self, _first_ip: Optional[str] = None):
-        """Groups workers by IP.
+    def sort_workers_by_ip_and_gpu_id(self, _first_ip: Optional[str] = None):
+        """Reorder the workers by their node ip and the lowest GPU id.
+
+        This is useful for collocating workers on the same node.
+
+        Example:
+            Given workers with the following attributes:
+                worker_0: ip=1, gpu_ids=[1]
+                worker_1: ip=0, gpu_ids=[0]
+                worker_2: ip=1, gpu_ids=[0]
+                worker_3: ip=0, gpu_ids=[1]
+
+            The function will perform the following steps:
+                1. Group by node IP:
+                    ip=0: worker_1, worker_3
+                    ip=1: worker_0, worker_2
+
+                2. Sort each group by GPU ID:
+                    ip=0: worker_1 (gpu_id=0), worker_3 (gpu_id=1)
+                    ip=1: worker_2 (gpu_id=0), worker_0 (gpu_id=1)
+
+            Resulting in the order: [worker_1, worker_3, worker_2, worker_0]
 
         Args:
             _first_ip: The first IP to group by.
                 Hack to avoid OOMs.
@@ -385,6 +403,24 @@ def group_workers_by_ip(self, _first_ip: Optional[str] = None):
         for worker in self.workers:
             ip_to_workers[worker.metadata.node_ip].append(worker)
 
+        # Sort workers on the same node by the lowest GPU id.
+        # More details: https://github.com/ray-project/ray/issues/40803
+        def get_lowest_gpu_id(worker) -> int:
+            gpu_ids = worker.metadata.resource_ids.get("GPU", [])
+            # If there are no GPU IDs, return 0 as a default.
+            if not gpu_ids:
+                return 0
+
+            # Attempt to convert GPU IDs to integers and find the minimum ID.
+            # Fall back to the minimum string-based ID if conversion fails.
+            try:
+                return min(int(gpu_id) for gpu_id in gpu_ids)
+            except ValueError:
+                return min(gpu_ids)
+
+        for node_ip in ip_to_workers:
+            ip_to_workers[node_ip].sort(key=get_lowest_gpu_id)
+
         sorted_workers = []
         for workers in ip_to_workers.values():
             sorted_workers.extend(workers)
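For intuition, the new sort key can be exercised standalone. This sketch copies the logic from the hunk above but takes the resource_ids dict directly instead of a Worker object:

# Standalone copy of the sort key, operating on a plain dict in place of
# worker.metadata.resource_ids.
def get_lowest_gpu_id(resource_ids):
    gpu_ids = resource_ids.get("GPU", [])
    if not gpu_ids:
        return 0  # all CPU-only workers share the same key
    try:
        return min(int(gpu_id) for gpu_id in gpu_ids)
    except ValueError:
        return min(gpu_ids)  # non-numeric ids fall back to string ordering

print(get_lowest_gpu_id({"GPU": ["1", "3"]}))  # 1: multi-GPU worker, lowest id wins
print(get_lowest_gpu_id({"GPU": []}))          # 0: CPU-only worker
print(get_lowest_gpu_id({}))                   # 0: no GPU entry at all

Because Python's list.sort is stable, every CPU-only worker maps to the same key (0), so workers without GPUs keep their original relative order within a node; the new test below depends on exactly that behavior.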
84 changes: 81 additions & 3 deletions python/ray/train/tests/test_worker_group.py
@@ -1,4 +1,5 @@
 import time
+from collections import defaultdict
 
 import pytest
 
@@ -144,7 +145,7 @@ def create_worker_group(ips):
                     node_id="dummy",
                     node_ip=ip,
                     hostname="dummy",
-                    resource_ids=None,
+                    resource_ids={},
                     pid=0,
                 ),
             )
@@ -153,7 +154,7 @@ def create_worker_group(ips):
         return wg
 
     wg = create_worker_group(["2", "3", "1", "4", "2", "1", "3", "3", "4", "2"])
-    wg.group_workers_by_ip()
+    wg.sort_workers_by_ip_and_gpu_id()
     expected = ["2", "2", "2", "3", "3", "3", "1", "1", "4", "4"]
     ips = [w.metadata.node_ip for w in wg.workers]
     assert ips == expected, (
@@ -162,14 +163,91 @@ def create_worker_group(ips):
     )
 
     wg = create_worker_group(["2", "3", "1", "4", "2", "1", "3", "3", "4", "2"])
-    wg.group_workers_by_ip(_first_ip="1")
+    wg.sort_workers_by_ip_and_gpu_id(_first_ip="1")
     expected = ["1", "1", "2", "2", "2", "3", "3", "3", "4", "4"]
     ips = [w.metadata.node_ip for w in wg.workers]
     assert (
         ips == expected
     ), "Workers should be grouped by IP, with the first IP being 1."


+def test_sort_local_workers_by_gpu_id(ray_start_2_cpus):
+    def create_worker_group(pids, ips, gpu_ids):
+        wg = WorkerGroup(num_workers=2)
+        wg.workers = [
+            Worker(
+                actor=None,
+                metadata=WorkerMetadata(
+                    node_id="dummy",
+                    node_ip=ip,
+                    hostname="dummy",
+                    # Split comma-separated GPU id strings like "1,3" into
+                    # individual ids; a plain .split() would not separate them.
+                    resource_ids={"GPU": gpu_id.split(",") if gpu_id else []},
+                    pid=pid,
+                ),
+            )
+            for pid, ip, gpu_id in zip(pids, ips, gpu_ids)
+        ]
+        return wg
+
+    def setup_and_check_worker_group(pids, ips, gpu_ids, expected_local_ranks):
+        """Create a worker group, sort workers by IP and GPU id, and check
+        local rank assignment.
+
+        Args:
+            pids: List of unique process IDs.
+            ips: List of IP addresses corresponding to each PID.
+            gpu_ids: List of GPU IDs or None for each PID.
+            expected_local_ranks: List of expected local ranks, in the
+                same order as pids.
+        """
+        wg = create_worker_group(pids=pids, ips=ips, gpu_ids=gpu_ids)
+        wg.sort_workers_by_ip_and_gpu_id()
+
+        # Build local ranks according to the logic in
+        # `BackendExecutor._create_rank_world_size_mappings()`
+        ip_dict = defaultdict(int)
+        local_ranks_map = defaultdict(int)
+        for w in wg.workers:
+            local_ranks_map[w.metadata.pid] = ip_dict[w.metadata.node_ip]
+            ip_dict[w.metadata.node_ip] += 1
+
+        local_ranks = [local_ranks_map[pid] for pid in pids]
+
+        assert local_ranks == expected_local_ranks, (
+            "Incorrect local ranks allocation!\n"
+            f"Expect: {expected_local_ranks}\nGot: {local_ranks}"
+        )
+
+    # Define the worker configurations for different scenarios.
+    # Workers without GPU resources keep their original order.
+    cpu_workers_config = {
+        "pids": [0, 1, 2, 3, 4, 5, 6, 7],
+        "ips": ["2", "2", "1", "1", "2", "1", "1", "2"],
+        "gpu_ids": [None] * 8,
+        "expected_local_ranks": [0, 1, 0, 1, 2, 2, 3, 3],
+    }
+
+    gpu_workers_single_gpu_config = {
+        "pids": [0, 1, 2, 3, 4, 5, 6, 7],
+        "ips": ["2", "2", "1", "1", "2", "1", "1", "2"],
+        "gpu_ids": ["1", "0", "3", "2", "2", "0", "1", "3"],
+        "expected_local_ranks": [1, 0, 3, 2, 2, 0, 1, 3],
+    }
+
+    # Workers with multiple GPUs are sorted by their lowest GPU id.
+    gpu_workers_multiple_gpus_config = {
+        "pids": [0, 1, 2, 3],
+        "ips": ["2", "1", "1", "2"],
+        "gpu_ids": ["1,3", "2,1", "0,3", "0,2"],
+        "expected_local_ranks": [1, 1, 0, 0],
+    }
+
+    # Set up and check worker groups for each configuration.
+    setup_and_check_worker_group(**cpu_workers_config)
+    setup_and_check_worker_group(**gpu_workers_single_gpu_config)
+    setup_and_check_worker_group(**gpu_workers_multiple_gpus_config)
+
+
 def test_execute_single(ray_start_2_cpus):
     wg = WorkerGroup(num_workers=2)

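The local-rank bookkeeping the new test mirrors can be read as a standalone recipe: walk the sorted workers once and hand out a per-node counter. A sketch of that logic, with plain (pid, node_ip) tuples in place of worker metadata; it mirrors the loop in the test above, not BackendExecutor's actual code:

from collections import defaultdict

# (pid, node_ip) pairs, already sorted by IP group and lowest GPU id.
sorted_workers = [(1, "0"), (3, "0"), (2, "1"), (0, "1")]

# A worker's local rank is the number of workers already seen on its node.
next_rank_on_node = defaultdict(int)
local_rank = {}
for pid, node_ip in sorted_workers:
    local_rank[pid] = next_rank_on_node[node_ip]
    next_rank_on_node[node_ip] += 1

print(local_rank)  # {1: 0, 3: 1, 2: 0, 0: 1}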
