[Core] Multi-tenancy: enable multi-tenancy by default (ray-project#10570)

* Add new job in Travis to enable multi-tenancy

* fix

* Update .bazelrc

* Update .travis.yml

* fix test_job_gc_with_detached_actor

* fix test_multiple_downstream_tasks

* fix lint

* Enable multi-tenancy by default

* Kill idle workers in FIFO order

* Update test

* minor update

* Address comments

* fix some cases

* fix test_remote_cancel

* Address comments

* fix after merge

* remove kill

* fix worker_pool_test

* fix java test timeout

* fix test_two_custom_resources

* Add a delay when killing idle workers

* fix test_worker_failure

* fix test_worker_failed again

* fix DisconnectWorker

* update test_worker_failed

* Revert some python tests

* lint

* address comments
kfstorm authored Sep 30, 2020
1 parent f54f7b2 commit 3504391
Showing 17 changed files with 205 additions and 72 deletions.
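
With this commit, multi-tenancy defaults to on in both the Java and C++ config loaders shown below. As a hedged sketch (not part of this diff), a driver could still opt out either through the RAY_ENABLE_MULTI_TENANCY environment variable that both loaders read, or through the _system_config override that the pre-change Python tests in this diff relied on:

    import os
    import ray

    # Option 1: disable multi-tenancy for Ray processes started from this
    # environment. The config loaders treat "1" (or an unset variable) as enabled.
    os.environ["RAY_ENABLE_MULTI_TENANCY"] = "0"

    # Option 2: for a single local cluster, pass the raylet flag explicitly,
    # mirroring the _system_config={"enable_multi_tenancy": ...} arguments that
    # the tests below no longer need.
    ray.init(num_cpus=1, _system_config={"enable_multi_tenancy": False})
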
@@ -235,10 +235,17 @@ public RayConfig(Config config) {
codeSearchPath = Collections.emptyList();
}

boolean enableMultiTenancy = false;
boolean enableMultiTenancy;
if (config.hasPath("ray.raylet.config.enable_multi_tenancy")) {
enableMultiTenancy =
Boolean.valueOf(config.getString("ray.raylet.config.enable_multi_tenancy"));
} else {
String envString = System.getenv("RAY_ENABLE_MULTI_TENANCY");
if (StringUtils.isNotBlank(envString)) {
enableMultiTenancy = "1".equals(envString);
} else {
enableMultiTenancy = true; // Default value
}
}

if (!enableMultiTenancy) {
7 changes: 5 additions & 2 deletions python/ray/test_utils.py
@@ -207,10 +207,13 @@ def run_string_as_driver_nonblocking(driver_script):
return proc


def wait_for_num_actors(num_actors, timeout=10):
def wait_for_num_actors(num_actors, state=None, timeout=10):
start_time = time.time()
while time.time() - start_time < timeout:
if len(ray.actors()) >= num_actors:
if len([
_ for _ in ray.actors().values()
if state is None or _["State"] == state
]) >= num_actors:
return
time.sleep(0.1)
raise RayTestTimeoutException("Timed out while waiting for global state.")
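
A minimal usage sketch of the new state filter, matching the call that python/ray/tests/test_job.py makes below:

    import ray
    from ray.test_utils import wait_for_num_actors

    # Block until at least one actor is registered *and* alive, rather than
    # merely present in the actor table in any state.
    wait_for_num_actors(1, ray.gcs_utils.ActorTableData.ALIVE)
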
16 changes: 15 additions & 1 deletion python/ray/tests/test_advanced_2.py
@@ -11,7 +11,10 @@
import ray.cluster_utils
import ray.test_utils

from ray.test_utils import RayTestTimeoutException
from ray.test_utils import (
RayTestTimeoutException,
wait_for_condition,
)

logger = logging.getLogger(__name__)

@@ -505,6 +508,17 @@ def test_two_custom_resources(ray_start_cluster):
})
ray.init(address=cluster.address)

@ray.remote
def foo():
# Sleep a while to emulate a slow operation. This is needed to make
# sure tasks are scheduled to different nodes.
time.sleep(0.1)
return ray.worker.global_worker.node.unique_id

# Make sure each node has at least one idle worker.
wait_for_condition(
lambda: len(set(ray.get([foo.remote() for _ in range(6)]))) == 2)

@ray.remote(resources={"CustomResource1": 1})
def f():
time.sleep(0.001)
6 changes: 5 additions & 1 deletion python/ray/tests/test_component_failures_2.py
@@ -63,7 +63,11 @@ def f(x):
time.sleep(0.1)
# Kill the workers as the tasks execute.
for pid in pids:
os.kill(pid, SIGKILL)
try:
os.kill(pid, SIGKILL)
except OSError:
# The process may have already exited due to worker capping.
pass
time.sleep(0.1)
# Make sure that we either get the object or we get an appropriate
# exception.
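
The same guarded kill appears again in python/ray/tests/test_multinode_failures.py below; a small helper of this shape (the name is hypothetical, not part of this diff) would capture the pattern:

    import os
    from signal import SIGKILL

    def kill_if_alive(pid, sig=SIGKILL):
        """Send `sig` to `pid`, tolerating the race where worker capping has
        already made the process exit."""
        try:
            os.kill(pid, sig)
        except OSError:
            pass
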
4 changes: 3 additions & 1 deletion python/ray/tests/test_job.py
@@ -67,11 +67,13 @@ def value(self):
return 1
_ = Actor.options(lifetime="detached", name="DetachedActor").remote()
# Make sure the actor is created before the driver exits.
ray.get(_.value.remote())
""".format(address)

p = run_string_as_driver_nonblocking(driver)
# Wait for actor to be created
wait_for_num_actors(1)
wait_for_num_actors(1, ray.gcs_utils.ActorTableData.ALIVE)

actor_table = ray.actors()
assert len(actor_table) == 1
30 changes: 13 additions & 17 deletions python/ray/tests/test_multi_tenancy.py
@@ -31,10 +31,7 @@ def get_workers():
# `ray.init(...)`, Raylet will start `num_cpus` Python workers for the driver.
def test_initial_workers(shutdown_only):
# `num_cpus` should be <=2 because a Travis CI machine only has 2 CPU cores
ray.init(
num_cpus=1,
include_dashboard=True,
_system_config={"enable_multi_tenancy": True})
ray.init(num_cpus=1, include_dashboard=True)
wait_for_condition(lambda: len(get_workers()) == 1)


@@ -46,7 +43,7 @@ def test_initial_workers(shutdown_only):
# different drivers were scheduled to the same worker process, that is, tasks
# of different jobs were not correctly isolated during execution.
def test_multi_drivers(shutdown_only):
info = ray.init(num_cpus=10, _system_config={"enable_multi_tenancy": True})
info = ray.init(num_cpus=10)

driver_code = """
import os
@@ -118,8 +115,7 @@ def test_worker_env(shutdown_only):
job_config=ray.job_config.JobConfig(worker_env={
"foo1": "bar1",
"foo2": "bar2"
}),
_system_config={"enable_multi_tenancy": True})
}))

@ray.remote
def get_env(key):
@@ -131,7 +127,7 @@ def get_env(key):

def test_worker_capping_kill_idle_workers(shutdown_only):
# Avoid starting initial workers by setting num_cpus to 0.
ray.init(num_cpus=0, _system_config={"enable_multi_tenancy": True})
ray.init(num_cpus=0)
assert len(get_workers()) == 0

@ray.remote(num_cpus=0)
@@ -157,16 +153,13 @@ def foo():
# Worker 3 runs a normal task
wait_for_condition(lambda: len(get_workers()) == 3)

ray.get(obj1)
# Worker 2 now becomes idle and should be killed
wait_for_condition(lambda: len(get_workers()) == 2)
ray.get(obj2)
# Worker 3 now becomes idle and should be killed
ray.get([obj1, obj2])
# Workers 2 and 3 now become idle and should be killed
wait_for_condition(lambda: len(get_workers()) == 1)


def test_worker_capping_run_many_small_tasks(shutdown_only):
ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True})
ray.init(num_cpus=2)

@ray.remote(num_cpus=0.5)
def foo():
@@ -188,7 +181,7 @@ def foo():


def test_worker_capping_run_chained_tasks(shutdown_only):
ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True})
ray.init(num_cpus=2)

@ray.remote(num_cpus=0.5)
def foo(x):
@@ -215,7 +208,7 @@ def foo(x):

def test_worker_capping_fifo(shutdown_only):
# Start 2 initial workers by setting num_cpus to 2.
info = ray.init(num_cpus=2, _system_config={"enable_multi_tenancy": True})
info = ray.init(num_cpus=2)
wait_for_condition(lambda: len(get_workers()) == 2)

time.sleep(1)
@@ -233,6 +226,7 @@ def getpid():

driver_code = """
import ray
import time
ray.init(address="{}")
@@ -241,6 +235,8 @@ def foo():
pass
ray.get(foo.remote())
# Sleep a while to make sure an idle worker exits before this driver exits.
time.sleep(2)
ray.shutdown()
""".format(info["redis_address"])

@@ -254,7 +250,7 @@ def foo():


def test_worker_registration_failure_after_driver_exit(shutdown_only):
info = ray.init(num_cpus=1, _system_config={"enable_multi_tenancy": True})
info = ray.init(num_cpus=1)

driver_code = """
import ray
6 changes: 5 additions & 1 deletion python/ray/tests/test_multinode_failures.py
@@ -63,7 +63,11 @@ def f(x):
time.sleep(0.1)
# Kill the workers as the tasks execute.
for pid in pids:
os.kill(pid, SIGKILL)
try:
os.kill(pid, SIGKILL)
except OSError:
# The process may have already exited due to worker capping.
pass
time.sleep(0.1)
# Make sure that we either get the object or we get an appropriate
# exception.
6 changes: 5 additions & 1 deletion python/ray/tests/test_reconstruction.py
@@ -410,7 +410,11 @@ def dependent_task(x):
return

obj = large_object.options(resources={"node2": 1}).remote()
downstream = [chain.remote(obj) for _ in range(4)]
downstream = [
chain.options(resources={
"node1": 1
}).remote(obj) for _ in range(4)
]
for obj in downstream:
ray.get(dependent_task.options(resources={"node1": 1}).remote(obj))

11 changes: 10 additions & 1 deletion src/ray/common/ray_config_def.h
@@ -287,7 +287,16 @@ RAY_CONFIG(int64_t, max_resource_shapes_per_load_report, 100)
RAY_CONFIG(int64_t, gcs_server_request_timeout_seconds, 5)

/// Whether to enable multi tenancy features.
RAY_CONFIG(bool, enable_multi_tenancy, false)
RAY_CONFIG(bool, enable_multi_tenancy,
getenv("RAY_ENABLE_MULTI_TENANCY") == nullptr ||
getenv("RAY_ENABLE_MULTI_TENANCY") == std::string("1"))

/// The interval of periodic idle worker killing. A negative value means worker capping is
/// disabled.
RAY_CONFIG(int64_t, kill_idle_workers_interval_ms, 200)

/// The idle time threshold for an idle worker to be killed.
RAY_CONFIG(int64_t, idle_worker_killing_time_threshold_ms, 1000)

/// Whether start the Plasma Store as a Raylet thread.
RAY_CONFIG(bool, ownership_based_object_directory_enabled, false)
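
A hedged sketch of tuning the two new knobs from a Python test; the flag names come from the RAY_CONFIG entries above, _system_config is the same override mechanism the tests in this diff use, and the values are illustrative only:

    import ray

    # Shorten the capping cycle so idle workers are reclaimed almost immediately.
    ray.init(
        num_cpus=0,
        _system_config={
            "kill_idle_workers_interval_ms": 50,
            "idle_worker_killing_time_threshold_ms": 100,
        })
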
6 changes: 6 additions & 0 deletions src/ray/core_worker/core_worker.cc
@@ -2256,6 +2256,12 @@ void CoreWorker::HandleRestoreSpilledObjects(
}
}

void CoreWorker::HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply,
rpc::SendReplyCallback send_reply_callback) {
send_reply_callback(Status::OK(), nullptr, nullptr);
Exit(/*intentional=*/true);
}

void CoreWorker::YieldCurrentFiber(FiberEvent &event) {
RAY_CHECK(worker_context_.CurrentActorIsAsync());
boost::this_fiber::yield();
4 changes: 4 additions & 0 deletions src/ray/core_worker/core_worker.h
@@ -890,6 +890,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
rpc::RestoreSpilledObjectsReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

// Make this worker exit.
void HandleExit(const rpc::ExitRequest &request, rpc::ExitReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

///
/// Public methods related to async actor call. This should only be used when
/// the actor is (1) direct actor and (2) using asyncio mode.
8 changes: 8 additions & 0 deletions src/ray/protobuf/core_worker.proto
@@ -312,6 +312,12 @@ message RestoreSpilledObjectsRequest {
message RestoreSpilledObjectsReply {
}

message ExitRequest {
}

message ExitReply {
}

service CoreWorkerService {
// Push a task directly to this worker from another.
rpc PushTask(PushTaskRequest) returns (PushTaskReply);
@@ -357,4 +363,6 @@ service CoreWorkerService {
returns (RestoreSpilledObjectsReply);
// Notification from raylet that an object ID is available in local plasma.
rpc PlasmaObjectReady(PlasmaObjectReadyRequest) returns (PlasmaObjectReadyReply);
// Request for a worker to exit.
rpc Exit(ExitRequest) returns (ExitReply);
}
5 changes: 0 additions & 5 deletions src/ray/raylet/node_manager.cc
@@ -1370,11 +1370,6 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr<WorkerInterface> &
// Call task dispatch to assign work to the new worker.
DispatchTasks(local_queues_.GetReadyTasksByClass());
}
if (RayConfig::instance().enable_multi_tenancy()) {
// We trigger killing here instead of inside `Worker::PushWorker` because we
// only kill an idle worker if it remains idle after scheduling.
worker_pool_.TryKillingIdleWorkers();
}
}

void NodeManager::ProcessDisconnectClientMessage(
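
With this call removed from HandleWorkerAvailable, idle-worker killing is driven by the periodic kill_idle_workers_interval_ms timer configured above rather than by every scheduling event. A conceptual Python sketch of that loop follows; the real logic is the C++ WorkerPool::TryKillingIdleWorkers, and the soft-limit parameter and helper names here are assumptions for illustration:

    import time

    def try_killing_idle_workers(idle_workers, soft_limit, threshold_ms, request_exit):
        """Kill idle workers in FIFO order (oldest idle first) once they have been
        idle longer than `threshold_ms`, until at most `soft_limit` idle workers
        remain. `idle_workers` is assumed ordered by when each worker became idle."""
        now_ms = time.monotonic() * 1000
        while len(idle_workers) > soft_limit:
            worker, idle_since_ms = idle_workers[0]
            if now_ms - idle_since_ms < threshold_ms:
                break  # The oldest idle worker is still within its grace period.
            idle_workers.pop(0)
            request_exit(worker)  # e.g. via the new CoreWorkerService.Exit RPC

    # Conceptually, a raylet timer invokes this every kill_idle_workers_interval_ms.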