diff --git a/src/ray/gcs/client.cc b/src/ray/gcs/client.cc
index 1f7bb8460ac6c..c1cbee11303e4 100644
--- a/src/ray/gcs/client.cc
+++ b/src/ray/gcs/client.cc
@@ -109,6 +109,7 @@ AsyncGcsClient::AsyncGcsClient(const std::string &address, int port,
   client_table_.reset(new ClientTable({primary_context_}, this, client_id));
   error_table_.reset(new ErrorTable({primary_context_}, this));
   driver_table_.reset(new DriverTable({primary_context_}, this));
+  heartbeat_batch_table_.reset(new HeartbeatBatchTable({primary_context_}, this));
   // Tables below would be sharded.
   object_table_.reset(new ObjectTable(shard_contexts_, this, command_type));
   actor_table_.reset(new ActorTable(shard_contexts_, this));
@@ -214,6 +215,10 @@ ClassTable &AsyncGcsClient::class_table() { return *class_table_; }
 
 HeartbeatTable &AsyncGcsClient::heartbeat_table() { return *heartbeat_table_; }
 
+HeartbeatBatchTable &AsyncGcsClient::heartbeat_batch_table() {
+  return *heartbeat_batch_table_;
+}
+
 ErrorTable &AsyncGcsClient::error_table() { return *error_table_; }
 
 DriverTable &AsyncGcsClient::driver_table() { return *driver_table_; }
diff --git a/src/ray/gcs/client.h b/src/ray/gcs/client.h
index 62813b5537b14..e44355571409a 100644
--- a/src/ray/gcs/client.h
+++ b/src/ray/gcs/client.h
@@ -60,6 +60,7 @@ class RAY_EXPORT AsyncGcsClient {
   TaskLeaseTable &task_lease_table();
   ClientTable &client_table();
   HeartbeatTable &heartbeat_table();
+  HeartbeatBatchTable &heartbeat_batch_table();
   ErrorTable &error_table();
   DriverTable &driver_table();
   ProfileTable &profile_table();
@@ -89,6 +90,7 @@ class RAY_EXPORT AsyncGcsClient {
   std::unique_ptr<TaskReconstructionLog> task_reconstruction_log_;
   std::unique_ptr<TaskLeaseTable> task_lease_table_;
   std::unique_ptr<HeartbeatTable> heartbeat_table_;
+  std::unique_ptr<HeartbeatBatchTable> heartbeat_batch_table_;
   std::unique_ptr<ErrorTable> error_table_;
   std::unique_ptr<ProfileTable> profile_table_;
   std::unique_ptr<ClientTable> client_table_;
diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs
index 2ce8d40d399f3..1cc09925c60a7 100644
--- a/src/ray/gcs/format/gcs.fbs
+++ b/src/ray/gcs/format/gcs.fbs
@@ -15,6 +15,7 @@ enum TablePrefix:int {
   FUNCTION,
   TASK_RECONSTRUCTION,
   HEARTBEAT,
+  HEARTBEAT_BATCH,
   ERROR_INFO,
   DRIVER,
   PROFILE,
@@ -30,6 +31,7 @@ enum TablePubsub:int {
   OBJECT,
   ACTOR,
   HEARTBEAT,
+  HEARTBEAT_BATCH,
   ERROR_INFO,
   TASK_LEASE,
   DRIVER,
@@ -262,6 +264,10 @@ table HeartbeatTableData {
   resource_load_capacity: [double];
 }
 
+table HeartbeatBatchTableData {
+  batch: [HeartbeatTableData];
+}
+
 // Data for a lease on task execution.
 table TaskLeaseData {
   // Node manager client ID.
diff --git a/src/ray/gcs/tables.cc b/src/ray/gcs/tables.cc
index 24c053b21b0fe..c6c12aa53069c 100644
--- a/src/ray/gcs/tables.cc
+++ b/src/ray/gcs/tables.cc
@@ -479,6 +479,7 @@ template class Log<ActorID, ActorTableData>;
 template class Log<TaskID, TaskReconstructionData>;
 template class Table<TaskID, TaskLeaseData>;
 template class Table<ClientID, HeartbeatTableData>;
+template class Table<ClientID, HeartbeatBatchTableData>;
 template class Log<JobID, ErrorTableData>;
 template class Log<UniqueID, ClientTableData>;
 template class Log<JobID, DriverTableData>;
diff --git a/src/ray/gcs/tables.h b/src/ray/gcs/tables.h
index 2847376fd5b6d..5cca066fb453c 100644
--- a/src/ray/gcs/tables.h
+++ b/src/ray/gcs/tables.h
@@ -351,6 +351,17 @@ class HeartbeatTable : public Table<ClientID, HeartbeatTableData> {
   virtual ~HeartbeatTable() {}
 };
 
+class HeartbeatBatchTable : public Table<ClientID, HeartbeatBatchTableData> {
+ public:
+  HeartbeatBatchTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
+                      AsyncGcsClient *client)
+      : Table(contexts, client) {
+    pubsub_channel_ = TablePubsub::HEARTBEAT_BATCH;
+    prefix_ = TablePrefix::HEARTBEAT_BATCH;
+  }
+  virtual ~HeartbeatBatchTable() {}
+};
+
 class DriverTable : public Log<JobID, DriverTableData> {
  public:
   DriverTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
@@ -359,6 +370,7 @@ class DriverTable : public Log<JobID, DriverTableData> {
     pubsub_channel_ = TablePubsub::DRIVER;
     prefix_ = TablePrefix::DRIVER;
   };
+  virtual ~DriverTable() {}
 
   /// Appends driver data to the driver table.
diff --git a/src/ray/raylet/monitor.cc b/src/ray/raylet/monitor.cc
index 57ffd2fabfc38..baaa7070022d9 100644
--- a/src/ray/raylet/monitor.cc
+++ b/src/ray/raylet/monitor.cc
@@ -23,14 +23,16 @@ Monitor::Monitor(boost::asio::io_service &io_service, const std::string &redis_a
   RAY_CHECK_OK(gcs_client_.Attach(io_service));
 }
 
-void Monitor::HandleHeartbeat(const ClientID &client_id) {
+void Monitor::HandleHeartbeat(const ClientID &client_id,
+                              const HeartbeatTableDataT &heartbeat_data) {
   heartbeats_[client_id] = num_heartbeats_timeout_;
+  heartbeat_buffer_[client_id] = heartbeat_data;
 }
 
 void Monitor::Start() {
   const auto heartbeat_callback = [this](gcs::AsyncGcsClient *client, const ClientID &id,
                                          const HeartbeatTableDataT &heartbeat_data) {
-    HandleHeartbeat(id);
+    HandleHeartbeat(id, heartbeat_data);
   };
   RAY_CHECK_OK(gcs_client_.heartbeat_table().Subscribe(
       UniqueID::nil(), UniqueID::nil(), heartbeat_callback, nullptr, nullptr));
@@ -66,6 +68,18 @@ void Monitor::Tick() {
     }
   }
 
+  // Send any buffered heartbeats as a single publish.
+  if (!heartbeat_buffer_.empty()) {
+    auto batch = std::make_shared<HeartbeatBatchTableDataT>();
+    for (const auto &heartbeat : heartbeat_buffer_) {
+      batch->batch.push_back(std::unique_ptr<HeartbeatTableDataT>(
+          new HeartbeatTableDataT(heartbeat.second)));
+    }
+    RAY_CHECK_OK(gcs_client_.heartbeat_batch_table().Add(UniqueID::nil(), UniqueID::nil(),
+                                                         batch, nullptr));
+    heartbeat_buffer_.clear();
+  }
+
   auto heartbeat_period = boost::posix_time::milliseconds(
       RayConfig::instance().heartbeat_timeout_milliseconds());
   heartbeat_timer_.expires_from_now(heartbeat_period);
diff --git a/src/ray/raylet/monitor.h b/src/ray/raylet/monitor.h
index b300bf4cf8a00..bb698b07f6743 100644
--- a/src/ray/raylet/monitor.h
+++ b/src/ray/raylet/monitor.h
@@ -33,7 +33,9 @@ class Monitor {
   /// Handle a heartbeat from a Raylet.
   ///
   /// \param client_id The client ID of the Raylet that sent the heartbeat.
-  void HandleHeartbeat(const ClientID &client_id);
+  /// \param heartbeat_data The heartbeat sent by the client.
+  void HandleHeartbeat(const ClientID &client_id,
+                       const HeartbeatTableDataT &heartbeat_data);
 
  private:
   /// A client to the GCS, through which heartbeats are received.
@@ -47,6 +49,8 @@ class Monitor {
   std::unordered_map<ClientID, int64_t> heartbeats_;
   /// The Raylets that have been marked as dead in the client table.
   std::unordered_set<ClientID> dead_clients_;
+  /// A buffer containing heartbeats received from node managers in the last tick.
+  std::unordered_map<ClientID, HeartbeatTableDataT> heartbeat_buffer_;
 };
 
 }  // namespace raylet
diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc
index e5ce98456d746..1cf320953bb86 100644
--- a/src/ray/raylet/node_manager.cc
+++ b/src/ray/raylet/node_manager.cc
@@ -156,15 +156,16 @@ ray::Status NodeManager::RegisterGcs() {
   };
   gcs_client_->client_table().RegisterClientRemovedCallback(node_manager_client_removed);
 
-  // Subscribe to node manager heartbeats.
-  const auto heartbeat_added = [this](gcs::AsyncGcsClient *client, const ClientID &id,
-                                      const HeartbeatTableDataT &heartbeat_data) {
-    HeartbeatAdded(client, id, heartbeat_data);
+  // Subscribe to heartbeat batches from the monitor.
+  const auto &heartbeat_batch_added = [this](
+      gcs::AsyncGcsClient *client, const ClientID &id,
+      const HeartbeatBatchTableDataT &heartbeat_batch) {
+    HeartbeatBatchAdded(heartbeat_batch);
   };
-  RAY_RETURN_NOT_OK(gcs_client_->heartbeat_table().Subscribe(
-      UniqueID::nil(), UniqueID::nil(), heartbeat_added, nullptr,
+  RAY_RETURN_NOT_OK(gcs_client_->heartbeat_batch_table().Subscribe(
+      UniqueID::nil(), UniqueID::nil(), heartbeat_batch_added, nullptr,
       [](gcs::AsyncGcsClient *client) {
-        RAY_LOG(DEBUG) << "heartbeat table subscription done callback called.";
+        RAY_LOG(DEBUG) << "Heartbeat batch table subscription done.";
       }));
 
   // Subscribe to driver table updates.
@@ -399,14 +400,9 @@ void NodeManager::ClientRemoved(const ClientTableDataT &client_data) {
   remote_server_connections_.erase(client_id);
 }
 
-void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &client_id,
+void NodeManager::HeartbeatAdded(const ClientID &client_id,
                                  const HeartbeatTableDataT &heartbeat_data) {
   RAY_LOG(DEBUG) << "[HeartbeatAdded]: received heartbeat from client id " << client_id;
-  const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId();
-  if (client_id == local_client_id) {
-    // Skip heartbeats from self.
-    return;
-  }
   // Locate the client id in remote client table and update available resources based on
   // the received heartbeat information.
   auto it = cluster_resource_map_.find(client_id);
@@ -427,9 +423,8 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl
   remote_resources.SetAvailableResources(std::move(remote_available));
   // Extract the load information and save it locally.
   remote_resources.SetLoadResources(std::move(remote_load));
-
-  auto decision = scheduling_policy_.SpillOver(remote_resources);
   // Extract decision for this local scheduler.
+  auto decision = scheduling_policy_.SpillOver(remote_resources);
   std::unordered_set<TaskID> local_task_ids;
   for (const auto &task_id : decision) {
     // (See design_docs/task_states.rst for the state transition diagram.)
@@ -448,6 +443,19 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl
   }
 }
 
+void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableDataT &heartbeat_batch) {
+  const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId();
+  // Update load information provided by each heartbeat.
+  for (const auto &heartbeat_data : heartbeat_batch.batch) {
+    const ClientID &client_id = ClientID::from_binary(heartbeat_data->client_id);
+    if (client_id == local_client_id) {
+      // Skip heartbeats from self.
+      continue;
+    }
+    HeartbeatAdded(client_id, *heartbeat_data);
+  }
+}
+
 void NodeManager::HandleActorCreation(const ActorID &actor_id,
                                       const std::vector<ActorTableDataT> &data) {
   RAY_LOG(DEBUG) << "Actor creation notification received: " << actor_id;
diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h
index a920bac0ff285..8a26616617ee3 100644
--- a/src/ray/raylet/node_manager.h
+++ b/src/ray/raylet/node_manager.h
@@ -127,12 +127,14 @@ class NodeManager {
 
   /// Handler for a heartbeat notification from the GCS.
   ///
-  /// \param client The GCS client.
   /// \param id The ID of the node manager that sent the heartbeat.
   /// \param data The heartbeat data including load information.
   /// \return Void.
-  void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
-                      const HeartbeatTableDataT &data);
+  void HeartbeatAdded(const ClientID &id, const HeartbeatTableDataT &data);
+  /// Handler for a heartbeat batch notification from the GCS
+  ///
+  /// \param heartbeat_batch The batch of heartbeat data.
+  void HeartbeatBatchAdded(const HeartbeatBatchTableDataT &heartbeat_batch);
 
   /// Methods for task scheduling.
diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc
index eb2d416323395..177d2fd806c89 100644
--- a/src/ray/raylet/scheduling_policy.cc
+++ b/src/ray/raylet/scheduling_policy.cc
@@ -1,6 +1,8 @@
-#include "scheduling_policy.h"
-
+#include <algorithm>
 #include <chrono>
+#include <random>
+
+#include "scheduling_policy.h"
 
 #include "ray/util/logging.h"
@@ -123,21 +125,23 @@ std::vector<TaskID> SchedulingPolicy::SpillOver(
 
   ResourceSet new_load(remote_scheduling_resources.GetLoadResources());
 
-  // Check if we can accommodate an infeasible task.
+  // Check if we can accommodate infeasible tasks.
   for (const auto &task : scheduling_queue_.GetInfeasibleTasks()) {
     const auto &spec = task.GetTaskSpecification();
-    if (spec.GetRequiredPlacementResources().IsSubset(
-            remote_scheduling_resources.GetTotalResources())) {
+    const auto &placement_resources = spec.GetRequiredPlacementResources();
+    if (placement_resources.IsSubset(remote_scheduling_resources.GetTotalResources())) {
       decision.push_back(spec.TaskId());
       new_load.AddResources(spec.GetRequiredResources());
     }
   }
 
+  // Try to accommodate up to a single ready task.
   for (const auto &task : scheduling_queue_.GetReadyTasks()) {
     const auto &spec = task.GetTaskSpecification();
     if (!spec.IsActorTask()) {
+      // Make sure the node has enough available resources to prevent forwarding cycles.
       if (spec.GetRequiredPlacementResources().IsSubset(
-              remote_scheduling_resources.GetTotalResources())) {
+              remote_scheduling_resources.GetAvailableResources())) {
         decision.push_back(spec.TaskId());
         new_load.AddResources(spec.GetRequiredResources());
         break;
diff --git a/src/ray/raylet/scheduling_policy.h b/src/ray/raylet/scheduling_policy.h
index b6dc272754131..7449f8ba0b3b1 100644
--- a/src/ray/raylet/scheduling_policy.h
+++ b/src/ray/raylet/scheduling_policy.h
@@ -36,6 +36,13 @@ class SchedulingPolicy {
       std::unordered_map<ClientID, SchedulingResources> &cluster_resources,
       const ClientID &local_client_id);
 
+  /// \brief Given a set of cluster resources perform a spill-over scheduling operation.
+  ///
+  /// \param cluster_resources: a set of cluster resources containing resource and load
+  /// information for some subset of the cluster. For all client IDs in the returned
+  /// placement map, the corresponding SchedulingResources::resources_load_ is
+  /// incremented by the aggregate resource demand of the tasks assigned to it.
+  /// \return Scheduling decision, mapping tasks to raylets for placement.
   std::vector<TaskID> SpillOver(SchedulingResources &remote_scheduling_resources) const;
 
   /// \brief SchedulingPolicy destructor.
diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc
index ddd75167c3038..03c85dfbe7a57 100644
--- a/src/ray/raylet/scheduling_resources.cc
+++ b/src/ray/raylet/scheduling_resources.cc
@@ -145,8 +145,8 @@ const std::string ResourceSet::ToString() const {
   // Convert the first element to a string.
   if (it != resource_capacity_.end()) {
     return_string += "{" + it->first + "," + std::to_string(it->second) + "}";
+    it++;
   }
-  it++;
 
   // Add the remaining elements to the string (along with a comma).
   for (; it != resource_capacity_.end(); ++it) {
diff --git a/test/actor_test.py b/test/actor_test.py
index 1d004ff3ea6cc..8981292e0b637 100644
--- a/test/actor_test.py
+++ b/test/actor_test.py
@@ -3,6 +3,7 @@
 from __future__ import print_function
 
 import collections
+import json
 import random
 import numpy as np
 import os
@@ -919,13 +920,16 @@ def test_actor_multiple_gpus_from_multiple_tasks(shutdown_only):
         num_local_schedulers=num_local_schedulers,
         redirect_output=True,
         num_cpus=(num_local_schedulers * [10 * num_gpus_per_scheduler]),
-        num_gpus=(num_local_schedulers * [num_gpus_per_scheduler]))
+        num_gpus=(num_local_schedulers * [num_gpus_per_scheduler]),
+        _internal_config=json.dumps({
+            "num_heartbeats_timeout": 1000
+        }))
 
     @ray.remote
-    def create_actors(n):
+    def create_actors(i, n):
         @ray.remote(num_gpus=1)
         class Actor(object):
-            def __init__(self):
+            def __init__(self, i, j):
                 self.gpu_ids = ray.get_gpu_ids()
 
             def get_location_and_ids(self):
@@ -933,15 +937,44 @@ def get_location_and_ids(self):
                     ray.worker.global_worker.plasma_client.store_socket_name),
                     tuple(self.gpu_ids))
 
+            def sleep(self):
+                time.sleep(100)
+
         # Create n actors.
-        for _ in range(n):
-            Actor.remote()
+        actors = []
+        for j in range(n):
+            actors.append(Actor.remote(i, j))
+
+        locations = ray.get(
+            [actor.get_location_and_ids.remote() for actor in actors])
 
-    ray.get([
-        create_actors.remote(num_gpus_per_scheduler)
-        for _ in range(num_local_schedulers)
+        # Put each actor to sleep for a long time to prevent them from getting
+        # terminated.
+        for actor in actors:
+            actor.sleep.remote()
+
+        return locations
+
+    all_locations = ray.get([
+        create_actors.remote(i, num_gpus_per_scheduler)
+        for i in range(num_local_schedulers)
     ])
 
+    # Make sure that no two actors are assigned to the same GPU.
+    node_names = {
+        location
+        for locations in all_locations for location, gpu_id in locations
+    }
+    assert len(node_names) == num_local_schedulers
+
+    # Keep track of which GPU IDs are being used for each location.
+    gpus_in_use = {node_name: [] for node_name in node_names}
+    for locations in all_locations:
+        for location, gpu_ids in locations:
+            gpus_in_use[location].extend(gpu_ids)
+    for node_name in node_names:
+        assert len(set(gpus_in_use[node_name])) == num_gpus_per_scheduler
+
     @ray.remote(num_gpus=1)
     class Actor(object):
         def __init__(self):
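
Note on the pattern: the monitor-side change above is a batch-and-flush scheme. Monitor::HandleHeartbeat overwrites the per-client entry in heartbeat_buffer_, and each Monitor::Tick publishes the whole buffer as a single HeartbeatBatchTableData message before clearing it, so subscribers receive one message per tick instead of one per heartbeat. The following self-contained sketch illustrates the same pattern with plain STL types; Heartbeat, HeartbeatBatcher, and the publish callback are illustrative stand-ins, not Ray's actual types or API.

#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Simplified stand-in for HeartbeatTableDataT.
struct Heartbeat {
  std::string client_id;
  double load;
};

// Buffers the most recent heartbeat per client and flushes them all as one
// batch, mirroring Monitor::HandleHeartbeat / Monitor::Tick in this diff.
class HeartbeatBatcher {
 public:
  // Called on every incoming heartbeat; a later heartbeat from the same
  // client overwrites the earlier one within a tick.
  void HandleHeartbeat(const std::string &client_id, const Heartbeat &data) {
    buffer_[client_id] = data;
  }

  // Called once per tick: publish the buffered heartbeats as a single batch.
  void Tick(const std::function<void(const std::vector<Heartbeat> &)> &publish) {
    if (buffer_.empty()) return;
    std::vector<Heartbeat> batch;
    batch.reserve(buffer_.size());
    for (const auto &entry : buffer_) {
      batch.push_back(entry.second);
    }
    publish(batch);  // One publish per tick instead of one per heartbeat.
    buffer_.clear();
  }

 private:
  std::unordered_map<std::string, Heartbeat> buffer_;
};

int main() {
  HeartbeatBatcher batcher;
  batcher.HandleHeartbeat("raylet-1", {"raylet-1", 0.5});
  batcher.HandleHeartbeat("raylet-2", {"raylet-2", 0.9});
  batcher.HandleHeartbeat("raylet-1", {"raylet-1", 0.7});  // Overwrites the first.
  batcher.Tick([](const std::vector<Heartbeat> &batch) {
    std::cout << "publishing batch of " << batch.size() << " heartbeats\n";
  });
  return 0;
}

With n raylets heartbeating every tick, batching reduces what each subscriber receives from n messages per tick to one, which is what motivates the new HeartbeatBatchTable and the switch in NodeManager::RegisterGcs from heartbeat_table() to heartbeat_batch_table().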