Improve backend debug logging, refactor scheduling queues (ray-projec…
stephanie-wang authored and raulchen committed Jan 26, 2019
1 parent 066fa8a commit eddd60e
Showing 19 changed files with 509 additions and 437 deletions.
168 changes: 112 additions & 56 deletions doc/source/development.rst
@@ -1,85 +1,141 @@
Development Tips
================

If you are doing development on the Ray codebase, the following tips may be
helpful.
Compilation
-----------

To speed up compilation, be sure to install Ray with

.. code-block:: shell

  cd ray/python
  pip install -e . --verbose

The ``-e`` means "editable", so changes you make to files in the Ray
directory will take effect without reinstalling the package. In contrast, if
you do ``python setup.py install``, files will be copied from the Ray
directory to a directory of Python packages (often something like
``/home/ubuntu/anaconda3/lib/python3.6/site-packages/ray``). This means that
changes you make to files in the Ray directory will not have any effect.

If you run into **Permission Denied** errors when running ``pip install``,
you can try adding ``--user``. You may also need to run something like ``sudo
chown -R $USER /home/ubuntu/anaconda3`` (substituting in the appropriate
path).
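
If you hit permission problems, a rough sketch of the two workarounds mentioned
above (substitute your own Python installation path) is:

.. code-block:: shell

  # Run from the ray/python directory, as above.
  pip install -e . --verbose --user
  # Or, if your Python installation itself is not writable by your user:
  sudo chown -R $USER /home/ubuntu/anaconda3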

If you make changes to the C++ files, you will need to recompile them.
However, you do not need to rerun ``pip install -e .``. Instead, you can
recompile much more quickly by doing

.. code-block:: shell

  cd ray/build
  make -j8

Debugging
---------

Starting processes in a debugger
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When processes are crashing, it is often useful to start them in a debugger
(``gdb`` on Linux or ``lldb`` on MacOS). See the latest discussion about how to
do this `here`_.
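
For example, on Linux, one rough alternative (assuming a single ``raylet`` is
running on the node) is to attach ``gdb`` to the already-running process rather
than launching it under the debugger:

.. code-block:: shell

  # May require elevated privileges depending on your ptrace settings.
  gdb -p $(pgrep -x raylet)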

You can also get a core dump of the ``raylet`` process, which is especially
useful when filing `issues`_. The process to obtain a core dump is OS-specific,
but usually involves running ``ulimit -c unlimited`` before starting Ray to
allow core dump files to be written.
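
As a minimal sketch on Linux (where the core file ends up depends on your
system's core pattern settings):

.. code-block:: shell

  ulimit -c unlimited   # allow core dumps of unlimited size in this shell
  ray start --head      # start Ray as usual; a crashing raylet can now leave a core file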

Inspecting Redis shards
~~~~~~~~~~~~~~~~~~~~~~~

To inspect Redis, you can use the ``ray.experimental.state.GlobalState`` Python
API. The easiest way to do this is to start or connect to a Ray cluster with
``ray.init()``, then query the API like so:

.. code-block:: python

  ray.init()
  ray.worker.global_state.client_table()
  # Returns current information about the nodes in the cluster, such as:
  # [{'ClientID': '2a9d2b34ad24a37ed54e4fcd32bf19f915742f5b',
  #   'IsInsertion': True,
  #   'NodeManagerAddress': '1.2.3.4',
  #   'NodeManagerPort': 43280,
  #   'ObjectManagerPort': 38062,
  #   'ObjectStoreSocketName': '/tmp/ray/session_2019-01-21_16-28-05_4216/sockets/plasma_store',
  #   'RayletSocketName': '/tmp/ray/session_2019-01-21_16-28-05_4216/sockets/raylet',
  #   'Resources': {'CPU': 8.0, 'GPU': 1.0}}]

To inspect the primary Redis shard manually, you can also query with commands
like the following.

.. code-block:: python

  r_primary = ray.worker.global_worker.redis_client
  r_primary.keys("*")

To inspect other Redis shards, you will need to create a new Redis client.
For example (assuming the relevant IP address is ``127.0.0.1`` and the
relevant port is ``1234``), you can do this as follows.

.. code-block:: python

  import redis
  r = redis.StrictRedis(host='127.0.0.1', port=1234)

You can find a list of the relevant IP addresses and ports by running

.. code-block:: python

  r_primary.lrange('RedisShards', 0, -1)

.. _backend-logging:

Backend logging
~~~~~~~~~~~~~~~

The ``raylet`` process logs detailed information about events like task
execution and object transfers between nodes. To set the logging level at
runtime, you can set the ``RAY_BACKEND_LOG_LEVEL`` environment variable before
starting Ray. For example, you can do:

.. code-block:: shell

  export RAY_BACKEND_LOG_LEVEL=debug
  ray start

This will print any ``RAY_LOG(DEBUG)`` lines in the source code to the
``raylet.err`` file, which you can find in the `Temporary Files`_.
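
As a rough sketch, assuming the default temporary directory (the session
directory name will differ on your machine), you can then watch the debug
output as it is written:

.. code-block:: shell

  tail -f /tmp/ray/session_*/logs/raylet.err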

Testing locally
---------------

Suppose that one of the tests (e.g., ``runtest.py``) is failing. You can run
that test locally by running ``python test/runtest.py``. However, doing so will
run all of the tests which can take a while. To run a specific test that is
failing, you can do

.. code-block:: shell

  cd ray
  python -m pytest -v test/runtest.py::test_keyword_args

When running tests, usually only the first test failure matters. A single
test failure often triggers the failure of subsequent tests in the same
script.

Linting
-------

**Running linter locally:** To run the Python linter on a specific file, run
something like ``flake8 ray/python/ray/worker.py``. You may need to first run
``pip install flake8``.

**Autoformatting code:** We use `yapf <https://github.com/google/yapf>`_ for
autoformatting, and the config file is located at ``.style.yapf``. We recommend
running ``scripts/yapf.sh`` prior to pushing to format changed files.
Note that some projects such as dataframes and rllib are currently excluded.
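
For example, a typical pre-push check might look something like this (adjust
the paths to your working directory):

.. code-block:: shell

  pip install flake8
  flake8 ray/python/ray/worker.py   # lint one file
  scripts/yapf.sh                   # autoformat changed files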



.. _`issues`: https://github.com/ray-project/ray/issues
.. _`here`: https://github.com/ray-project/ray/issues/108
.. _`Temporary Files`: http://ray.readthedocs.io/en/latest/tempfile.html
26 changes: 17 additions & 9 deletions src/ray/common/client_connection.cc
@@ -187,22 +187,24 @@ template <class T>
 std::shared_ptr<ClientConnection<T>> ClientConnection<T>::Create(
     ClientHandler<T> &client_handler, MessageHandler<T> &message_handler,
     boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label,
-    int64_t error_message_type) {
-  std::shared_ptr<ClientConnection<T>> self(new ClientConnection(
-      message_handler, std::move(socket), debug_label, error_message_type));
+    const std::vector<std::string> &message_type_enum_names, int64_t error_message_type) {
+  std::shared_ptr<ClientConnection<T>> self(
+      new ClientConnection(message_handler, std::move(socket), debug_label,
+                           message_type_enum_names, error_message_type));
   // Let our manager process our new connection.
   client_handler(*self);
   return self;
 }
 
 template <class T>
-ClientConnection<T>::ClientConnection(MessageHandler<T> &message_handler,
-                                      boost::asio::basic_stream_socket<T> &&socket,
-                                      const std::string &debug_label,
-                                      int64_t error_message_type)
+ClientConnection<T>::ClientConnection(
+    MessageHandler<T> &message_handler, boost::asio::basic_stream_socket<T> &&socket,
+    const std::string &debug_label,
+    const std::vector<std::string> &message_type_enum_names, int64_t error_message_type)
     : ServerConnection<T>(std::move(socket)),
       message_handler_(message_handler),
       debug_label_(debug_label),
+      message_type_enum_names_(message_type_enum_names),
       error_message_type_(error_message_type) {}
 
 template <class T>
@@ -261,8 +263,14 @@ void ClientConnection<T>::ProcessMessage(const boost::system::error_code &error)
   message_handler_(shared_ClientConnection_from_this(), read_type_, read_message_.data());
   int64_t interval = current_time_ms() - start_ms;
   if (interval > RayConfig::instance().handler_warning_timeout_ms()) {
-    RAY_LOG(WARNING) << "[" << debug_label_ << "]ProcessMessage with type " << read_type_
-                     << " took " << interval << " ms.";
+    std::string message_type;
+    if (message_type_enum_names_.empty()) {
+      message_type = std::to_string(read_type_);
+    } else {
+      message_type = message_type_enum_names_[read_type_];
+    }
+    RAY_LOG(WARNING) << "[" << debug_label_ << "]ProcessMessage with type "
+                     << message_type << " took " << interval << " ms.";
   }
 }
 
12 changes: 11 additions & 1 deletion src/ray/common/client_connection.h
@@ -145,10 +145,15 @@ class ClientConnection : public ServerConnection<T> {
   /// \param new_client_handler A reference to the client handler.
   /// \param message_handler A reference to the message handler.
   /// \param socket The client socket.
+  /// \param debug_label Label that is printed in debug messages, to identify
+  /// the type of client.
+  /// \param message_type_enum_names A table of printable enum names for the
+  /// message types received from this client, used for debug messages.
   /// \return std::shared_ptr<ClientConnection>.
   static std::shared_ptr<ClientConnection<T>> Create(
       ClientHandler<T> &new_client_handler, MessageHandler<T> &message_handler,
       boost::asio::basic_stream_socket<T> &&socket, const std::string &debug_label,
+      const std::vector<std::string> &message_type_enum_names,
       int64_t error_message_type);
 
   std::shared_ptr<ClientConnection<T>> shared_ClientConnection_from_this() {
@@ -170,7 +175,9 @@
   /// A private constructor for a node client connection.
   ClientConnection(MessageHandler<T> &message_handler,
                    boost::asio::basic_stream_socket<T> &&socket,
-                   const std::string &debug_label, int64_t error_message_type);
+                   const std::string &debug_label,
+                   const std::vector<std::string> &message_type_enum_names,
+                   int64_t error_message_type);
   /// Process an error from the last operation, then process the message
   /// header from the client.
   void ProcessMessageHeader(const boost::system::error_code &error);
@@ -184,6 +191,9 @@
   MessageHandler<T> message_handler_;
   /// A label used for debug messages.
   const std::string debug_label_;
+  /// A table of printable enum names for the message types, used for debug
+  /// messages.
+  const std::vector<std::string> message_type_enum_names_;
   /// The value for disconnect client message.
   int64_t error_message_type_;
   /// Buffers for the current message being read from the client.
16 changes: 3 additions & 13 deletions src/ray/gcs/tables.cc
@@ -247,10 +247,7 @@ Status ErrorTable::PushErrorToDriver(const JobID &job_id, const std::string &typ
   data->type = type;
   data->error_message = error_message;
   data->timestamp = timestamp;
-  return Append(job_id, job_id, data, [](ray::gcs::AsyncGcsClient *client,
-                                         const JobID &id, const ErrorTableDataT &data) {
-    RAY_LOG(DEBUG) << "Error message pushed callback";
-  });
+  return Append(job_id, job_id, data, /*done_callback=*/nullptr);
 }
 
 std::string ErrorTable::DebugString() const {
@@ -264,10 +261,7 @@ Status ProfileTable::AddProfileEventBatch(const ProfileTableData &profile_events
   profile_events.UnPackTo(data.get());
 
   return Append(JobID::nil(), UniqueID::from_random(), data,
-                [](ray::gcs::AsyncGcsClient *client, const JobID &id,
-                   const ProfileTableDataT &data) {
-                  RAY_LOG(DEBUG) << "Profile message pushed callback";
-                });
+                /*done_callback=*/nullptr);
 }
 
 std::string ProfileTable::DebugString() const {
@@ -278,11 +272,7 @@ Status DriverTable::AppendDriverData(const JobID &driver_id, bool is_dead) {
   auto data = std::make_shared<DriverTableDataT>();
   data->driver_id = driver_id.binary();
   data->is_dead = is_dead;
-  return Append(driver_id, driver_id, data,
-                [](ray::gcs::AsyncGcsClient *client, const JobID &id,
-                   const DriverTableDataT &data) {
-                  RAY_LOG(DEBUG) << "Driver entry added callback";
-                });
+  return Append(driver_id, driver_id, data, /*done_callback=*/nullptr);
 }
 
 void ClientTable::RegisterClientAddedCallback(const ClientTableCallback &callback) {
2 changes: 0 additions & 2 deletions src/ray/object_manager/connection_pool.cc
@@ -123,14 +123,12 @@ std::shared_ptr<SenderConnection> ConnectionPool::Borrow(SenderMapType &conn_map
                                                           const ClientID &client_id) {
   std::shared_ptr<SenderConnection> conn = std::move(conn_map[client_id].back());
   conn_map[client_id].pop_back();
-  RAY_LOG(DEBUG) << "Borrow " << client_id << " " << conn_map[client_id].size();
   return conn;
 }
 
 void ConnectionPool::Return(SenderMapType &conn_map, const ClientID &client_id,
                             std::shared_ptr<SenderConnection> conn) {
   conn_map[client_id].push_back(std::move(conn));
-  RAY_LOG(DEBUG) << "Return " << client_id << " " << conn_map[client_id].size();
 }
 
 std::string ConnectionPool::DebugString() const {
6 changes: 0 additions & 6 deletions src/ray/object_manager/object_buffer_pool.cc
@@ -41,7 +41,6 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Ge
     const ObjectID &object_id, uint64_t data_size, uint64_t metadata_size,
     uint64_t chunk_index) {
   std::lock_guard<std::mutex> lock(pool_mutex_);
-  RAY_LOG(DEBUG) << "GetChunk " << object_id << " " << data_size << " " << metadata_size;
   if (get_buffer_state_.count(object_id) == 0) {
     plasma::ObjectBuffer object_buffer;
     plasma::ObjectID plasma_id = object_id.to_plasma_id();
@@ -72,7 +71,6 @@ void ObjectBufferPool::ReleaseGetChunk(const ObjectID &object_id, uint64_t chunk
   std::lock_guard<std::mutex> lock(pool_mutex_);
   GetBufferState &buffer_state = get_buffer_state_[object_id];
   buffer_state.references--;
-  RAY_LOG(DEBUG) << "ReleaseBuffer " << object_id << " " << buffer_state.references;
   if (buffer_state.references == 0) {
     RAY_ARROW_CHECK_OK(store_client_.Release(object_id.to_plasma_id()));
     get_buffer_state_.erase(object_id);
@@ -89,8 +87,6 @@ std::pair<const ObjectBufferPool::ChunkInfo &, ray::Status> ObjectBufferPool::Cr
     const ObjectID &object_id, uint64_t data_size, uint64_t metadata_size,
     uint64_t chunk_index) {
   std::lock_guard<std::mutex> lock(pool_mutex_);
-  RAY_LOG(DEBUG) << "CreateChunk " << object_id << " " << data_size << " "
-                 << metadata_size;
   if (create_buffer_state_.count(object_id) == 0) {
     const plasma::ObjectID plasma_id = object_id.to_plasma_id();
     int64_t object_size = data_size - metadata_size;
@@ -153,8 +149,6 @@ void ObjectBufferPool::SealChunk(const ObjectID &object_id, const uint64_t chunk
              CreateChunkState::REFERENCED);
   create_buffer_state_[object_id].chunk_state[chunk_index] = CreateChunkState::SEALED;
   create_buffer_state_[object_id].num_seals_remaining--;
-  RAY_LOG(DEBUG) << "SealChunk" << object_id << " "
-                 << create_buffer_state_[object_id].num_seals_remaining;
   if (create_buffer_state_[object_id].num_seals_remaining == 0) {
     const plasma::ObjectID plasma_id = object_id.to_plasma_id();
     RAY_ARROW_CHECK_OK(store_client_.Seal(plasma_id));