From f57feac1a250cb6dad6b843610f616cf3e2b9a46 Mon Sep 17 00:00:00 2001 From: Jiajun Yao Date: Wed, 24 Jan 2024 09:12:33 -0800 Subject: [PATCH] [Core] Remove dead code (#42600) Signed-off-by: Jiajun Yao --- python/ray/_private/function_manager.py | 4 ---- python/ray/_private/gcs_pubsub.py | 19 ----------------- python/ray/_raylet.pyx | 10 --------- python/ray/includes/common.pxd | 4 ---- src/ray/gcs/gcs_server/gcs_server.cc | 1 - src/ray/gcs/pubsub/gcs_pub_sub.cc | 9 --------- src/ray/gcs/pubsub/gcs_pub_sub.h | 3 --- src/ray/protobuf/gcs.proto | 27 ------------------------- src/ray/protobuf/pubsub.proto | 6 ++---- 9 files changed, 2 insertions(+), 81 deletions(-) diff --git a/python/ray/_private/function_manager.py b/python/ray/_private/function_manager.py index 0a74e3d3a810..0a25f8cfdfc6 100644 --- a/python/ray/_private/function_manager.py +++ b/python/ray/_private/function_manager.py @@ -173,10 +173,6 @@ def export_key(self, key): > 0 ): break - # Notify all subscribers that there is a new function exported. Note - # that the notification doesn't include any actual data. - # TODO(mwtian) implement per-job notification here. - self._worker.gcs_publisher.publish_function_key(key) def export_setup_func( self, setup_func: Callable, timeout: Optional[int] = None diff --git a/python/ray/_private/gcs_pubsub.py b/python/ray/_private/gcs_pubsub.py index 5131c0435f36..a558d9be8967 100644 --- a/python/ray/_private/gcs_pubsub.py +++ b/python/ray/_private/gcs_pubsub.py @@ -15,7 +15,6 @@ import ray._private.gcs_utils as gcs_utils import ray._private.logging_utils as logging_utils from ray.core.generated.gcs_pb2 import ErrorTableData -from ray.core.generated import dependency_pb2 from ray.core.generated import gcs_service_pb2_grpc from ray.core.generated import gcs_service_pb2 from ray.core.generated import common_pb2 @@ -41,17 +40,6 @@ def _create_log_request(log_json: dict): ] ) - @staticmethod - def _create_function_key_request(key: bytes): - return gcs_service_pb2.GcsPublishRequest( - pub_messages=[ - pubsub_pb2.PubMessage( - channel_type=pubsub_pb2.RAY_PYTHON_FUNCTION_CHANNEL, - python_function_message=dependency_pb2.PythonFunction(key=key), - ) - ] - ) - @staticmethod def _create_node_resource_usage_request(key: str, json: str): return gcs_service_pb2.GcsPublishRequest( @@ -130,13 +118,6 @@ def _pop_log_batch(queue): msg = queue.popleft() return logging_utils.log_batch_proto_to_dict(msg.log_batch_message) - @staticmethod - def _pop_function_key(queue): - if len(queue) == 0: - return None - msg = queue.popleft() - return msg.python_function_message.key - @staticmethod def _pop_resource_usage(queue): if len(queue) == 0: diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 0c6c0ae8812b..e55b597c69d6 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -125,7 +125,6 @@ from ray.includes.common cimport ( CChannelType, RAY_ERROR_INFO_CHANNEL, RAY_LOG_CHANNEL, - RAY_PYTHON_FUNCTION_CHANNEL, GCS_ACTOR_CHANNEL, PythonGetLogBatchLines, WORKER_EXIT_TYPE_USER_ERROR, @@ -2949,15 +2948,6 @@ cdef class GcsPublisher: with nogil: check_status(self.inner.get().PublishLogs(c_job_id, log_batch)) - def publish_function_key(self, key: bytes): - cdef: - CPythonFunction python_function - - python_function.set_key(key) - - with nogil: - check_status(self.inner.get().PublishFunctionKey(python_function)) - cdef class _GcsSubscriber: """Cython wrapper class of C++ `ray::gcs::PythonGcsSubscriber`.""" diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index a0900b38efca..fd4a13715c9b 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -450,8 +450,6 @@ cdef extern from "ray/gcs/pubsub/gcs_pub_sub.h" nogil: CRayStatus PublishLogs(const c_string &key_id, const CLogBatch &data) - CRayStatus PublishFunctionKey(const CPythonFunction& python_function) - cdef cppclass CPythonGcsSubscriber "ray::gcs::PythonGcsSubscriber": CPythonGcsSubscriber( @@ -489,8 +487,6 @@ cdef extern from "src/ray/protobuf/gcs.pb.h" nogil: cdef enum CChannelType "ray::rpc::ChannelType": RAY_ERROR_INFO_CHANNEL "ray::rpc::ChannelType::RAY_ERROR_INFO_CHANNEL", RAY_LOG_CHANNEL "ray::rpc::ChannelType::RAY_LOG_CHANNEL", - RAY_PYTHON_FUNCTION_CHANNEL \ - "ray::rpc::ChannelType::RAY_PYTHON_FUNCTION_CHANNEL", GCS_ACTOR_CHANNEL "ray::rpc::ChannelType::GCS_ACTOR_CHANNEL", cdef cppclass CJobConfig "ray::rpc::JobConfig": diff --git a/src/ray/gcs/gcs_server/gcs_server.cc b/src/ray/gcs/gcs_server/gcs_server.cc index 9c961174b759..ef1b347e8b8c 100644 --- a/src/ray/gcs/gcs_server/gcs_server.cc +++ b/src/ray/gcs/gcs_server/gcs_server.cc @@ -115,7 +115,6 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config, rpc::ChannelType::GCS_WORKER_DELTA_CHANNEL, rpc::ChannelType::RAY_ERROR_INFO_CHANNEL, rpc::ChannelType::RAY_LOG_CHANNEL, - rpc::ChannelType::RAY_PYTHON_FUNCTION_CHANNEL, rpc::ChannelType::RAY_NODE_RESOURCE_USAGE_CHANNEL, }, /*periodical_runner=*/&pubsub_periodical_runner_, diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.cc b/src/ray/gcs/pubsub/gcs_pub_sub.cc index 822be749a2e2..3b0a1d370705 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.cc +++ b/src/ray/gcs/pubsub/gcs_pub_sub.cc @@ -286,15 +286,6 @@ Status PythonGcsPublisher::PublishLogs(const std::string &key_id, return DoPublishWithRetries(request, -1, -1); } -Status PythonGcsPublisher::PublishFunctionKey( - const rpc::PythonFunction &python_function) { - rpc::GcsPublishRequest request; - auto *message = request.add_pub_messages(); - message->set_channel_type(rpc::RAY_PYTHON_FUNCTION_CHANNEL); - message->mutable_python_function_message()->MergeFrom(python_function); - return DoPublishWithRetries(request, -1, -1); -} - PythonGcsSubscriber::PythonGcsSubscriber(const std::string &gcs_address, int gcs_port, rpc::ChannelType channel_type, diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index 4d93682a5b8f..e3db18511317 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -154,9 +154,6 @@ class RAY_EXPORT PythonGcsPublisher { /// Publish logs to GCS. Status PublishLogs(const std::string &key_id, const rpc::LogBatch &log_batch); - /// Publish a function key to GCS. - Status PublishFunctionKey(const rpc::PythonFunction &python_function); - private: Status DoPublishWithRetries(const rpc::GcsPublishRequest &request, int64_t num_retries, diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 9ac0aa30abad..c74bdf2aad6a 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -496,38 +496,11 @@ message StoredConfig { string config = 1; } -// A notification message about one node's resources being changed. -message NodeResourceChange { - // ID of the node whose resources have changed. - bytes node_id = 1; - // Labels of the updated resources and their latest capacities. - map updated_resources = 2; - // Labels of the resources that were deleted. - repeated string deleted_resources = 3; -} - message PubSubMessage { bytes id = 1; bytes data = 2; } -// A notification that the resource table has changed. -message ResourceUpdate { - oneof resource_change_or_data { - // The total resources on a node have changed. - NodeResourceChange change = 1; - // The available resources on a node have changed. - ResourcesData data = 2; - } -} - -message ResourceUsageBroadcastData { - // An incrementing sequence number, used for correcting network errors. - int64 seq_no = 1; - // The changes to the state of the cluster. - repeated ResourceUpdate batch = 2; -} - /////////////////////////////////////////////////////////////////////////////// /* Please do not modify/remove/change the following messages to maintain backwards compatibility in autoscaler. This is necessary to make sure we can diff --git a/src/ray/protobuf/pubsub.proto b/src/ray/protobuf/pubsub.proto index d4ab5675cd0f..4a09ee680c43 100644 --- a/src/ray/protobuf/pubsub.proto +++ b/src/ray/protobuf/pubsub.proto @@ -26,6 +26,7 @@ import "src/ray/protobuf/logging.proto"; /// For example, for pubsub channels that are used by core workers, /// they have the prefix WORKER_. enum ChannelType { + reserved 9; /// A channel for object eviction. WORKER_OBJECT_EVICTION = 0; /// A channel for ref removed. @@ -44,8 +45,6 @@ enum ChannelType { RAY_ERROR_INFO_CHANNEL = 7; /// A channel for logs from various Ray components. RAY_LOG_CHANNEL = 8; - /// A channel for keys to pickled python functions and actor classes. - RAY_PYTHON_FUNCTION_CHANNEL = 9; /// A channel for reporting node resource usage stats. RAY_NODE_RESOURCE_USAGE_CHANNEL = 10; } @@ -55,6 +54,7 @@ enum ChannelType { /// message PubMessage { + reserved 10, 14; /// Channel type for this publish message. ChannelType channel_type = 1; /// The key id (e.g., object id) in bytes. @@ -67,11 +67,9 @@ message PubMessage { ActorTableData actor_message = 7; JobTableData job_message = 8; GcsNodeInfo node_info_message = 9; - NodeResourceChange node_resource_message = 10; WorkerDeltaData worker_delta_message = 11; ErrorTableData error_info_message = 12; LogBatch log_batch_message = 13; - PythonFunction python_function_message = 14; NodeResourceUsage node_resource_usage_message = 15; // The message that indicates the given key id is not available anymore.