Skip to content

Commit

Permalink
[GCS] Global state accessor from node resource table (ray-project#8658)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashione authored Jun 2, 2020
1 parent 207ab44 commit 4cbbc15
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 134 deletions.
24 changes: 10 additions & 14 deletions java/runtime/src/main/java/io/ray/runtime/gcs/GcsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.ray.api.id.UniqueId;
import io.ray.api.runtimecontext.NodeInfo;
import io.ray.runtime.config.RayConfig;
import io.ray.runtime.gcs.GlobalStateAccessor;
import io.ray.runtime.generated.Gcs;
import io.ray.runtime.generated.Gcs.ActorCheckpointIdData;
import io.ray.runtime.generated.Gcs.GcsNodeInfo;
Expand Down Expand Up @@ -96,19 +95,16 @@ public List<NodeInfo> getAllNodeInfo() {
}

private Map<String, Double> getResourcesForClient(UniqueId clientId) {
final String prefix = TablePrefix.NODE_RESOURCE.toString();
final byte[] key = ArrayUtils.addAll(prefix.getBytes(), clientId.getBytes());
Map<byte[], byte[]> results = primary.hgetAll(key);
Map<String, Double> resources = new HashMap<>();
for (Map.Entry<byte[], byte[]> entry : results.entrySet()) {
String resourceName = new String(entry.getKey());
Gcs.ResourceTableData resourceTableData;
try {
resourceTableData = Gcs.ResourceTableData.parseFrom(entry.getValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Received invalid protobuf data from GCS.");
}
resources.put(resourceName, resourceTableData.getResourceCapacity());
byte[] resourceMapBytes = globalStateAccessor.getNodeResourceInfo(clientId);
Gcs.ResourceMap resourceMap;
try {
resourceMap = Gcs.ResourceMap.parseFrom(resourceMapBytes);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("Received invalid protobuf data from GCS.");
}
HashMap<String, Double> resources = new HashMap<>();
for (Map.Entry<String, Gcs.ResourceTableData> entry : resourceMap.getItemsMap().entrySet()) {
resources.put(entry.getKey(), entry.getValue().getResourceCapacity());
}
return resources;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.base.Preconditions;
import io.ray.api.id.ActorId;
import io.ray.api.id.UniqueId;
import java.util.List;

/**
Expand Down Expand Up @@ -65,6 +66,18 @@ public List<byte[]> getAllNodeInfo() {
}
}

/**
 * Fetches the resource information of a single node through the native global state
 * accessor.
 *
 * @param nodeId node unique id.
 * @return A map of node resource info, as serialized protobuf bytes.
 * @throws IllegalStateException if the global state accessor has already been destroyed.
 */
public byte[] getNodeResourceInfo(UniqueId nodeId) {
  // The class-level lock keeps the native pointer stable between the check and its use.
  synchronized (GlobalStateAccessor.class) {
    final boolean accessorAlive = globalStateAccessorNativePointer != 0;
    Preconditions.checkState(accessorAlive,
        "Get resource info by node id when global state accessor have been destroyed.");
    final byte[] nodeIdBytes = nodeId.getBytes();
    return nativeGetNodeResourceInfo(globalStateAccessorNativePointer, nodeIdBytes);
  }
}

/**
* @return A list of actor info with ActorInfo protobuf schema.
*/
Expand Down Expand Up @@ -120,6 +133,8 @@ private void destroyGlobalStateAccessor() {

private native List<byte[]> nativeGetAllNodeInfo(long nativePtr);

private native byte[] nativeGetNodeResourceInfo(long nativePtr, byte[] nodeId);

private native List<byte[]> nativeGetAllActorInfo(long nativePtr);

private native byte[] nativeGetActorInfo(long nativePtr, byte[] actorId);
Expand Down
2 changes: 2 additions & 0 deletions python/ray/gcs_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
TablePrefix,
TablePubsub,
TaskTableData,
ResourceMap,
ResourceTableData,
ObjectLocationInfo,
PubSubMessage,
Expand All @@ -33,6 +34,7 @@
"TablePrefix",
"TablePubsub",
"TaskTableData",
"ResourceMap",
"ResourceTableData",
"construct_error_message",
"ObjectLocationInfo",
Expand Down
2 changes: 2 additions & 0 deletions python/ray/includes/global_state_accessor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ from libcpp.vector cimport vector as c_vector
from libcpp.memory cimport unique_ptr
from ray.includes.unique_ids cimport (
CActorID,
CClientID,
CObjectID,
)

Expand All @@ -21,3 +22,4 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
unique_ptr[c_string] GetObjectInfo(const CObjectID &object_id)
c_vector[c_string] GetAllActorInfo()
unique_ptr[c_string] GetActorInfo(const CActorID &actor_id)
c_string GetNodeResourceInfo(const CClientID &node_id)
4 changes: 4 additions & 0 deletions python/ray/includes/global_state_accessor.pxi
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ray.includes.unique_ids cimport (
CActorID,
CClientID,
CObjectID,
)

Expand Down Expand Up @@ -53,3 +54,6 @@ cdef class GlobalStateAccessor:
if actor_info:
return c_string(actor_info.get().data(), actor_info.get().size())
return None

def get_node_resource_info(self, node_id):
    """Look up the resource info of one node via the C++ accessor.

    Args:
        node_id: A node ID object exposing ``binary()``.

    Returns:
        The string returned by the C++ ``GetNodeResourceInfo`` call.
    """
    cdef CClientID c_node_id = CClientID.FromBinary(node_id.binary())
    return self.inner.get().GetNodeResourceInfo(c_node_id)
127 changes: 25 additions & 102 deletions python/ray/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,106 +17,6 @@
logger = logging.getLogger(__name__)


def _parse_client_table(redis_client):
    """Read the client table and build one record per node.

    Args:
        redis_client: A client to the primary Redis shard.

    Returns:
        A list of dicts describing the nodes in the cluster, ordered by the
        time each node was reported alive.
    """
    nil_id = ray.ClientID.nil().binary()
    raw_reply = redis_client.execute_command(
        "RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("CLIENT"), "", nil_id)

    # No reply means no clients are known yet; this should only happen
    # right after the cluster has been started.
    if raw_reply is None:
        return []

    alive_state = gcs_utils.GcsNodeInfo.GcsNodeState.Value("ALIVE")
    nodes_by_id = {}
    join_order = []

    # The GCS log is append-only, so later entries for the same node
    # override what earlier entries recorded.
    for serialized_node in gcs_utils.GcsEntry.FromString(raw_reply).entries:
        node = gcs_utils.GcsNodeInfo.FromString(serialized_node)
        hex_id = ray.utils.binary_to_hex(node.node_id)

        if node.state != alive_state:
            # A removal must follow an insertion and may happen only once.
            assert hex_id in nodes_by_id, "node not found!"
            assert nodes_by_id[hex_id]["Alive"], (
                "Unexpected duplicate removal of node.")
            nodes_by_id[hex_id]["Alive"] = False
            continue

        join_order.append(hex_id)
        nodes_by_id[hex_id] = {
            "NodeID": hex_id,
            "Alive": True,
            "NodeManagerAddress": node.node_manager_address,
            "NodeManagerHostname": node.node_manager_hostname,
            "NodeManagerPort": node.node_manager_port,
            "ObjectManagerPort": node.object_manager_port,
            "ObjectStoreSocketName": node.object_store_socket_name,
            "RayletSocketName": node.raylet_socket_name
        }

    # Only nodes that are still alive get their resources looked up.
    for hex_id in join_order:
        record = nodes_by_id[hex_id]
        record["Resources"] = (_parse_resource_table(redis_client, hex_id)
                               if record["Alive"] else {})

    # Walk the explicit join-order list rather than the dict so that nodes
    # are returned in the order in which they joined the cluster.
    return [nodes_by_id[hex_id] for hex_id in join_order]


def _parse_resource_table(redis_client, client_id):
    """Read the resource table of the node with the given client id.

    Args:
        redis_client: A client to the primary Redis shard.
        client_id: The client ID of the node in hex.

    Returns:
        A dict mapping each resource name of this node to its capacity.

    Raises:
        ValueError: If the reply does not hold an even number of entries.
    """
    reply = redis_client.execute_command(
        "RAY.TABLE_LOOKUP", gcs_utils.TablePrefix.Value("NODE_RESOURCE"), "",
        ray.utils.hex_to_binary(client_id))
    if reply is None:
        return {}

    entries = list(gcs_utils.GcsEntry.FromString(reply).entries)
    num_entries = len(entries)
    if num_entries % 2 != 0:
        raise ValueError("Invalid entry size for resource lookup: " +
                         str(num_entries))

    # Entries alternate between a resource name and its serialized
    # ResourceTableData, so consume them as (name, data) pairs.
    capacities = {}
    for name_blob, data_blob in zip(entries[0::2], entries[1::2]):
        capacities[decode(name_blob)] = (
            gcs_utils.ResourceTableData.FromString(data_blob)
            .resource_capacity)
    return capacities


class GlobalState:
"""A class used to interface with the Ray control state.
Expand Down Expand Up @@ -356,6 +256,30 @@ def _gen_actor_info(self, actor_table_data):
}
return actor_info

def node_resource_table(self, node_id=None):
    """Fetch and parse the resource table info for one node.

    Args:
        node_id: The hex string ID of the node to fetch resources for.
            Required; the ``None`` default is kept only so the signature
            stays compatible.

    Returns:
        A dict mapping each resource name of the node to its capacity, or
        an empty dict if the accessor returns no data.

    Raises:
        ValueError: If ``node_id`` is None.
    """
    self._check_connected()

    # Fail with a clear message instead of letting the hex conversion
    # below choke on None.
    if node_id is None:
        raise ValueError(
            "node_id is required to look up a node's resource table.")

    node_id = ray.ClientID(hex_to_binary(node_id))
    node_resource_bytes = \
        self.global_state_accessor.get_node_resource_info(node_id)
    if node_resource_bytes is None:
        return {}

    node_resource_info = gcs_utils.ResourceMap.FromString(
        node_resource_bytes)
    return {
        key: value.resource_capacity
        for key, value in node_resource_info.items.items()
    }

def node_table(self):
"""Fetch and parse the Gcs node info table.
Expand All @@ -381,8 +305,7 @@ def node_table(self):
"RayletSocketName": item.raylet_socket_name
}
node_info["alive"] = node_info["Alive"]
node_info["Resources"] = _parse_resource_table(
self.redis_client,
node_info["Resources"] = self.node_resource_table(
node_info["NodeID"]) if node_info["Alive"] else {}
results.append(node_info)
return results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *env, jo
});
}

/// JNI entry point for GlobalStateAccessor.nativeGetNodeResourceInfo: converts the Java
/// node id, queries the native accessor, and hands the result back as a Java byte array.
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo(
    JNIEnv *env, jobject o, jlong gcs_accessor_ptr, jbyteArray node_id_bytes) {
  const auto node_id = JavaByteArrayToId<ray::ClientID>(env, node_id_bytes);
  // The Java side passes the accessor around as an opaque jlong handle.
  auto *accessor = reinterpret_cast<ray::gcs::GlobalStateAccessor *>(gcs_accessor_ptr);
  const auto serialized_resources = accessor->GetNodeResourceInfo(node_id);
  return static_cast<jbyteArray>(NativeStringToJavaByteArray(env, serialized_resources));
}

JNIEXPORT jobject JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllActorInfo(
JNIEnv *env, jobject o, jlong gcs_accessor_ptr) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ JNIEXPORT jobject JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetAllNodeInfo(JNIEnv *, jobject,
jlong);

/*
* Class: io_ray_runtime_gcs_GlobalStateAccessor
* Method: nativeGetNodeResourceInfo
* Signature: (J[B)[B
*/
JNIEXPORT jbyteArray JNICALL
Java_io_ray_runtime_gcs_GlobalStateAccessor_nativeGetNodeResourceInfo(JNIEnv *, jobject,
jlong, jbyteArray);
/*
* Class: io_ray_runtime_gcs_GlobalStateAccessor
* Method: nativeGetAllActorInfo
Expand Down
21 changes: 21 additions & 0 deletions src/ray/gcs/gcs_client/global_state_accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,27 @@ std::unique_ptr<std::string> GlobalStateAccessor::GetObjectInfo(
return object_info;
}

/// Fetch the resources of `node_id` from the GCS and return them as one serialized
/// rpc::ResourceMap. Blocks the calling thread until the asynchronous lookup completes.
std::string GlobalStateAccessor::GetNodeResourceInfo(const ClientID &node_id) {
  rpc::ResourceMap node_resource_map;
  std::promise<void> promise;
  auto on_done =
      [&node_resource_map, &promise](
          const Status &status,
          const boost::optional<ray::gcs::NodeInfoAccessor::ResourceMap> &result) {
        RAY_CHECK_OK(status);
        if (result) {
          // Iterate the optional's payload by reference; copying the whole map just to
          // read it is unnecessary.
          auto &items = *node_resource_map.mutable_items();
          for (const auto &data : *result) {
            items[data.first] = *data.second;
          }
        }
        promise.set_value();
      };
  RAY_CHECK_OK(gcs_client_->Nodes().AsyncGetResources(node_id, on_done));
  // The callback captures locals by reference, so wait for it before returning.
  promise.get_future().get();
  return node_resource_map.SerializeAsString();
}

std::vector<std::string> GlobalStateAccessor::GetAllActorInfo() {
std::vector<std::string> actor_table_data;
std::promise<bool> promise;
Expand Down
8 changes: 8 additions & 0 deletions src/ray/gcs/gcs_client/global_state_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ class GlobalStateAccessor {
/// protobuf function.
std::unique_ptr<std::string> GetObjectInfo(const ObjectID &object_id);

/// Get the resource information of a single node from GCS Service.
///
/// \param node_id The ID of node to look up in the GCS Service.
/// \return The node's resource map. To support multi-language, the whole map is
/// serialized as a single rpc::ResourceMap and returned as a string. Where used, it
/// needs to be deserialized with protobuf function.
std::string GetNodeResourceInfo(const ClientID &node_id);

/// Get information of all actors from GCS Service.
///
/// \return All actor info. To support multi-language, we serialize each ActorTableData
Expand Down
36 changes: 36 additions & 0 deletions src/ray/gcs/gcs_client/test/global_state_accessor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,42 @@ TEST_F(GlobalStateAccessorTest, TestNodeTable) {
}
}

// Registers `node_count` nodes, gives each exactly one resource (named after its index,
// with capacity index + 1.1), and checks that GetNodeResourceInfo returns that resource
// for every node.
TEST_F(GlobalStateAccessorTest, TestNodeResourceTable) {
  int node_count = 100;
  ASSERT_EQ(global_state_->GetAllNodeInfo().size(), 0);
  for (int index = 0; index < node_count; ++index) {
    auto node_table_data =
        Mocker::GenNodeInfo(index, std::string("127.0.0.") + std::to_string(index));
    auto node_id = ClientID::FromBinary(node_table_data->node_id());
    std::promise<bool> register_promise;
    RAY_CHECK_OK(gcs_client_->Nodes().AsyncRegister(
        *node_table_data,
        [&register_promise](Status status) { register_promise.set_value(status.ok()); }));
    WaitReady(register_promise.get_future(), timeout_ms_);

    ray::gcs::NodeInfoAccessor::ResourceMap resources;
    rpc::ResourceTableData resource_table_data;
    resource_table_data.set_resource_capacity(static_cast<double>(index + 1) + 0.1);
    resources[std::to_string(index)] =
        std::make_shared<rpc::ResourceTableData>(resource_table_data);
    // Wait for the update to land before moving on; otherwise the reads below can race
    // with writes that are still in flight.
    std::promise<bool> update_promise;
    RAY_CHECK_OK(gcs_client_->Nodes().AsyncUpdateResources(
        node_id, resources, [&update_promise](Status status) {
          RAY_CHECK(status.ok());
          update_promise.set_value(true);
        }));
    WaitReady(update_promise.get_future(), timeout_ms_);
  }
  auto node_table = global_state_->GetAllNodeInfo();
  ASSERT_EQ(node_table.size(), node_count);
  for (int index = 0; index < node_count; ++index) {
    rpc::GcsNodeInfo node_data;
    ASSERT_TRUE(node_data.ParseFromString(node_table[index]));
    auto resource_map_str =
        global_state_->GetNodeResourceInfo(ClientID::FromBinary(node_data.node_id()));
    rpc::ResourceMap resource_map;
    ASSERT_TRUE(resource_map.ParseFromString(resource_map_str));
    // Look the key up without inserting, so a missing resource fails explicitly instead
    // of being read back as a default-constructed entry.
    const auto &items = resource_map.items();
    const auto resource_it = items.find(std::to_string(node_data.node_manager_port()));
    ASSERT_TRUE(resource_it != items.end());
    // The resource name and capacity are both derived from the node manager port; the
    // cast truncates the fractional 0.1 added above.
    ASSERT_EQ(static_cast<uint32_t>(resource_it->second.resource_capacity()),
              node_data.node_manager_port() + 1);
  }
}

TEST_F(GlobalStateAccessorTest, TestProfileTable) {
int profile_count = 100;
ASSERT_EQ(global_state_->GetAllProfileInfo().size(), 0);
Expand Down
Loading

0 comments on commit 4cbbc15

Please sign in to comment.