Skip to content

Commit

Permalink
[core] add retriable fifo policy (ray-project#33514)
Browse files Browse the repository at this point in the history
For long-lived, memory-leaking actors, it is more desirable to kill the oldest task, which is leaking the most. This avoids the situation where we constantly kill actors, which may lead to side effects such as generating a lot of log files or triggering increased memory consumption in the GCS / dashboard.

This fixes the test failure introduced in ray-project#33430

Adds a sleep to give the memory monitor time to kick in.
Also increases the memory limit, since the node may already be using a lot of memory in the first place.
  • Loading branch information
clarng authored Mar 21, 2023
1 parent 2c18614 commit 99ddbe4
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 0 deletions.
15 changes: 15 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1709,6 +1709,21 @@ cc_test(
],
)

# Unit test target for the retriable-FIFO worker killing policy; builds
# src/ray/raylet/worker_killing_policy_retriable_fifo_test.cc against the
# raylet library and the googletest main.
cc_test(
name = "worker_killing_policy_retriable_fifo_test",
size = "small",
srcs = [
"src/ray/raylet/worker_killing_policy_retriable_fifo_test.cc",
],
copts = COPTS,
tags = ["team:core"],
deps = [
":ray_common",
":raylet_lib",
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "pubsub_integration_test",
size = "small",
Expand Down
43 changes: 43 additions & 0 deletions python/ray/tests/test_memory_pressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import numpy as np
from ray._private.utils import get_system_memory
from ray._private.utils import get_used_memory

from ray.experimental.state.api import list_tasks
from ray.experimental.state.state_manager import StateDataSourceClient

Expand Down Expand Up @@ -513,5 +514,47 @@ def infinite_retry_task():
)


@pytest.mark.skipif(
    sys.platform not in ("linux", "linux2"),
    reason="memory monitor only on linux currently",
)
def test_one_actor_max_fifo_kill_previous_actor(shutdown_only):
    """With the retriable_fifo policy, each new leaking actor evicts the previous one.

    Three named actors are started one after another, each allocating the
    amount returned by get_additional_bytes_to_reach_memory_usage_pct(0.5)
    while the memory usage threshold is 0.7. After every allocation exactly
    one named actor must remain, and it must be the most recently started one.
    """
    system_config = {
        "worker_killing_policy": "retriable_fifo",
        "memory_usage_threshold": 0.7,
    }
    with ray.init(_system_config=system_config):
        leak_size = get_additional_bytes_to_reach_memory_usage_pct(0.5)

        # The handles are kept in locals for the whole test so that an actor
        # only disappears because it was killed, not because its handle was
        # dropped.
        first_actor = Leaker.options(name="first_actor").remote()
        ray.get(first_actor.allocate.remote(leak_size))

        surviving = ray.util.list_named_actors()
        assert len(surviving) == 1
        assert "first_actor" in surviving

        # NOTE(review): the second argument is presumably a post-allocation
        # wait in milliseconds that lets the memory monitor run; confirm
        # against Leaker.allocate.
        second_actor = Leaker.options(name="second_actor").remote()
        second_alloc = second_actor.allocate.remote(
            leak_size, memory_monitor_refresh_ms * 3
        )
        ray.get(second_alloc)

        surviving = ray.util.list_named_actors()
        assert len(surviving) == 1, surviving
        assert "first_actor" not in surviving
        assert "second_actor" in surviving

        third_actor = Leaker.options(name="third_actor").remote()
        third_alloc = third_actor.allocate.remote(
            leak_size, memory_monitor_refresh_ms * 3
        )
        ray.get(third_alloc)

        surviving = ray.util.list_named_actors()
        assert len(surviving) == 1
        assert "first_actor" not in surviving
        assert "second_actor" not in surviving
        assert "third_actor" in surviving


if __name__ == "__main__":
sys.exit(pytest.main(["-sv", __file__]))
1 change: 1 addition & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ RAY_CONFIG(uint64_t, task_oom_retries, -1)
/// The worker killing policy to use, available options are
/// group_by_owner
/// retriable_lifo
/// retriable_fifo
RAY_CONFIG(std::string, worker_killing_policy, "group_by_owner")

/// If the raylet fails to get agent info, we will retry after this interval.
Expand Down
4 changes: 4 additions & 0 deletions src/ray/raylet/worker_killing_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "ray/common/asio/periodical_runner.h"
#include "ray/raylet/worker.h"
#include "ray/raylet/worker_killing_policy_group_by_owner.h"
#include "ray/raylet/worker_killing_policy_retriable_fifo.h"
#include "ray/raylet/worker_pool.h"

namespace ray {
Expand Down Expand Up @@ -99,6 +100,9 @@ std::shared_ptr<WorkerKillingPolicy> CreateWorkerKillingPolicy(
} else if (killing_policy_str == kGroupByOwner) {
RAY_LOG(INFO) << "Running GroupByOwner policy.";
return std::make_shared<GroupByOwnerIdWorkerKillingPolicy>();
} else if (killing_policy_str == kFifoPolicy) {
RAY_LOG(INFO) << "Running RetriableFIFO policy.";
return std::make_shared<RetriableFIFOWorkerKillingPolicy>();
} else {
RAY_LOG(ERROR)
<< killing_policy_str
Expand Down
1 change: 1 addition & 0 deletions src/ray/raylet/worker_killing_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace raylet {

/// String names of the selectable worker killing policies.
/// CreateWorkerKillingPolicy compares the configured policy string against
/// these values to decide which policy implementation to instantiate.
constexpr char kLifoPolicy[] = "retriable_lifo";
constexpr char kGroupByOwner[] = "group_by_owner";
constexpr char kFifoPolicy[] = "retriable_fifo";

/// Provides the policy on which worker to prioritize killing.
class WorkerKillingPolicy {
Expand Down
72 changes: 72 additions & 0 deletions src/ray/raylet/worker_killing_policy_retriable_fifo.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/raylet/worker_killing_policy_retriable_fifo.h"

#include <gtest/gtest_prod.h>

#include <boost/container_hash/hash.hpp>
#include <unordered_map>

#include "absl/container/flat_hash_map.h"
#include "absl/time/time.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/asio/periodical_runner.h"
#include "ray/raylet/worker.h"
#include "ray/raylet/worker_killing_policy.h"
#include "ray/raylet/worker_pool.h"

namespace ray {

namespace raylet {

// The policy has no state of its own to initialize.
RetriableFIFOWorkerKillingPolicy::RetriableFIFOWorkerKillingPolicy() = default;

const std::pair<std::shared_ptr<WorkerInterface>, bool>
RetriableFIFOWorkerKillingPolicy::SelectWorkerToKill(
const std::vector<std::shared_ptr<WorkerInterface>> &workers,
const MemorySnapshot &system_memory) const {
if (workers.empty()) {
RAY_LOG_EVERY_MS(INFO, 5000) << "Worker list is empty. Nothing can be killed";
return std::make_pair(nullptr, /*should retry*/ false);
}

std::vector<std::shared_ptr<WorkerInterface>> sorted = workers;

std::sort(sorted.begin(),
sorted.end(),
[](std::shared_ptr<WorkerInterface> const &left,
std::shared_ptr<WorkerInterface> const &right) -> bool {
// First sort by retriable tasks and then by task time in ascending order.
int left_retriable =
left->GetAssignedTask().GetTaskSpecification().IsRetriable() ? 0 : 1;
int right_retriable =
right->GetAssignedTask().GetTaskSpecification().IsRetriable() ? 0 : 1;
if (left_retriable == right_retriable) {
return left->GetAssignedTaskTime() < right->GetAssignedTaskTime();
}
return left_retriable < right_retriable;
});

const static int32_t max_to_print = 10;
RAY_LOG(INFO) << "The top 10 workers to be killed based on the worker killing policy:\n"
<< WorkerKillingPolicy::WorkersDebugString(
sorted, max_to_print, system_memory);

return std::make_pair(sorted.front(), /*should retry*/ true);
}

} // namespace raylet

} // namespace ray
41 changes: 41 additions & 0 deletions src/ray/raylet/worker_killing_policy_retriable_fifo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <gtest/gtest_prod.h>

#include "absl/container/flat_hash_set.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "ray/common/memory_monitor.h"
#include "ray/raylet/worker.h"
#include "ray/raylet/worker_killing_policy.h"

namespace ray {

namespace raylet {

/// Prefers killing retriable workers over non-retriable ones, then in FIFO order.
class RetriableFIFOWorkerKillingPolicy : public WorkerKillingPolicy {
public:
RetriableFIFOWorkerKillingPolicy();
const std::pair<std::shared_ptr<WorkerInterface>, bool> SelectWorkerToKill(
const std::vector<std::shared_ptr<WorkerInterface>> &workers,
const MemorySnapshot &system_memory) const;
};

} // namespace raylet

} // namespace ray
108 changes: 108 additions & 0 deletions src/ray/raylet/worker_killing_policy_retriable_fifo_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2022 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "ray/raylet/worker_killing_policy_retriable_fifo.h"

#include "gtest/gtest.h"
#include "ray/common/task/task_spec.h"
#include "ray/raylet/test/util.h"
#include "ray/raylet/worker_killing_policy.h"

namespace ray {

namespace raylet {

class WorkerKillerTest : public ::testing::Test {
protected:
int32_t port_ = 2389;
RetriableFIFOWorkerKillingPolicy worker_killing_policy_;

std::shared_ptr<WorkerInterface> CreateActorCreationWorker(int32_t max_restarts) {
rpc::TaskSpec message;
message.mutable_actor_creation_task_spec()->set_max_actor_restarts(max_restarts);
message.set_type(ray::rpc::TaskType::ACTOR_CREATION_TASK);
TaskSpecification task_spec(message);
RayTask task(task_spec);
auto worker = std::make_shared<MockWorker>(ray::WorkerID::FromRandom(), port_);
worker->SetAssignedTask(task);
return worker;
}

std::shared_ptr<WorkerInterface> CreateTaskWorker(int32_t max_retries) {
rpc::TaskSpec message;
message.set_max_retries(max_retries);
message.set_type(ray::rpc::TaskType::NORMAL_TASK);
TaskSpecification task_spec(message);
RayTask task(task_spec);
auto worker = std::make_shared<MockWorker>(ray::WorkerID::FromRandom(), port_);
worker->SetAssignedTask(task);
return worker;
}
};

// With no candidate workers the policy must not select anything.
TEST_F(WorkerKillerTest, TestEmptyWorkerPoolSelectsNullWorker) {
  const std::vector<std::shared_ptr<WorkerInterface>> no_workers;
  const auto selection =
      worker_killing_policy_.SelectWorkerToKill(no_workers, MemorySnapshot());
  ASSERT_TRUE(selection.first == nullptr);
}

// Repeatedly asks the policy for a worker to kill and removes the selected
// worker, checking that retriable workers are picked before non-retriable ones
// and that, within each group, the earlier-created worker is picked first.
// NOTE(review): the FIFO expectation presumably relies on MockWorker recording
// the assignment time in SetAssignedTask; confirm in ray/raylet/test/util.h.
TEST_F(WorkerKillerTest,
       TestPreferRetriableOverNonRetriableAndOrderByTimestampAscending) {
  auto first_submitted = CreateActorCreationWorker(0 /* max_restarts */);
  auto second_submitted = CreateActorCreationWorker(5 /* max_restarts */);
  auto third_submitted = CreateTaskWorker(0 /* max_retries */);
  auto fourth_submitted = CreateTaskWorker(11 /* max_retries */);

  std::vector<std::shared_ptr<WorkerInterface>> workers = {
      first_submitted, second_submitted, third_submitted, fourth_submitted};

  // Retriable workers (second, fourth) first, then the non-retriable ones
  // (first, third); each group in creation order.
  const std::vector<std::shared_ptr<WorkerInterface>> expected_kill_order = {
      second_submitted, fourth_submitted, first_submitted, third_submitted};

  MemorySnapshot memory_snapshot;
  for (const auto &expected : expected_kill_order) {
    auto worker_to_kill =
        worker_killing_policy_.SelectWorkerToKill(workers, memory_snapshot).first;
    // Fail the test cleanly instead of dereferencing a null selection.
    ASSERT_TRUE(worker_to_kill != nullptr);
    ASSERT_EQ(worker_to_kill->WorkerId(), expected->WorkerId());
    workers.erase(std::remove(workers.begin(), workers.end(), worker_to_kill),
                  workers.end());
  }
}

} // namespace raylet

} // namespace ray

// Test binary entry point: hands the command line to googletest and runs
// every registered test, returning googletest's result as the exit code.
int main(int argc, char **argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_status = RUN_ALL_TESTS();
  return exit_status;
}

0 comments on commit 99ddbe4

Please sign in to comment.