
Commit

Change resource bookkeeping to account for machine precision. (ray-pr…
williamma12 authored and robertnishihara committed Apr 23, 2019
1 parent b4ee50f commit c99e3ca
Showing 9 changed files with 341 additions and 164 deletions.
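The motivation, in brief: when fractional resource requests are tracked with plain floating-point arithmetic, acquiring and releasing many small amounts can leave the bookkeeping slightly off the true capacity, so exact-equality checks against the configured totals can fail. A minimal Python sketch of the effect (illustrative only, not part of this commit):

# Illustrative only: the floating-point drift that precision-aware
# resource bookkeeping has to tolerate.

# Ten tasks each hand 0.1 CPU back to an empty pool.
released = sum([0.1] * 10)
print(released)          # 0.9999999999999999
print(released == 1.0)   # False: naive accumulation misses the exact total.

# The new test below limits the precision it asks for by truncating random
# requests to four decimal places, e.g. int(rand * 10000) / 10000.
rand = 0.123456789
print(int(rand * 10000) / 10000)  # 0.1234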
4 changes: 2 additions & 2 deletions python/ray/tests/test_actor.py
@@ -1402,13 +1402,13 @@ def inc(self):
return self.x

# Create many actors. It should take a while to finish initializing them.
-    actors = [Counter.remote() for _ in range(100)]
+    actors = [Counter.remote() for _ in range(15)]
# Allow some time to forward the actor creation tasks to the other node.
time.sleep(0.1)
# Kill the second node.
cluster.remove_node(remote_node)

-    # Get all of the results
+    # Get all of the results.
results = ray.get([actor.inc.remote() for actor in actors])
assert results == [1 for actor in actors]

75 changes: 72 additions & 3 deletions python/ray/tests/test_basic.py
@@ -864,6 +864,71 @@ def method(self):
assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2]


def test_many_fractional_resources(shutdown_only):
ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})

@ray.remote
def g():
return 1

@ray.remote
def f(block, accepted_resources):
true_resources = {
resource: value[0][1]
for resource, value in ray.get_resource_ids().items()
}
if block:
ray.get(g.remote())
return true_resources == accepted_resources

    # Check that the resources are assigned correctly.
result_ids = []
for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)):
resource_set = {"CPU": int(rand1 * 10000) / 10000}
result_ids.append(f._remote([False, resource_set], num_cpus=rand1))

resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000}
result_ids.append(f._remote([False, resource_set], num_gpus=rand1))

resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000}
result_ids.append(
f._remote([False, resource_set], resources={"Custom": rand1}))

resource_set = {
"CPU": int(rand1 * 10000) / 10000,
"GPU": int(rand2 * 10000) / 10000,
"Custom": int(rand3 * 10000) / 10000
}
result_ids.append(
f._remote(
[False, resource_set],
num_cpus=rand1,
num_gpus=rand2,
resources={"Custom": rand3}))
result_ids.append(
f._remote(
[True, resource_set],
num_cpus=rand1,
num_gpus=rand2,
resources={"Custom": rand3}))
assert all(ray.get(result_ids))

# Check that the available resources at the end are the same as the
# beginning.
stop_time = time.time() + 10
correct_available_resources = False
while time.time() < stop_time:
if ray.global_state.available_resources() == {
"CPU": 2.0,
"GPU": 2.0,
"Custom": 2.0,
}:
correct_available_resources = True
break
if not correct_available_resources:
assert False, "Did not get correct available resources."
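A note on the helper above: ray.get_resource_ids() returns, for each resource name, a list of (resource_id, fraction) pairs reserved for the worker, so value[0][1] extracts the fractional amount of the first reserved ID. A hedged illustration of the shape (IDs and fractions are made up):

# Made-up example of the structure f() inspects; real values vary per worker.
assigned = {
    "CPU": [(0, 0.1234)],   # resource ID 0, 0.1234 of one CPU
    "GPU": [(1, 0.5)],      # resource ID 1, half of GPU 1
}
true_resources = {name: pairs[0][1] for name, pairs in assigned.items()}
print(true_resources)  # {'CPU': 0.1234, 'GPU': 0.5}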


def test_get_multiple(ray_start_regular):
object_ids = [ray.put(i) for i in range(10)]
assert ray.get(object_ids) == list(range(10))
@@ -2126,20 +2191,24 @@ def f():
ray.get(results)


+# TODO: 5 retry attempts may be too few for Travis, and we may need to
+# increase the count if this test begins to be flaky.
def test_zero_capacity_deletion_semantics(shutdown_only):
ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1})

def test():
resources = ray.global_state.available_resources()
+        MAX_RETRY_ATTEMPTS = 5
retry_count = 0

-        while resources and retry_count < 5:
+        while resources and retry_count < MAX_RETRY_ATTEMPTS:
time.sleep(0.1)
resources = ray.global_state.available_resources()
retry_count += 1

-        if retry_count >= 5:
-            raise RuntimeError("Resources were available even after retries.")
+        if retry_count >= MAX_RETRY_ATTEMPTS:
+            raise RuntimeError(
+                "Resources were available even after five retries.")

return resources

20 changes: 5 additions & 15 deletions src/ray/raylet/node_manager.cc
@@ -1468,17 +1468,13 @@ void NodeManager::HandleTaskBlocked(const std::shared_ptr<LocalClientConnection>
local_queues_.QueueTasks({task}, TaskState::RUNNING);
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
-    double required_cpus = required_resources.GetNumCpus();
-    std::unordered_map<std::string, double> cpu_resources;
-    if (required_cpus > 0) {
-      cpu_resources[kCPU_ResourceLabel] = required_cpus;
-    }
+    const ResourceSet cpu_resources = required_resources.GetNumCpus();

// Release the CPU resources.
auto const cpu_resource_ids = worker->ReleaseTaskCpuResources();
local_available_resources_.Release(cpu_resource_ids);
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
-        ResourceSet(cpu_resources));
+        cpu_resources);
worker->MarkBlocked();

// Try dispatching tasks since we may have released some resources.
@@ -1521,12 +1517,7 @@ void NodeManager::HandleTaskUnblocked(
local_queues_.QueueTasks({task}, TaskState::RUNNING);
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
-    double required_cpus = required_resources.GetNumCpus();
-    std::unordered_map<std::string, double> cpu_resources_map;
-    if (required_cpus > 0) {
-      cpu_resources_map[kCPU_ResourceLabel] = required_cpus;
-    }
-    const ResourceSet cpu_resources(cpu_resources_map);
+    const ResourceSet cpu_resources = required_resources.GetNumCpus();
// Check if we can reacquire the CPU resources.
bool oversubscribed = !local_available_resources_.Contains(cpu_resources);

@@ -1633,7 +1624,8 @@ bool NodeManager::AssignTask(const Task &task) {

if (spec.IsActorCreationTask()) {
// Check that we are not placing an actor creation task on a node with 0 CPUs.
-    RAY_CHECK(cluster_resource_map_[my_client_id].GetTotalResources().GetNumCpus() != 0);
+    RAY_CHECK(cluster_resource_map_[my_client_id].GetTotalResources().GetResourceMap().at(
+        kCPU_ResourceLabel) != 0);
worker->SetLifetimeResourceIds(acquired_resources);
} else {
worker->SetTaskResourceIds(acquired_resources);
@@ -2037,8 +2029,6 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task,
RAY_LOG(INFO) << "Failed to forward task " << task_id << " to node manager "
<< node_manager_id;

-    // TODO(romilb): We should probably revert the load subtraction from
-    // SchedulingPolicy::Schedule()
// Mark the failed task as pending to let other raylets know that we still
// have the task. TaskDependencyManager::TaskPending() is assumed to be
// idempotent.
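For context on the two hunks above: when a running task blocks on ray.get, the node manager returns the task's CPU resources to the local pool, and when the task unblocks it tries to take them back, flagging oversubscription if they are no longer available. A hedged Python sketch of that bookkeeping (illustrative pseudocode, not the actual C++ NodeManager):

# Illustrative sketch of the release/reacquire cycle; names are made up.
class LocalCpuPool:
    def __init__(self, total_cpus):
        self.available = total_cpus

    def release(self, cpus):
        # HandleTaskBlocked: hand the blocked task's CPUs back to the pool.
        self.available += cpus

    def reacquire(self, cpus):
        # HandleTaskUnblocked: take the CPUs back; if they are gone, the
        # node is temporarily oversubscribed.
        oversubscribed = self.available < cpus
        self.available -= cpus
        return oversubscribed

pool = LocalCpuPool(total_cpus=2.0)
pool.release(0.5)
print(pool.reacquire(0.5))  # False: the CPUs were still free.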
1 change: 1 addition & 0 deletions src/ray/raylet/node_manager.h
@@ -111,6 +111,7 @@ class NodeManager {
/// \param data Data associated with the new client.
/// \return Void.
void ClientAdded(const ClientTableDataT &data);

/// Handler for the removal of a GCS client.
/// \param client_data Data associated with the removed client.
/// \return Void.
7 changes: 4 additions & 3 deletions src/ray/raylet/scheduling_policy.cc
@@ -49,9 +49,10 @@ std::unordered_map<TaskID, ClientID> SchedulingPolicy::Schedule(
const auto &node_resources = client_resource_pair.second;
ResourceSet available_node_resources =
ResourceSet(node_resources.GetAvailableResources());
-    // TODO(romilb): Why do we need to subtract load from available resources?
-    // Even if we don't the code path below for choosing a dst_client_id would be
-    // similar.
+    // We have to subtract the current "load" because we set the current "load"
+    // to be the resources used by tasks that are in the
+    // `SchedulingQueue::ready_queue_` in NodeManager::ProcessGetTaskMessage's
+    // call to SchedulingQueue::GetResourceLoad.
available_node_resources.SubtractResources(node_resources.GetLoadResources());
RAY_LOG(DEBUG) << "client_id " << node_client_id
<< " avail: " << node_resources.GetAvailableResources().ToString()
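The new comment above is the key scheduling detail: each raylet's advertised load already counts the tasks sitting in its ready queue, so the scheduler subtracts that load from the advertised availability before deciding whether a node can take another task. A hedged Python sketch of the idea (the real logic is the C++ SchedulingPolicy::Schedule and differs in detail):

# Illustrative only: pick nodes whose availability minus queued load still
# covers a task's requirements.
def feasible_nodes(cluster_resources, required):
    feasible = []
    for client_id, res in cluster_resources.items():
        effective = {
            name: res["available"].get(name, 0.0) - res["load"].get(name, 0.0)
            for name in required
        }
        if all(effective[name] >= amount for name, amount in required.items()):
            feasible.append(client_id)
    return feasible

print(feasible_nodes(
    {"node_a": {"available": {"CPU": 2.0}, "load": {"CPU": 1.5}},
     "node_b": {"available": {"CPU": 2.0}, "load": {"CPU": 0.0}}},
    {"CPU": 1.0}))  # ['node_b']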
