Skip to content

Commit

Permalink
[event] event framework integration in raylet, gcs server and core wo…
Browse files Browse the repository at this point in the history
  • Loading branch information
SongGuyang authored Aug 17, 2021
1 parent ddb0dc8 commit 8227e24
Show file tree
Hide file tree
Showing 14 changed files with 182 additions and 10 deletions.
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,7 @@ cc_library(
":sha256",
"//src/ray/protobuf:event_cc_proto",
"@boost//:asio",
"@boost//:filesystem",
"@com_github_spdlog//:spdlog",
"@com_google_absl//absl/debugging:failure_signal_handler",
"@com_google_absl//absl/debugging:stacktrace",
Expand Down
4 changes: 4 additions & 0 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1275,6 +1275,7 @@ def start_dashboard(require_dashboard,


def start_gcs_server(redis_address,
log_dir,
stdout_file=None,
stderr_file=None,
redis_password=None,
Expand All @@ -1286,6 +1287,7 @@ def start_gcs_server(redis_address,
"""Start a gcs server.
Args:
redis_address (str): The address that the Redis server is listening on.
log_dir (str): The path of the dir where log files are created.
stdout_file: A file handle opened for writing to redirect stdout to. If
no redirection should happen, then this should be None.
stderr_file: A file handle opened for writing to redirect stderr to. If
Expand All @@ -1309,6 +1311,7 @@ def start_gcs_server(redis_address,
GCS_SERVER_EXECUTABLE,
f"--redis_address={gcs_ip_address}",
f"--redis_port={gcs_port}",
f"--log_dir={log_dir}",
f"--config_list={config_str}",
f"--gcs_server_port={gcs_server_port}",
f"--metrics-agent-port={metrics_agent_port}",
Expand Down Expand Up @@ -1544,6 +1547,7 @@ def start_raylet(redis_address,
f"--redis_password={redis_password or ''}",
f"--temp_dir={temp_dir}",
f"--session_dir={session_dir}",
f"--log_dir={log_dir}",
f"--resource_dir={resource_dir}",
f"--metrics-agent-port={metrics_agent_port}",
f"--metrics_export_port={metrics_export_port}",
Expand Down
1 change: 1 addition & 0 deletions python/ray/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,7 @@ def start_gcs_server(self):
"gcs_server", unique=True)
process_info = ray._private.services.start_gcs_server(
self._redis_address,
self._logs_dir,
stdout_file=stdout_file,
stderr_file=stderr_file,
redis_password=self._ray_params.redis_password,
Expand Down
3 changes: 3 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -456,3 +456,6 @@ RAY_CONFIG(int64_t, grpc_keepalive_time_ms, 10000);

/// grpc keepalive timeout
RAY_CONFIG(int64_t, grpc_keepalive_timeout_ms, 20000);

/// Whether to use log reporter in event framework
RAY_CONFIG(bool, event_log_reporter_enabled, false)
7 changes: 7 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ray/core_worker/transport/direct_actor_transport.h"
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
#include "ray/stats/stats.h"
#include "ray/util/event.h"
#include "ray/util/process.h"
#include "ray/util/util.h"

Expand Down Expand Up @@ -183,6 +184,12 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options)
// for java worker or in constructor of CoreWorker for python worker.
stats::Init(global_tags, options_.metrics_agent_port);

// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !options_.log_dir.empty()) {
RayEventInit(ray::rpc::Event_SourceType::Event_SourceType_CORE_WORKER,
std::unordered_map<std::string, std::string>(), options_.log_dir);
}

#ifndef _WIN32
// NOTE(kfstorm): std::atexit should be put at the end of `CoreWorkerProcess`
// constructor. We assume that spdlog has been initialized before this line. When the
Expand Down
9 changes: 9 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
#include "ray/gcs/gcs_server/gcs_server.h"
#include "ray/gcs/store_client/redis_store_client.h"
#include "ray/stats/stats.h"
#include "ray/util/event.h"
#include "ray/util/util.h"
#include "src/ray/protobuf/gcs_service.pb.h"

DEFINE_string(redis_address, "", "The ip address of redis.");
DEFINE_int32(redis_port, -1, "The port of redis.");
DEFINE_string(log_dir, "", "The path of the dir where log files are created.");
DEFINE_int32(gcs_server_port, 0, "The port of gcs server.");
DEFINE_int32(metrics_agent_port, -1, "The port of metrics agent.");
DEFINE_string(config_list, "", "The config list of raylet.");
Expand All @@ -40,6 +42,7 @@ int main(int argc, char *argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
const std::string redis_address = FLAGS_redis_address;
const int redis_port = static_cast<int>(FLAGS_redis_port);
const std::string log_dir = FLAGS_log_dir;
const int gcs_server_port = static_cast<int>(FLAGS_gcs_server_port);
const int metrics_agent_port = static_cast<int>(FLAGS_metrics_agent_port);
std::string config_list;
Expand Down Expand Up @@ -87,6 +90,12 @@ int main(int argc, char *argv[]) {
{ray::stats::NodeAddressKey, node_ip_address}};
ray::stats::Init(global_tags, metrics_agent_port);

// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) {
ray::RayEventInit(ray::rpc::Event_SourceType::Event_SourceType_GCS,
std::unordered_map<std::string, std::string>(), log_dir);
}

// IO Service for main loop.
instrumented_io_context main_service;
// Ensure that the IO service keeps running. Without this, the main_service will exit
Expand Down
9 changes: 9 additions & 0 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "ray/gcs/gcs_client/service_based_gcs_client.h"
#include "ray/raylet/raylet.h"
#include "ray/stats/stats.h"
#include "ray/util/event.h"

DEFINE_string(raylet_socket_name, "", "The socket name of raylet.");
DEFINE_string(store_socket_name, "", "The socket name of object store.");
Expand Down Expand Up @@ -50,6 +51,7 @@ DEFINE_string(cpp_worker_command, "", "CPP worker command.");
DEFINE_string(redis_password, "", "The password of redis.");
DEFINE_string(temp_dir, "", "Temporary directory.");
DEFINE_string(session_dir, "", "The path of this ray session directory.");
DEFINE_string(log_dir, "", "The path of the dir where log files are created.");
DEFINE_string(resource_dir, "", "The path of this ray resource directory.");
DEFINE_int32(ray_debugger_external, 0, "Make Ray debugger externally accessible.");
// store options
Expand Down Expand Up @@ -95,6 +97,7 @@ int main(int argc, char *argv[]) {
const std::string redis_password = FLAGS_redis_password;
const std::string temp_dir = FLAGS_temp_dir;
const std::string session_dir = FLAGS_session_dir;
const std::string log_dir = FLAGS_log_dir;
const std::string resource_dir = FLAGS_resource_dir;
const int ray_debugger_external = FLAGS_ray_debugger_external;
const int64_t object_store_memory = FLAGS_object_store_memory;
Expand Down Expand Up @@ -251,6 +254,12 @@ int main(int argc, char *argv[]) {
redis_password, node_manager_config, object_manager_config, gcs_client,
metrics_export_port));

// Initialize event framework.
if (RayConfig::instance().event_log_reporter_enabled() && !log_dir.empty()) {
ray::RayEventInit(ray::rpc::Event_SourceType::Event_SourceType_RAYLET,
{{"node_id", raylet->GetNodeId().Hex()}}, log_dir);
};

raylet->Start();
}));

Expand Down
2 changes: 2 additions & 0 deletions src/ray/raylet/raylet.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class Raylet {
/// Destroy the NodeServer.
~Raylet();

NodeID GetNodeId() const { return self_node_id_; }

private:
/// Register GCS client.
ray::Status RegisterGcs();
Expand Down
19 changes: 18 additions & 1 deletion src/ray/util/event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
// limitations under the License.

#include "ray/util/event.h"
#include <boost/filesystem.hpp>

namespace ray {
///
/// LogEventReporter
///
LogEventReporter::LogEventReporter(rpc::Event_SourceType source_type,
std::string &log_dir, bool force_flush,
const std::string &log_dir, bool force_flush,
int rotate_max_file_size, int rotate_max_file_num)
: log_dir_(log_dir),
force_flush_(force_flush),
Expand Down Expand Up @@ -118,6 +119,12 @@ void LogEventReporter::Report(const rpc::Event &event, const json &custom_fields
///
/// EventManager
///
EventManager::EventManager() {
RayLog::AddFatalLogCallbacks({[](const std::string &label, const std::string &content) {
RayEvent::ReportEvent("FATAL", label, content);
}});
}

EventManager &EventManager::Instance() {
static EventManager instance_;
return instance_;
Expand Down Expand Up @@ -213,4 +220,14 @@ void RayEvent::SendMessage(const std::string &message) {
EventManager::Instance().Publish(event, custom_fields_);
}

void RayEventInit(rpc::Event_SourceType source_type,
const std::unordered_map<std::string, std::string> &custom_fields,
const std::string &log_dir) {
RayEventContext::Instance().SetEventContext(source_type, custom_fields);
auto event_dir = boost::filesystem::path(log_dir) / boost::filesystem::path("event");
ray::EventManager::Instance().AddReporter(
std::make_shared<ray::LogEventReporter>(source_type, event_dir.string()));
RAY_LOG(INFO) << "Ray Event initialized for " << Event_SourceType_Name(source_type);
}

} // namespace ray
8 changes: 6 additions & 2 deletions src/ray/util/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class BaseEventReporter {
// responsible for writing event to specific file
class LogEventReporter : public BaseEventReporter {
public:
LogEventReporter(rpc::Event_SourceType source_type, std::string &log_dir,
LogEventReporter(rpc::Event_SourceType source_type, const std::string &log_dir,
bool force_flush = true, int rotate_max_file_size = 100,
int rotate_max_file_num = 20);

Expand Down Expand Up @@ -106,7 +106,7 @@ class EventManager final {
void ClearReporters();

private:
EventManager() = default;
EventManager();

EventManager(const EventManager &manager) = delete;

Expand Down Expand Up @@ -202,4 +202,8 @@ class RayEvent {
std::ostringstream osstream_;
};

void RayEventInit(rpc::Event_SourceType source_type,
const std::unordered_map<std::string, std::string> &custom_fields,
const std::string &log_dir);

} // namespace ray
21 changes: 21 additions & 0 deletions src/ray/util/event_label.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2021 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

namespace ray {

#define EL_RAY_FATAL_CHECK_FAILED "RAY_FATAL_CHECK_FAILED"

} // namespace ray
32 changes: 32 additions & 0 deletions src/ray/util/event_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
#include "ray/util/event.h"
#include <boost/filesystem.hpp>
#include <boost/range.hpp>
#include <csignal>
#include <fstream>
#include <set>
#include <thread>
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "ray/util/event_label.h"

namespace ray {

Expand Down Expand Up @@ -349,6 +352,35 @@ TEST(EVENT_TEST, WITH_FIELD) {
boost::filesystem::remove_all(log_dir.c_str());
}

TEST(EVENT_TEST, TEST_RAY_CHECK_ABORT) {
std::string log_dir = GenerateLogDir();

ray::EventManager::Instance().ClearReporters();
auto custom_fields = std::unordered_map<std::string, std::string>();
custom_fields.emplace("node_id", "node 1");
custom_fields.emplace("job_id", "job 1");
custom_fields.emplace("task_id", "task 1");
RayEventInit(rpc::Event_SourceType::Event_SourceType_RAYLET, custom_fields, log_dir);

RAY_CHECK(1 > 0) << "correct test case";

ASSERT_DEATH({ RAY_CHECK(1 < 0) << "incorrect test case"; }, "");

std::vector<std::string> vc;
ReadEventFromFile(vc, log_dir + "/event/event_RAYLET.log");
json out_custom_fields;
rpc::Event ele_1 = GetEventFromString(vc.back(), &out_custom_fields);

CheckEventDetail(ele_1, "job 1", "node 1", "task 1", "RAYLET", "FATAL",
EL_RAY_FATAL_CHECK_FAILED, "NULL");
EXPECT_THAT(ele_1.message(),
testing::HasSubstr("Check failed: 1 < 0 incorrect test case"));
EXPECT_THAT(ele_1.message(), testing::HasSubstr("*** StackTrace Information ***"));
EXPECT_THAT(ele_1.message(), testing::HasSubstr("ray::RayLog::~RayLog()"));

boost::filesystem::remove_all(log_dir.c_str());
}

} // namespace ray

int main(int argc, char **argv) {
Expand Down
Loading

0 comments on commit 8227e24

Please sign in to comment.