forked from ray-project/ray
-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_placement_groups.py
119 lines (99 loc) · 3.67 KB
/
test_placement_groups.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import os
import unittest
import ray
from ray import air
from ray import tune
from ray.tune import Callback
from ray.rllib.algorithms.pg import PG, PGConfig
from ray.tune.experiment import Trial
from ray.tune.execution.placement_groups import PlacementGroupFactory
# NOTE(review): this module-level name is never read or assigned again in this
# file — candidate for removal; verify no external module imports it first.
trial_executor = None
class _TestCallback(Callback):
    """Tune callback verifying trial scheduling constraints after each step.

    Asserts that all three grid-search trials get scheduled, while the
    cluster's resource limits keep at most two of them RUNNING at once.
    """

    def on_step_end(self, iteration, trials, **info):
        running_count = sum(1 for t in trials if t.status == Trial.RUNNING)
        # All 3 trials (3 different learning rates) should be scheduled.
        assert len(trials) >= 3
        # Cannot run more than 2 at a time
        # (due to different resource restrictions in the test cases).
        assert running_count <= 2
class TestPlacementGroups(unittest.TestCase):
    """Tests for RLlib default resource requests interacting with Tune PGs.

    Each test runs against a local 6-CPU Ray cluster started in setUp and
    torn down in tearDown.
    """

    def setUp(self) -> None:
        # Reconcile placement groups immediately (interval 0) so trial
        # scheduling in these tests is not delayed by the default interval.
        os.environ["TUNE_PLACEMENT_GROUP_RECON_INTERVAL"] = "0"
        ray.init(num_cpus=6)

    def tearDown(self) -> None:
        ray.shutdown()

    def test_overriding_default_resource_request(self):
        # 3 Trials: Can only run 2 at a time (num_cpus=6; needed: 3).
        config = (
            PGConfig()
            .training(
                model={"fcnet_hiddens": [10]}, lr=tune.grid_search([0.1, 0.01, 0.001])
            )
            .environment("CartPole-v1")
            .rollouts(num_rollout_workers=2)
            .framework("tf")
        )

        # Create an Algorithm with an overridden default_resource_request
        # method that returns a PlacementGroupFactory.
        class MyAlgo(PG):
            @classmethod
            def default_resource_request(cls, config):
                # One 1-CPU head bundle (driver) plus one 1-CPU child bundle
                # per rollout worker -> 3 CPUs per trial in total.
                head_bundle = {"CPU": 1, "GPU": 0}
                child_bundle = {"CPU": 1}
                return PlacementGroupFactory(
                    [head_bundle, child_bundle, child_bundle],
                    strategy=config["placement_strategy"],
                )

        tune.register_trainable("my_trainable", MyAlgo)

        tune.Tuner(
            "my_trainable",
            param_space=config,
            run_config=air.RunConfig(
                stop={"training_iteration": 2},
                verbose=2,
                callbacks=[_TestCallback()],
            ),
        ).fit()

    def test_default_resource_request(self):
        config = (
            PGConfig()
            .rollouts(
                num_rollout_workers=2,
            )
            .training(
                model={"fcnet_hiddens": [10]}, lr=tune.grid_search([0.1, 0.01, 0.001])
            )
            .environment("CartPole-v1")
            .framework("torch")
            .resources(placement_strategy="SPREAD", num_cpus_per_worker=2)
        )

        # 3 Trials: Can only run 1 at a time (num_cpus=6; needed: 5).
        tune.Tuner(
            PG,
            param_space=config,
            run_config=air.RunConfig(
                stop={"training_iteration": 2},
                verbose=2,
                callbacks=[_TestCallback()],
            ),
            tune_config=tune.TuneConfig(reuse_actors=False),
        ).fit()

    def test_default_resource_request_plus_manual_leads_to_error(self):
        config = (
            PGConfig()
            .training(model={"fcnet_hiddens": [10]})
            .environment("CartPole-v1")
            .rollouts(num_rollout_workers=0)
        )

        # BUG FIX: the original wrapped the call in try/except and only
        # asserted on the message *inside* the except branch, so the test
        # silently passed when no error was raised at all. assertRaisesRegex
        # makes the absence of the expected ValueError a test failure.
        with self.assertRaisesRegex(ValueError, "have been automatically set to"):
            tune.Tuner(
                tune.with_resources(PG, PlacementGroupFactory([{"CPU": 1}])),
                param_space=config,
                run_config=air.RunConfig(stop={"training_iteration": 2}, verbose=2),
            ).fit()
if __name__ == "__main__":
    import sys

    import pytest

    # Delegate to pytest and propagate its exit status to the shell.
    sys.exit(pytest.main(["-v", __file__]))