Skip to content

Commit

Permalink
Don't redirect worker output to log files if redirect_output=False. (r…
Browse files Browse the repository at this point in the history
…ay-project#873)

* Don't redirect worker output to log files if redirect_output=False.

* Fix, handle case where RedirectOutput key is not in Redis.
  • Loading branch information
robertnishihara authored and pcmoritz committed Aug 27, 2017
1 parent 617bc4d commit d43a435
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 21 deletions.
10 changes: 7 additions & 3 deletions python/ray/experimental/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,10 +844,14 @@ def workers(self):
"plasma_manager_socket": (worker_info[b"plasma_manager_socket"]
.decode("ascii")),
"plasma_store_socket": (worker_info[b"plasma_store_socket"]
.decode("ascii")),
"stderr_file": worker_info[b"stderr_file"].decode("ascii"),
"stdout_file": worker_info[b"stdout_file"].decode("ascii")
.decode("ascii"))
}
if b"stderr_file" in worker_info:
workers_data[worker_id]["stderr_file"] = (
worker_info[b"stderr_file"].decode("ascii"))
if b"stdout_file" in worker_info:
workers_data[worker_id]["stdout_file"] = (
worker_info[b"stdout_file"].decode("ascii"))
return workers_data

def actors(self):
Expand Down
13 changes: 12 additions & 1 deletion python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ def start_redis(node_ip_address,
port=None,
num_redis_shards=1,
redirect_output=False,
redirect_worker_output=False,
cleanup=True):
"""Start the Redis global state store.
Expand All @@ -259,6 +260,11 @@ def start_redis(node_ip_address,
num_redis_shards (int): If provided, the number of Redis shards to
start, in addition to the primary one. The default value is one
shard.
redirect_output (bool): True if output should be redirected to a file
and false otherwise.
redirect_worker_output (bool): True if worker output should be
redirected to a file and false otherwise. Workers will have access
to this value when they start up.
cleanup (bool): True if using Ray in local mode. If cleanup is true,
then all Redis processes started by this method will be killed by
services.cleanup() when the Python process that imported services
Expand All @@ -284,6 +290,10 @@ def start_redis(node_ip_address,
redis_client = redis.StrictRedis(host=node_ip_address, port=port)
redis_client.set("NumRedisShards", str(num_redis_shards))

# Put the redirect_worker_output bool in the Redis shard so that workers
# can access it and know whether or not to redirect their output.
redis_client.set("RedirectOutput", 1 if redirect_worker_output else 0)

# Start other Redis shards listening on random ports. Each Redis shard logs
# to a separate file, prefixed by "redis-<shard number>".
redis_shards = []
Expand Down Expand Up @@ -847,7 +857,8 @@ def start_ray_processes(address_info=None,
redis_address, redis_shards = start_redis(
node_ip_address, port=redis_port,
num_redis_shards=num_redis_shards,
redirect_output=redirect_output, cleanup=cleanup)
redirect_output=redirect_output,
redirect_worker_output=redirect_output, cleanup=cleanup)
address_info["redis_address"] = redis_address
time.sleep(0.1)

Expand Down
46 changes: 29 additions & 17 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1625,15 +1625,6 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
worker.actor_id = actor_id
worker.connected = True
worker.set_mode(mode)
# Redirect worker output and error to their own files.
if mode == WORKER_MODE:
log_stdout_file, log_stderr_file = services.new_log_files("worker",
True)
sys.stdout = log_stdout_file
sys.stderr = log_stderr_file
services.record_log_files_in_redis(info["redis_address"],
info["node_ip_address"],
[log_stdout_file, log_stderr_file])
# The worker.events field is used to aggregate logging information and
# display it in the web UI. Note that Python lists protected by the GIL,
# which is important because we will append to this field from multiple
Expand All @@ -1652,6 +1643,26 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
port=int(redis_port))
worker.lock = threading.Lock()

# Check the RedirectOutput key in Redis and based on its value redirect
# worker output and error to their own files.
if mode == WORKER_MODE:
# This key is set in services.py when Redis is started.
redirect_worker_output_val = worker.redis_client.get("RedirectOutput")
if (redirect_worker_output_val is not None and
int(redirect_worker_output_val) == 1):
redirect_worker_output = 1
else:
redirect_worker_output = 0
if redirect_worker_output:
log_stdout_file, log_stderr_file = services.new_log_files("worker",
True)
sys.stdout = log_stdout_file
sys.stderr = log_stderr_file
services.record_log_files_in_redis(info["redis_address"],
info["node_ip_address"],
[log_stdout_file,
log_stderr_file])

# Create an object for interfacing with the global state.
global_state._initialize_global_state(redis_ip_address, int(redis_port))

Expand All @@ -1673,14 +1684,15 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker,
is_worker = False
elif mode == WORKER_MODE:
# Register the worker with Redis.
worker.redis_client.hmset(
b"Workers:" + worker.worker_id,
{"node_ip_address": worker.node_ip_address,
"stdout_file": os.path.abspath(log_stdout_file.name),
"stderr_file": os.path.abspath(log_stderr_file.name),
"plasma_store_socket": info["store_socket_name"],
"plasma_manager_socket": info["manager_socket_name"],
"local_scheduler_socket": info["local_scheduler_socket_name"]})
worker_dict = {
"node_ip_address": worker.node_ip_address,
"plasma_store_socket": info["store_socket_name"],
"plasma_manager_socket": info["manager_socket_name"],
"local_scheduler_socket": info["local_scheduler_socket_name"]}
if redirect_worker_output:
worker_dict["stdout_file"] = os.path.abspath(log_stdout_file.name)
worker_dict["stderr_file"] = os.path.abspath(log_stderr_file.name)
worker.redis_client.hmset(b"Workers:" + worker.worker_id, worker_dict)
is_worker = True
else:
raise Exception("This code should be unreachable.")
Expand Down

0 comments on commit d43a435

Please sign in to comment.