Skip to content

Commit

Permalink
[Core] Remove dead code (ray-project#42600)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiajun Yao <[email protected]>
  • Loading branch information
jjyao authored Jan 24, 2024
1 parent 93ee64c commit f57feac
Show file tree
Hide file tree
Showing 9 changed files with 2 additions and 81 deletions.
4 changes: 0 additions & 4 deletions python/ray/_private/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,6 @@ def export_key(self, key):
> 0
):
break
# Notify all subscribers that there is a new function exported. Note
# that the notification doesn't include any actual data.
# TODO(mwtian) implement per-job notification here.
self._worker.gcs_publisher.publish_function_key(key)

def export_setup_func(
self, setup_func: Callable, timeout: Optional[int] = None
Expand Down
19 changes: 0 additions & 19 deletions python/ray/_private/gcs_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import ray._private.gcs_utils as gcs_utils
import ray._private.logging_utils as logging_utils
from ray.core.generated.gcs_pb2 import ErrorTableData
from ray.core.generated import dependency_pb2
from ray.core.generated import gcs_service_pb2_grpc
from ray.core.generated import gcs_service_pb2
from ray.core.generated import common_pb2
Expand All @@ -41,17 +40,6 @@ def _create_log_request(log_json: dict):
]
)

@staticmethod
def _create_function_key_request(key: bytes):
return gcs_service_pb2.GcsPublishRequest(
pub_messages=[
pubsub_pb2.PubMessage(
channel_type=pubsub_pb2.RAY_PYTHON_FUNCTION_CHANNEL,
python_function_message=dependency_pb2.PythonFunction(key=key),
)
]
)

@staticmethod
def _create_node_resource_usage_request(key: str, json: str):
return gcs_service_pb2.GcsPublishRequest(
Expand Down Expand Up @@ -130,13 +118,6 @@ def _pop_log_batch(queue):
msg = queue.popleft()
return logging_utils.log_batch_proto_to_dict(msg.log_batch_message)

@staticmethod
def _pop_function_key(queue):
if len(queue) == 0:
return None
msg = queue.popleft()
return msg.python_function_message.key

@staticmethod
def _pop_resource_usage(queue):
if len(queue) == 0:
Expand Down
10 changes: 0 additions & 10 deletions python/ray/_raylet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ from ray.includes.common cimport (
CChannelType,
RAY_ERROR_INFO_CHANNEL,
RAY_LOG_CHANNEL,
RAY_PYTHON_FUNCTION_CHANNEL,
GCS_ACTOR_CHANNEL,
PythonGetLogBatchLines,
WORKER_EXIT_TYPE_USER_ERROR,
Expand Down Expand Up @@ -2949,15 +2948,6 @@ cdef class GcsPublisher:
with nogil:
check_status(self.inner.get().PublishLogs(c_job_id, log_batch))

def publish_function_key(self, key: bytes):
cdef:
CPythonFunction python_function

python_function.set_key(key)

with nogil:
check_status(self.inner.get().PublishFunctionKey(python_function))


cdef class _GcsSubscriber:
"""Cython wrapper class of C++ `ray::gcs::PythonGcsSubscriber`."""
Expand Down
4 changes: 0 additions & 4 deletions python/ray/includes/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -450,8 +450,6 @@ cdef extern from "ray/gcs/pubsub/gcs_pub_sub.h" nogil:

CRayStatus PublishLogs(const c_string &key_id, const CLogBatch &data)

CRayStatus PublishFunctionKey(const CPythonFunction& python_function)

cdef cppclass CPythonGcsSubscriber "ray::gcs::PythonGcsSubscriber":

CPythonGcsSubscriber(
Expand Down Expand Up @@ -489,8 +487,6 @@ cdef extern from "src/ray/protobuf/gcs.pb.h" nogil:
cdef enum CChannelType "ray::rpc::ChannelType":
RAY_ERROR_INFO_CHANNEL "ray::rpc::ChannelType::RAY_ERROR_INFO_CHANNEL",
RAY_LOG_CHANNEL "ray::rpc::ChannelType::RAY_LOG_CHANNEL",
RAY_PYTHON_FUNCTION_CHANNEL \
"ray::rpc::ChannelType::RAY_PYTHON_FUNCTION_CHANNEL",
GCS_ACTOR_CHANNEL "ray::rpc::ChannelType::GCS_ACTOR_CHANNEL",

cdef cppclass CJobConfig "ray::rpc::JobConfig":
Expand Down
1 change: 0 additions & 1 deletion src/ray/gcs/gcs_server/gcs_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ GcsServer::GcsServer(const ray::gcs::GcsServerConfig &config,
rpc::ChannelType::GCS_WORKER_DELTA_CHANNEL,
rpc::ChannelType::RAY_ERROR_INFO_CHANNEL,
rpc::ChannelType::RAY_LOG_CHANNEL,
rpc::ChannelType::RAY_PYTHON_FUNCTION_CHANNEL,
rpc::ChannelType::RAY_NODE_RESOURCE_USAGE_CHANNEL,
},
/*periodical_runner=*/&pubsub_periodical_runner_,
Expand Down
9 changes: 0 additions & 9 deletions src/ray/gcs/pubsub/gcs_pub_sub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,15 +286,6 @@ Status PythonGcsPublisher::PublishLogs(const std::string &key_id,
return DoPublishWithRetries(request, -1, -1);
}

Status PythonGcsPublisher::PublishFunctionKey(
const rpc::PythonFunction &python_function) {
rpc::GcsPublishRequest request;
auto *message = request.add_pub_messages();
message->set_channel_type(rpc::RAY_PYTHON_FUNCTION_CHANNEL);
message->mutable_python_function_message()->MergeFrom(python_function);
return DoPublishWithRetries(request, -1, -1);
}

PythonGcsSubscriber::PythonGcsSubscriber(const std::string &gcs_address,
int gcs_port,
rpc::ChannelType channel_type,
Expand Down
3 changes: 0 additions & 3 deletions src/ray/gcs/pubsub/gcs_pub_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,6 @@ class RAY_EXPORT PythonGcsPublisher {
/// Publish logs to GCS.
Status PublishLogs(const std::string &key_id, const rpc::LogBatch &log_batch);

/// Publish a function key to GCS.
Status PublishFunctionKey(const rpc::PythonFunction &python_function);

private:
Status DoPublishWithRetries(const rpc::GcsPublishRequest &request,
int64_t num_retries,
Expand Down
27 changes: 0 additions & 27 deletions src/ray/protobuf/gcs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -496,38 +496,11 @@ message StoredConfig {
string config = 1;
}

// A notification message about one node's resources being changed.
message NodeResourceChange {
// ID of the node whose resources have changed.
bytes node_id = 1;
// Labels of the updated resources and their latest capacities.
map<string, double> updated_resources = 2;
// Labels of the resources that were deleted.
repeated string deleted_resources = 3;
}

message PubSubMessage {
bytes id = 1;
bytes data = 2;
}

// A notification that the resource table has changed.
message ResourceUpdate {
oneof resource_change_or_data {
// The total resources on a node have changed.
NodeResourceChange change = 1;
// The available resources on a node have changed.
ResourcesData data = 2;
}
}

message ResourceUsageBroadcastData {
// An incrementing sequence number, used for correcting network errors.
int64 seq_no = 1;
// The changes to the state of the cluster.
repeated ResourceUpdate batch = 2;
}

///////////////////////////////////////////////////////////////////////////////
/* Please do not modify/remove/change the following messages to maintain
backwards compatibility in autoscaler. This is necessary to make sure we can
Expand Down
6 changes: 2 additions & 4 deletions src/ray/protobuf/pubsub.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import "src/ray/protobuf/logging.proto";
/// For example, for pubsub channels that are used by core workers,
/// they have the prefix WORKER_.
enum ChannelType {
reserved 9;
/// A channel for object eviction.
WORKER_OBJECT_EVICTION = 0;
/// A channel for ref removed.
Expand All @@ -44,8 +45,6 @@ enum ChannelType {
RAY_ERROR_INFO_CHANNEL = 7;
/// A channel for logs from various Ray components.
RAY_LOG_CHANNEL = 8;
/// A channel for keys to pickled python functions and actor classes.
RAY_PYTHON_FUNCTION_CHANNEL = 9;
/// A channel for reporting node resource usage stats.
RAY_NODE_RESOURCE_USAGE_CHANNEL = 10;
}
Expand All @@ -55,6 +54,7 @@ enum ChannelType {
///

message PubMessage {
reserved 10, 14;
/// Channel type for this publish message.
ChannelType channel_type = 1;
/// The key id (e.g., object id) in bytes.
Expand All @@ -67,11 +67,9 @@ message PubMessage {
ActorTableData actor_message = 7;
JobTableData job_message = 8;
GcsNodeInfo node_info_message = 9;
NodeResourceChange node_resource_message = 10;
WorkerDeltaData worker_delta_message = 11;
ErrorTableData error_info_message = 12;
LogBatch log_batch_message = 13;
PythonFunction python_function_message = 14;
NodeResourceUsage node_resource_usage_message = 15;

// The message that indicates the given key id is not available anymore.
Expand Down

0 comments on commit f57feac

Please sign in to comment.