Skip to content

Commit

Permalink
Various Python code cleanups. (ray-project#3837)
Browse files Browse the repository at this point in the history
  • Loading branch information
suquark authored and robertnishihara committed Feb 3, 2019
1 parent a1bcd2a commit 9295ab8
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 258 deletions.
19 changes: 3 additions & 16 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def save_and_log_checkpoint(worker, actor):
Args:
worker: The worker to use to log errors.
actor: The actor to checkpoint.
checkpoint_index: The number of tasks that have executed so far.
"""
try:
actor.__ray_checkpoint__()
Expand All @@ -113,11 +112,7 @@ def save_and_log_checkpoint(worker, actor):
worker,
ray_constants.CHECKPOINT_PUSH_ERROR,
traceback_str,
driver_id=worker.task_driver_id,
data={
"actor_class": actor.__class__.__name__,
"function_name": actor.__ray_checkpoint__.__name__
})
driver_id=worker.task_driver_id)


def restore_and_log_checkpoint(worker, actor):
Expand All @@ -137,11 +132,7 @@ def restore_and_log_checkpoint(worker, actor):
worker,
ray_constants.CHECKPOINT_PUSH_ERROR,
traceback_str,
driver_id=worker.task_driver_id,
data={
"actor_class": actor.__class__.__name__,
"function_name": actor.__ray_checkpoint_restore__.__name__
})
driver_id=worker.task_driver_id)
return checkpoint_resumed


Expand Down Expand Up @@ -550,11 +541,7 @@ def _actor_method_call(self,
method_name: The name of the actor method to execute.
args: A list of arguments for the actor method.
kwargs: A dictionary of keyword arguments for the actor method.
dependency: The object ID that this method is dependent on.
Defaults to None, for no dependencies. Most tasks should
pass in the dummy object returned by the preceding task.
Some tasks, such as checkpoint and terminate methods, have
no dependencies.
num_return_vals (int): The number of return values for the method.
Returns:
object_ids: A list of object IDs returned by the remote actor
Expand Down
2 changes: 1 addition & 1 deletion python/ray/experimental/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def _initialize_global_state(self,
time.sleep(1)
continue
num_redis_shards = int(num_redis_shards)
if (num_redis_shards < 1):
if num_redis_shards < 1:
raise Exception("Expected at least one Redis shard, found "
"{}.".format(num_redis_shards))

Expand Down
22 changes: 8 additions & 14 deletions python/ray/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,12 +403,10 @@ def f():
push_error_to_driver(
self._worker,
ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR,
traceback_str,
driver_id=driver_id,
data={
"function_id": function_id.binary(),
"function_name": function_name
})
"Failed to unpickle the remote function '{}' with function ID "
"{}. Traceback:\n{}".format(function_name, function_id.hex(),
traceback_str),
driver_id=driver_id)
else:
# The below line is necessary. Because in the driver process,
# if the function is defined in the file where the python script
Expand Down Expand Up @@ -506,7 +504,6 @@ def _publish_actor_class_to_key(self, key, actor_class_info):
Args:
key: The key to store the actor class info at.
actor_class_info: Information about the actor class.
worker: The worker to use to connect to Redis.
"""
# We set the driver ID here because it may not have been available when
# the actor class was defined.
Expand Down Expand Up @@ -577,7 +574,6 @@ def fetch_and_register_actor(self, actor_class_key):
Args:
actor_class_key: The key in Redis to use to fetch the actor.
worker: The worker to use.
"""
actor_id = self._worker.actor_id
(driver_id_str, class_name, module, pickled_class, checkpoint_interval,
Expand Down Expand Up @@ -642,11 +638,10 @@ def temporary_actor_method(*xs):
traceback.format_exc())
# Log the error message.
push_error_to_driver(
self._worker,
ray_constants.REGISTER_ACTOR_PUSH_ERROR,
traceback_str,
driver_id,
data={"actor_id": actor_id.binary()})
self._worker, ray_constants.REGISTER_ACTOR_PUSH_ERROR,
"Failed to unpickle actor class '{}' for actor ID {}. "
"Traceback:\n{}".format(class_name, actor_id.hex(),
traceback_str), driver_id)
# TODO(rkn): In the future, it might make sense to have the worker
# exit here. However, currently that would lead to hanging if
# someone calls ray.get on a method invoked on the actor.
Expand Down Expand Up @@ -680,7 +675,6 @@ def _make_actor_method_executor(self, method_name, method, actor_imported):
necessary checkpointing operations.
Args:
worker (Worker): The worker that is executing the actor.
method_name (str): The name of the actor method.
method (instancemethod): The actor method to wrap. This should be a
method defined on the actor class and should therefore take an
Expand Down
5 changes: 1 addition & 4 deletions python/ray/import_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,8 @@ def fetch_and_execute_function_to_run(self, key):
# the traceback and notify the scheduler of the failure.
traceback_str = traceback.format_exc()
# Log the error message.
name = function.__name__ if ("function" in locals() and hasattr(
function, "__name__")) else ""
utils.push_error_to_driver(
self.worker,
ray_constants.FUNCTION_TO_RUN_PUSH_ERROR,
traceback_str,
driver_id=ray.DriverID(driver_id),
data={"name": name})
driver_id=ray.DriverID(driver_id))
2 changes: 0 additions & 2 deletions python/ray/log_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ class LogMonitor(object):
process is running on. This will be used to determine which log
files to track.
redis_client: A client used to communicate with the Redis server.
log_filenames: A list of the names of the log files that this monitor
process is monitoring.
log_files: A dictionary mapping the name of a log file to a list of
strings representing its contents.
log_file_handles: A dictionary mapping the name of a log file to a file
Expand Down
2 changes: 0 additions & 2 deletions python/ray/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ def process_messages(self, max_messages=10000):
data = message["data"]

# Determine the appropriate message handler.
message_handler = None
if channel == ray.gcs_utils.XRAY_HEARTBEAT_BATCH_CHANNEL:
# Similar functionality as local scheduler info channel
message_handler = self.xray_heartbeat_batch_handler
Expand All @@ -250,7 +249,6 @@ def process_messages(self, max_messages=10000):
raise Exception("This code should be unreachable.")

# Call the handler.
assert (message_handler is not None)
message_handler(channel, data)

def update_local_scheduler_map(self):
Expand Down
Empty file removed python/ray/plasma/.gitkeep
Empty file.
4 changes: 2 additions & 2 deletions python/ray/profiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def flush_profile_data(self):
timeline until after the task has completed. For very long-running
tasks, we may want profiling information to appear more quickly.
In such cases, this function can be called. Note that as an
aalternative, we could start thread in the background on workers that
alternative, we could start a thread in the background on workers that
calls this automatically.
"""
with self.lock:
Expand All @@ -137,7 +137,7 @@ class RayLogSpanRaylet(object):
Attributes:
event_type (str): The type of the event being logged.
contents: Additional information to log.
extra_data: Additional information to log.
"""

def __init__(self, profiler, event_type, extra_data=None):
Expand Down
Loading

0 comments on commit 9295ab8

Please sign in to comment.