Skip to content

Commit

Permalink
[Core] Use map instead of list to represent resources in heartbeat me…
Browse files Browse the repository at this point in the history
…ssage (ray-project#9294)
  • Loading branch information
WangTaoTheTonic authored Jul 5, 2020
1 parent 28d18ec commit f7ac495
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 58 deletions.
12 changes: 3 additions & 9 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,9 @@ def xray_heartbeat_batch_handler(self, unused_channel, data):
message = ray.gcs_utils.HeartbeatBatchTableData.FromString(
heartbeat_data)
for heartbeat_message in message.batch:
resource_load = dict(
zip(heartbeat_message.resource_load_label,
heartbeat_message.resource_load_capacity))
total_resources = dict(
zip(heartbeat_message.resources_total_label,
heartbeat_message.resources_total_capacity))
available_resources = dict(
zip(heartbeat_message.resources_available_label,
heartbeat_message.resources_available_capacity))
resource_load = dict(heartbeat_message.resource_load)
total_resources = dict(heartbeat_message.resources_total)
available_resources = dict(heartbeat_message.resources_available)
for resource in total_resources:
available_resources.setdefault(resource, 0.0)

Expand Down
7 changes: 2 additions & 5 deletions python/ray/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -703,12 +703,9 @@ def available_resources(self):
heartbeat_data = pub_message.data
message = gcs_utils.HeartbeatTableData.FromString(heartbeat_data)
# Calculate available resources for this client
num_resources = len(message.resources_available_label)
dynamic_resources = {}
for i in range(num_resources):
resource_id = message.resources_available_label[i]
dynamic_resources[resource_id] = (
message.resources_available_capacity[i])
for resource_id, capacity in message.resources_available.items():
dynamic_resources[resource_id] = capacity

# Update available resources for this client
client_id = ray.utils.binary_to_hex(message.client_id)
Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ void GcsNodeManager::NodeFailureDetector::HandleHeartbeat(

iter->second = num_heartbeats_timeout_;
if (!light_heartbeat_enabled_ || heartbeat_data.should_global_gc() ||
heartbeat_data.resources_available_label_size() > 0 ||
heartbeat_data.resources_total_label_size() > 0 ||
heartbeat_data.resource_load_label_size() > 0) {
heartbeat_data.resources_available_size() > 0 ||
heartbeat_data.resources_total_size() > 0 ||
heartbeat_data.resource_load_size() > 0) {
heartbeat_buffer_[node_id] = heartbeat_data;
}
}
Expand Down
14 changes: 5 additions & 9 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -221,20 +221,16 @@ message GcsNodeInfo {
message HeartbeatTableData {
// Node manager client id
bytes client_id = 1;
// TODO(hchen): Define the following resources in map format.
// Resource capacity currently available on this node manager.
repeated string resources_available_label = 2;
repeated double resources_available_capacity = 3;
map<string, double> resources_available = 2;
// Total resource capacity configured for this node manager.
repeated string resources_total_label = 4;
repeated double resources_total_capacity = 5;
map<string, double> resources_total = 3;
// Aggregate outstanding resource load on this node manager.
repeated string resource_load_label = 6;
repeated double resource_load_capacity = 7;
map<string, double> resource_load = 4;
// Object IDs that are in use by workers on this node manager's node.
repeated bytes active_object_id = 8;
repeated bytes active_object_id = 5;
// Whether this node manager is requesting global GC.
bool should_global_gc = 9;
bool should_global_gc = 6;
}

message HeartbeatBatchTableData {
Expand Down
53 changes: 21 additions & 32 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,8 @@ void NodeManager::Heartbeat() {
local_resources.GetAvailableResources())) {
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
heartbeat_data->add_resources_available_label(resource_pair.first);
heartbeat_data->add_resources_available_capacity(resource_pair.second);
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));
Expand All @@ -362,8 +362,8 @@ void NodeManager::Heartbeat() {
local_resources.GetTotalResources())) {
for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
heartbeat_data->add_resources_total_label(resource_pair.first);
heartbeat_data->add_resources_total_capacity(resource_pair.second);
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));
Expand All @@ -374,8 +374,8 @@ void NodeManager::Heartbeat() {
local_resources.GetLoadResources())) {
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
heartbeat_data->add_resource_load_label(resource_pair.first);
heartbeat_data->add_resource_load_capacity(resource_pair.second);
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetLoadResources(
ResourceSet(local_resources.GetLoadResources()));
Expand All @@ -384,25 +384,25 @@ void NodeManager::Heartbeat() {
// If light heartbeat disabled, we send whole resources information every time.
for (const auto &resource_pair :
local_resources.GetAvailableResources().GetResourceMap()) {
heartbeat_data->add_resources_available_label(resource_pair.first);
heartbeat_data->add_resources_available_capacity(resource_pair.second);
(*heartbeat_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetAvailableResources(
ResourceSet(local_resources.GetAvailableResources()));

for (const auto &resource_pair :
local_resources.GetTotalResources().GetResourceMap()) {
heartbeat_data->add_resources_total_label(resource_pair.first);
heartbeat_data->add_resources_total_capacity(resource_pair.second);
(*heartbeat_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetTotalResources(
ResourceSet(local_resources.GetTotalResources()));

local_resources.SetLoadResources(local_queues_.GetResourceLoad());
for (const auto &resource_pair :
local_resources.GetLoadResources().GetResourceMap()) {
heartbeat_data->add_resource_load_label(resource_pair.first);
heartbeat_data->add_resource_load_capacity(resource_pair.second);
(*heartbeat_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
last_heartbeat_resources_.SetLoadResources(
ResourceSet(local_resources.GetLoadResources()));
Expand Down Expand Up @@ -800,37 +800,26 @@ void NodeManager::HeartbeatAdded(const ClientID &client_id,
// If light heartbeat enabled, we update remote resources only when related resources
// map in heartbeat is not empty.
if (light_heartbeat_enabled_) {
if (heartbeat_data.resources_total_label_size() > 0) {
ResourceSet remote_total(
VectorFromProtobuf(heartbeat_data.resources_total_label()),
VectorFromProtobuf(heartbeat_data.resources_total_capacity()));
if (heartbeat_data.resources_total_size() > 0) {
ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total()));
remote_resources.SetTotalResources(std::move(remote_total));
}
if (heartbeat_data.resources_available_label_size() > 0) {
ResourceSet remote_available(
VectorFromProtobuf(heartbeat_data.resources_available_label()),
VectorFromProtobuf(heartbeat_data.resources_available_capacity()));
if (heartbeat_data.resources_available_size() > 0) {
ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available()));
remote_resources.SetAvailableResources(std::move(remote_available));
}
if (heartbeat_data.resource_load_label_size() > 0) {
ResourceSet remote_load(
VectorFromProtobuf(heartbeat_data.resource_load_label()),
VectorFromProtobuf(heartbeat_data.resource_load_capacity()));
if (heartbeat_data.resource_load_size() > 0) {
ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load()));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));
}
} else {
// If light heartbeat disabled, we update remote resources every time.
ResourceSet remote_total(
VectorFromProtobuf(heartbeat_data.resources_total_label()),
VectorFromProtobuf(heartbeat_data.resources_total_capacity()));
ResourceSet remote_total(MapFromProtobuf(heartbeat_data.resources_total()));
remote_resources.SetTotalResources(std::move(remote_total));
ResourceSet remote_available(
VectorFromProtobuf(heartbeat_data.resources_available_label()),
VectorFromProtobuf(heartbeat_data.resources_available_capacity()));
ResourceSet remote_available(MapFromProtobuf(heartbeat_data.resources_available()));
remote_resources.SetAvailableResources(std::move(remote_available));
ResourceSet remote_load(VectorFromProtobuf(heartbeat_data.resource_load_label()),
VectorFromProtobuf(heartbeat_data.resource_load_capacity()));
ResourceSet remote_load(MapFromProtobuf(heartbeat_data.resource_load()));
// Extract the load information and save it locally.
remote_resources.SetLoadResources(std::move(remote_load));
}
Expand Down

0 comments on commit f7ac495

Please sign in to comment.