Skip to content

Commit

Permalink
Added docs on using asio::cancel_after with pool_params::thread_safe
Browse files Browse the repository at this point in the history
close #402
  • Loading branch information
anarthal authored Feb 24, 2025
1 parent 33660e2 commit e988c9a
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 38 deletions.
4 changes: 4 additions & 0 deletions doc/images/connection_pool_impl.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
53 changes: 41 additions & 12 deletions doc/qbk/12_connection_pool.qbk
Original file line number Diff line number Diff line change
Expand Up @@ -166,25 +166,54 @@ In short:
setting [refmem pool_params ping_interval] to zero.


[heading Thread-safety]
[heading:thread_safe Thread-safety]

By default, [reflink connection_pool] is [*not thread-safe], but it can
be easily made thread-safe by setting [refmem pool_params thread_safe]:

[connection_pool_thread_safe]
[connection_pool_thread_safe_create]

Thread-safe connection pools create internally a [asioreflink strand strand],
Asio's method to enable concurrency without explicit locking.
Enabling thread-safety will ensure that all intermediate handlers run through
the created strand, avoiding data races at the cost of some performance.
To correctly understand what is protected by [refmem pool_params thread_safe]
and what is not, we need a grasp of how pools are implemented.
Both [reflink connection_pool] and individual [reflink pooled_connection]'s
hold pointers to a shared state object containing all data required by the pool:

[note
Thread-safety only protects the pool. Individual connections are [*not] thread-safe.
Assignments aren't thread-safe, either. See [reflink connection_pool] docs for more info.
]
[$mysql/images/connection_pool_impl.svg [align center]]

Thread-safe connection pools internally create an [asioreflink strand strand]
that protects the connection pool's state. Operations like
[refmemunq connection_pool async_get_connection], [refmemunq connection_pool async_run]
and [reflink pooled_connection]'s destructor will run through the strand,
and are safe to be run from any thread. Operations that mutate
state handles (the internal `std::shared_ptr`), like [*assignment operators,
are not thread-safe].

Data outside the pool's state is not protected. In particular,
[*`asio::cancel_after` creates an internal timer that can cause
inadvertent race conditions]. For example:

[connection_pool_thread_safe_use]

This coroutine must be run within a strand:

[connection_pool_thread_safe_spawn]


If we don't use `asio::make_shared`, we have the following race condition:

* The thread calling `async_get_connection` sets up the timer required by `asio::cancel_after`.
* In parallel, the thread running the execution context sees that there is a healthy connection
and completes the `async_get_connection` operation. As a result, the timer is cancelled.
Thus, the timer is accessed concurrently from both threads without protection.


If you're using callbacks, code gets slightly more convoluted. The
above coroutine can be rewritten as:

[connection_pool_thread_safe_callbacks]

Thread-safety extends to per-operation cancellation, too.
Cancelling an operation on a thread-safe pool is safe.
Thread-safety is disabled by default because strands impose a performance
penalty that is avoidable in single-threaded programs.


[heading Transport types and TLS]
Expand Down
10 changes: 10 additions & 0 deletions include/boost/mysql/connection_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ class pooled_connection
* like \ref pooled_connection, have their own state handle,
* and thus interact only with the pool state.
*
* If configured to be thread-safe, the protection applies only to the pool's state.
* In particular, be careful when using `asio::cancel_after` and similar tokens.
* Please read
* <a href="../connection_pool.html#mysql.connection_pool.thread_safe">this page</a> for more info.
*
* In summary:
*
* - Distinct objects: safe. \n
Expand Down Expand Up @@ -662,6 +667,11 @@ class connection_pool
* Otherwise, intermediate handlers are executed using
* `token`'s associated executor if it has one, or `this->get_executor()` if it hasn't.
*
* **Caution**: be careful when using thread-safety and `asio::cancel_after`, as it
* can result in inadvertent race conditions. Please refer to
* <a href="../../../connection_pool.html#mysql.connection_pool.thread_safe">this
* page</a> for more info.
*
* \par Per-operation cancellation
* This operation supports per-operation cancellation.
* Cancelling `async_get_connection` has no observable side effects.
Expand Down
7 changes: 6 additions & 1 deletion include/boost/mysql/pool_params.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,12 @@ struct pool_params
* will be run through the created strand.
*
* Thread-safety doesn't extend to individual connections: \ref pooled_connection
* objects can't be shared between threads.
* objects can't be shared between threads. Thread-safety does not protect
* objects that don't belong to the pool. For instance, `asio::cancel_after`
* creates a timer that must be protected with a strand.
* Refer to
* <a href="../../connection_pool.html#mysql.connection_pool.thread_safe">this
* page</a> for more info.
*/
bool thread_safe{false};

Expand Down
140 changes: 115 additions & 25 deletions test/integration/test/snippets/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@
#include <boost/mysql/with_diagnostics.hpp>

#include <boost/asio/awaitable.hpp>
#include <boost/asio/bind_executor.hpp>
#include <boost/asio/cancel_after.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/asio/use_future.hpp>
#include <boost/config.hpp>
#include <boost/system/error_code.hpp>
#include <boost/test/unit_test.hpp>

#include <chrono>
Expand All @@ -26,8 +31,9 @@
#include "test_integration/run_coro.hpp"
#include "test_integration/snippets/credentials.hpp"

using namespace boost::mysql;
using namespace boost::mysql::test;
namespace asio = boost::asio;
namespace mysql = boost::mysql;
using namespace mysql::test;

namespace {

Expand All @@ -36,48 +42,112 @@ namespace {
// Use connection pools for functions that will be called
// repeatedly during the application lifetime.
// An HTTP server handler function is a good candidate.
boost::asio::awaitable<std::int64_t> get_num_employees(boost::mysql::connection_pool& pool)
asio::awaitable<std::int64_t> get_num_employees(mysql::connection_pool& pool)
{
// Get a fresh connection from the pool.
// pooled_connection is a proxy to an any_connection object.
boost::mysql::pooled_connection conn = co_await pool.async_get_connection();
mysql::pooled_connection conn = co_await pool.async_get_connection();

// Use pooled_connection::operator-> to access the underlying any_connection.
// Let's use the connection
results result;
mysql::results result;
co_await conn->async_execute("SELECT COUNT(*) FROM employee", result);
co_return result.rows().at(0).at(0).as_int64();

// When conn is destroyed, the connection is returned to the pool
}
//]

boost::asio::awaitable<void> return_without_reset(boost::mysql::connection_pool& pool)
asio::awaitable<void> return_without_reset(mysql::connection_pool& pool)
{
//[connection_pool_return_without_reset
// Get a connection from the pool
boost::mysql::pooled_connection conn = co_await pool.async_get_connection();
mysql::pooled_connection conn = co_await pool.async_get_connection();

// Use the connection in a way that doesn't mutate session state.
// We're not setting variables, preparing statements or starting transactions,
// so it's safe to skip reset
boost::mysql::results result;
mysql::results result;
co_await conn->async_execute("SELECT COUNT(*) FROM employee", result);

// Explicitly return the connection to the pool, skipping reset
conn.return_without_reset();
//]
}

boost::asio::awaitable<void> apply_timeout(boost::mysql::connection_pool& pool)
asio::awaitable<void> apply_timeout(mysql::connection_pool& pool)
{
//[connection_pool_apply_timeout
// Get a connection from the pool, but don't wait more than 5 seconds
auto conn = co_await pool.async_get_connection(boost::asio::cancel_after(std::chrono::seconds(5)));
auto conn = co_await pool.async_get_connection(asio::cancel_after(std::chrono::seconds(5)));
//]

conn.return_without_reset();
}

//[connection_pool_thread_safe_use
// A function that handles a user session in a server
asio::awaitable<void> handle_session(mysql::connection_pool& pool)
{
// CAUTION: asio::cancel_after creates a timer that is *not* part of the pool's state.
// The timer is not protected by the pool's strand.
// This coroutine must be run within a strand for this to be safe
using namespace std::chrono_literals;
co_await pool.async_get_connection(asio::cancel_after(30s));

// Use the connection
}
//]
#endif

#ifndef BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES
//[connection_pool_thread_safe_callbacks
// Holds per-session state
class session_handler : public std::enable_shared_from_this<session_handler>
{
// The connection pool
mysql::connection_pool& pool_;

// A strand object, unique to this session
asio::strand<asio::any_io_executor> strand_;

public:
// pool.get_executor() points to the execution context that was used
// to create the pool, and never to the pool's internal strand
session_handler(mysql::connection_pool& pool)
: pool_(pool), strand_(asio::make_strand(pool.get_executor()))
{
}

void start()
{
// Enters the strand. The passed function will be executed through the strand.
// If the initiation is run outside the strand, a race condition will occur.
asio::dispatch(asio::bind_executor(strand_, [self = shared_from_this()] { self->get_connection(); }));
}

void get_connection()
{
// This function will run within the strand. Binding the passed callback to
// the strand will make async_get_connection run it within the strand, too.
pool_.async_get_connection(asio::cancel_after(
std::chrono::seconds(30),
asio::bind_executor(
strand_,
[self = shared_from_this()](boost::system::error_code, mysql::pooled_connection) {
// Use the connection as required
}
)
));
}
};

void handle_session_v2(mysql::connection_pool& pool)
{
// Start the callback chain
std::make_shared<session_handler>(pool)->start();
}
//]
#endif

BOOST_AUTO_TEST_CASE(section_connection_pool)
Expand All @@ -90,39 +160,39 @@ BOOST_AUTO_TEST_CASE(section_connection_pool)
// You must specify enough information to establish a connection,
// including the server address and credentials.
// You can configure a lot of other things, like pool limits
boost::mysql::pool_params params;
mysql::pool_params params;
params.server_address.emplace_host_and_port(server_hostname);
params.username = mysql_username;
params.password = mysql_password;
params.database = "boost_mysql_examples";

// The I/O context, required by all I/O operations
boost::asio::io_context ctx;
asio::io_context ctx;

// Construct a pool of connections. The context will be used internally
// to create the connections and other I/O objects
boost::mysql::connection_pool pool(ctx, std::move(params));
mysql::connection_pool pool(ctx, std::move(params));

// You need to call async_run on the pool before doing anything useful with it.
// async_run creates connections and keeps them healthy. It must be called
// only once per pool.
// The detached completion token means that we don't want to be notified when
// the operation ends. It's similar to a no-op callback.
pool.async_run(boost::asio::detached);
pool.async_run(asio::detached);
//]

#ifdef BOOST_ASIO_HAS_CO_AWAIT
run_coro(ctx, [&pool]() -> boost::asio::awaitable<void> {
run_coro(ctx, [&pool]() -> asio::awaitable<void> {
co_await get_num_employees(pool);
pool.cancel();
});
#endif
}
{
boost::asio::io_context ctx;
asio::io_context ctx;

//[connection_pool_configure_size
boost::mysql::pool_params params;
mysql::pool_params params;

// Set the usual params
params.server_address.emplace_host_and_port(server_hostname);
Expand All @@ -134,38 +204,58 @@ BOOST_AUTO_TEST_CASE(section_connection_pool)
params.initial_size = 10;
params.max_size = 1000;

boost::mysql::connection_pool pool(ctx, std::move(params));
mysql::connection_pool pool(ctx, std::move(params));
//]

#ifdef BOOST_ASIO_HAS_CO_AWAIT
pool.async_run(boost::asio::detached);
run_coro(ctx, [&pool]() -> boost::asio::awaitable<void> {
pool.async_run(asio::detached);
run_coro(ctx, [&pool]() -> asio::awaitable<void> {
co_await return_without_reset(pool);
co_await apply_timeout(pool);
pool.cancel();
});
#endif
}
{
//[connection_pool_thread_safe
// The I/O context, required by all I/O operations
boost::asio::io_context ctx;
//[connection_pool_thread_safe_create
// The I/O context, required by all I/O operations.
// This is like an io_context, but with 5 threads running it.
asio::thread_pool ctx(5);

// The usual pool configuration params
boost::mysql::pool_params params;
mysql::pool_params params;
params.server_address.emplace_host_and_port(server_hostname);
params.username = mysql_username;
params.password = mysql_password;
params.database = "boost_mysql_examples";
params.thread_safe = true; // enable thread safety

// Construct a thread-safe pool
boost::mysql::connection_pool pool(ctx, std::move(params));
mysql::connection_pool pool(ctx, std::move(params));
pool.async_run(asio::detached);

// We can now pass a reference to pool to other threads,
// and call async_get_connection concurrently without problem.
// Individual connections are still not thread-safe.
//]

#ifdef BOOST_ASIO_HAS_CO_AWAIT
//[connection_pool_thread_safe_spawn
// OK: the entire coroutine runs within a strand.
// In a typical server setup, each request usually gets its own strand,
// so it can run in parallel with other requests.
asio::co_spawn(
asio::make_strand(ctx), // If we removed this make_strand, we would have a race condition
[&pool] { return handle_session(pool); },
asio::detached
);
//]
#endif
#ifndef BOOST_NO_CXX14_INITIALIZED_LAMBDA_CAPTURES
handle_session_v2(pool);
#endif
pool.cancel();
ctx.join();
}
}

Expand Down

0 comments on commit e988c9a

Please sign in to comment.