From 38ef3a75cc74b5adb0cb22a72fc0426e8ffd094f Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Tue, 21 Jul 2020 18:29:39 +0300 Subject: [PATCH] PollableFd: explicit sync_with_poll GitOrigin-RevId: 71fa35a594816e84e372ebcfa9d0077a13f26a62 --- benchmark/bench_http_server_cheat.cpp | 7 +++--- benchmark/bench_http_server_fast.cpp | 7 +++--- td/mtproto/RawConnection.h | 3 ++- tdnet/td/net/HttpConnectionBase.cpp | 9 ++++---- tdnet/td/net/TcpListener.cpp | 5 +++-- tdnet/td/net/TransparentProxy.cpp | 6 +++-- tdutils/td/utils/BufferedFd.h | 11 +++++----- tdutils/td/utils/BufferedUdp.h | 7 ++++-- tdutils/td/utils/port/ServerSocketFd.cpp | 2 +- tdutils/td/utils/port/SocketFd.cpp | 6 ++--- tdutils/td/utils/port/StdStreams.cpp | 6 +++-- tdutils/td/utils/port/UdpSocketFd.cpp | 8 +++---- tdutils/td/utils/port/detail/EventFdBsd.cpp | 3 ++- tdutils/td/utils/port/detail/EventFdLinux.cpp | 2 +- tdutils/td/utils/port/detail/PollableFd.h | 22 +++++++++++++------ tdutils/test/misc.cpp | 3 ++- test/http.cpp | 2 +- 17 files changed, 65 insertions(+), 44 deletions(-) diff --git a/benchmark/bench_http_server_cheat.cpp b/benchmark/bench_http_server_cheat.cpp index 8716ec5cfb00..9e8639881579 100644 --- a/benchmark/bench_http_server_cheat.cpp +++ b/benchmark/bench_http_server_cheat.cpp @@ -61,15 +61,16 @@ class HelloWorld : public Actor { } } Status do_loop() { + sync_with_poll(socket_fd_); TRY_STATUS(read_loop()); TRY_STATUS(write_loop()); - if (can_close(socket_fd_)) { + if (can_close_local(socket_fd_)) { return Status::Error("CLOSE"); } return Status::OK(); } Status write_loop() { - while (can_write(socket_fd_) && write_pos_ < write_buf_.size()) { + while (can_write_local(socket_fd_) && write_pos_ < write_buf_.size()) { TRY_RESULT(written, socket_fd_.write(Slice(write_buf_).substr(write_pos_))); write_pos_ += written; if (write_pos_ == write_buf_.size()) { @@ -80,7 +81,7 @@ class HelloWorld : public Actor { return Status::OK(); } Status read_loop() { - while (can_read(socket_fd_)) { + while (can_read_local(socket_fd_)) { TRY_RESULT(read_size, socket_fd_.read(MutableSlice(read_buf.data(), read_buf.size()))); for (size_t i = 0; i < read_size; i++) { if (read_buf[i] == '\n') { diff --git a/benchmark/bench_http_server_fast.cpp b/benchmark/bench_http_server_fast.cpp index c4353dc1754e..c180c7856cb2 100644 --- a/benchmark/bench_http_server_fast.cpp +++ b/benchmark/bench_http_server_fast.cpp @@ -55,19 +55,18 @@ class HttpEchoConnection : public Actor { } void loop() override { + sync_with_poll(fd_); auto status = [&] { TRY_STATUS(loop_read()); TRY_STATUS(loop_write()); return Status::OK(); }(); - if (status.is_error() || can_close(fd_)) { + if (status.is_error() || can_close_local(fd_)) { stop(); } } Status loop_read() { - if (can_read(fd_)) { - TRY_STATUS(fd_.flush_read()); - } + TRY_STATUS(fd_.flush_read()); while (true) { TRY_RESULT(need, reader_.read_next(&query_)); if (need == 0) { diff --git a/td/mtproto/RawConnection.h b/td/mtproto/RawConnection.h index 4864ac61fc01..13fdbf7e4074 100644 --- a/td/mtproto/RawConnection.h +++ b/td/mtproto/RawConnection.h @@ -130,6 +130,7 @@ class RawConnection { if (has_error_) { return Status::Error("Connection has already failed"); } + sync_with_poll(socket_fd_); // read/write // EINVAL may be returned in linux kernel < 2.6.28. And on some new kernels too. @@ -139,7 +140,7 @@ class RawConnection { TRY_STATUS(flush_read(auth_key, callback)); TRY_STATUS(callback.before_write()); TRY_STATUS(flush_write()); - if (can_close(socket_fd_)) { + if (can_close_local(socket_fd_)) { return Status::Error("Connection closed"); } return Status::OK(); diff --git a/tdnet/td/net/HttpConnectionBase.cpp b/tdnet/td/net/HttpConnectionBase.cpp index 1a0a116298e8..7ef6a83c4529 100644 --- a/tdnet/td/net/HttpConnectionBase.cpp +++ b/tdnet/td/net/HttpConnectionBase.cpp @@ -92,7 +92,8 @@ void HttpConnectionBase::timeout_expired() { stop(); } void HttpConnectionBase::loop() { - if (can_read(fd_)) { + sync_with_poll(fd_); + if (can_read_local(fd_)) { LOG(DEBUG) << "Can read from the connection"; auto r = fd_.flush_read(); if (r.is_error()) { @@ -133,7 +134,7 @@ void HttpConnectionBase::loop() { write_source_.wakeup(); - if (can_write(fd_)) { + if (can_write_local(fd_)) { LOG(DEBUG) << "Can write to the connection"; auto r = fd_.flush_write(); if (r.is_error()) { @@ -146,7 +147,7 @@ void HttpConnectionBase::loop() { } Status pending_error; - if (fd_.get_poll_info().get_flags().has_pending_error()) { + if (fd_.get_poll_info().get_flags_local().has_pending_error()) { pending_error = fd_.get_pending_error(); } if (pending_error.is_ok() && write_sink_.status().is_error()) { @@ -163,7 +164,7 @@ void HttpConnectionBase::loop() { state_ = State::Close; } - if (can_close(fd_)) { + if (can_close_local(fd_)) { LOG(DEBUG) << "Can close the connection"; state_ = State::Close; } diff --git a/tdnet/td/net/TcpListener.cpp b/tdnet/td/net/TcpListener.cpp index 8bbf00afd00f..e6161811c8c3 100644 --- a/tdnet/td/net/TcpListener.cpp +++ b/tdnet/td/net/TcpListener.cpp @@ -40,7 +40,8 @@ void TcpListener::loop() { if (server_fd_.empty()) { start_up(); } - while (can_read(server_fd_)) { + sync_with_poll(server_fd_); + while (can_read_local(server_fd_)) { auto r_socket_fd = server_fd_.accept(); if (r_socket_fd.is_error()) { if (r_socket_fd.error().code() != -1) { @@ -51,7 +52,7 @@ void TcpListener::loop() { send_closure(callback_, &Callback::accept, r_socket_fd.move_as_ok()); } - if (can_close(server_fd_)) { + if (can_close_local(server_fd_)) { stop(); } } diff --git a/tdnet/td/net/TransparentProxy.cpp b/tdnet/td/net/TransparentProxy.cpp index ed38b9a51831..002dacec0db7 100644 --- a/tdnet/td/net/TransparentProxy.cpp +++ b/tdnet/td/net/TransparentProxy.cpp @@ -55,12 +55,14 @@ void TransparentProxy::start_up() { VLOG(proxy) << "Begin to connect to proxy"; Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this)); set_timeout_in(10); - if (can_write(fd_)) { + sync_with_poll(fd_); + if (can_write_local(fd_)) { loop(); } } void TransparentProxy::loop() { + sync_with_poll(fd_); auto status = [&] { TRY_STATUS(fd_.flush_read()); TRY_STATUS(loop_impl()); @@ -70,7 +72,7 @@ void TransparentProxy::loop() { if (status.is_error()) { on_error(std::move(status)); } - if (can_close(fd_)) { + if (can_close_local(fd_)) { on_error(Status::Error("Connection closed")); } } diff --git a/tdutils/td/utils/BufferedFd.h b/tdutils/td/utils/BufferedFd.h index d8ad078caf49..125640bbbd15 100644 --- a/tdutils/td/utils/BufferedFd.h +++ b/tdutils/td/utils/BufferedFd.h @@ -31,15 +31,16 @@ class BufferedFdBase : public FdT { Result flush_write() TD_WARN_UNUSED_RESULT; bool need_flush_write(size_t at_least = 0) { - CHECK(write_); - write_->sync_with_writer(); - return write_->size() > at_least; + return ready_for_flush_write() > at_least; } size_t ready_for_flush_write() { CHECK(write_); write_->sync_with_writer(); return write_->size(); } + void sync_with_poll() { + ::td::sync_with_poll(*this); + } void set_input_writer(ChainBufferWriter *read) { read_ = read; } @@ -99,7 +100,7 @@ template Result BufferedFdBase::flush_read(size_t max_read) { CHECK(read_); size_t result = 0; - while (::td::can_read(*this) && max_read) { + while (::td::can_read_local(*this) && max_read) { MutableSlice slice = read_->prepare_append().truncate(max_read); TRY_RESULT(x, FdT::read(slice)); slice.truncate(x); @@ -115,7 +116,7 @@ Result BufferedFdBase::flush_write() { // TODO: sync on demand write_->sync_with_writer(); size_t result = 0; - while (!write_->empty() && ::td::can_write(*this)) { + while (!write_->empty() && ::td::can_write_local(*this)) { constexpr size_t BUF_SIZE = 20; IoSlice buf[BUF_SIZE]; diff --git a/tdutils/td/utils/BufferedUdp.h b/tdutils/td/utils/BufferedUdp.h index 68031dc290a0..29b47e12dfb9 100644 --- a/tdutils/td/utils/BufferedUdp.h +++ b/tdutils/td/utils/BufferedUdp.h @@ -114,8 +114,11 @@ class BufferedUdp : public UdpSocketFd { } #if TD_PORT_POSIX + void sync_with_poll() { + ::td::sync_with_poll(*this); + } Result> receive() { - if (input_.empty() && can_read(*this)) { + if (input_.empty() && can_read_local(*this)) { TRY_STATUS(flush_read_once()); } if (input_.empty()) { @@ -130,7 +133,7 @@ class BufferedUdp : public UdpSocketFd { Status flush_send() { Status status; - while (status.is_ok() && can_write(*this) && !output_.empty()) { + while (status.is_ok() && can_write_local(*this) && !output_.empty()) { status = flush_send_once(); } return status; diff --git a/tdutils/td/utils/port/ServerSocketFd.cpp b/tdutils/td/utils/port/ServerSocketFd.cpp index cb12330c6077..5e8b34c74f4f 100644 --- a/tdutils/td/utils/port/ServerSocketFd.cpp +++ b/tdutils/td/utils/port/ServerSocketFd.cpp @@ -263,7 +263,7 @@ class ServerSocketFdImpl { } Status get_pending_error() { - if (!get_poll_info().get_flags().has_pending_error()) { + if (!get_poll_info().get_flags_local().has_pending_error()) { return Status::OK(); } TRY_STATUS(detail::get_socket_pending_error(get_native_fd())); diff --git a/tdutils/td/utils/port/SocketFd.cpp b/tdutils/td/utils/port/SocketFd.cpp index c669a586f77a..3b0bcba0809a 100644 --- a/tdutils/td/utils/port/SocketFd.cpp +++ b/tdutils/td/utils/port/SocketFd.cpp @@ -122,7 +122,7 @@ class SocketFdImpl : private Iocp::Callback { } Result read(MutableSlice slice) { - if (get_poll_info().get_flags().has_pending_error()) { + if (get_poll_info().get_flags_local().has_pending_error()) { TRY_STATUS(get_pending_error()); } input_reader_.sync_with_writer(); @@ -435,7 +435,7 @@ class SocketFdImpl { } } Result read(MutableSlice slice) { - if (get_poll_info().get_flags().has_pending_error()) { + if (get_poll_info().get_flags_local().has_pending_error()) { TRY_STATUS(get_pending_error()); } int native_fd = get_native_fd().socket(); @@ -482,7 +482,7 @@ class SocketFdImpl { } } Status get_pending_error() { - if (!get_poll_info().get_flags().has_pending_error()) { + if (!get_poll_info().get_flags_local().has_pending_error()) { return Status::OK(); } TRY_STATUS(detail::get_socket_pending_error(get_native_fd())); diff --git a/tdutils/td/utils/port/StdStreams.cpp b/tdutils/td/utils/port/StdStreams.cpp index e70005e270f3..7510b45a698e 100644 --- a/tdutils/td/utils/port/StdStreams.cpp +++ b/tdutils/td/utils/port/StdStreams.cpp @@ -11,6 +11,7 @@ #include "td/utils/port/detail/Iocp.h" #include "td/utils/port/detail/NativeFd.h" #include "td/utils/port/PollFlags.h" +#include "td/utils/port/detail/PollableFd.h" #include "td/utils/port/thread.h" #include "td/utils/ScopeGuard.h" #include "td/utils/Slice.h" @@ -102,7 +103,7 @@ class BufferedStdinImpl : public Iocp::Callback { } Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT { - info_.get_flags(); + info_.sync_with_poll(); info_.clear_flags(PollFlags::Read()); reader_.sync_with_writer(); return reader_.size(); @@ -196,7 +197,8 @@ class BufferedStdinImpl { Result flush_read(size_t max_read = std::numeric_limits::max()) TD_WARN_UNUSED_RESULT { size_t result = 0; - while (::td::can_read(*this) && max_read) { + ::td::sync_with_poll(*this); + while (::td::can_read_local(*this) && max_read) { MutableSlice slice = writer_.prepare_append().truncate(max_read); TRY_RESULT(x, file_fd_.read(slice)); slice.truncate(x); diff --git a/tdutils/td/utils/port/UdpSocketFd.cpp b/tdutils/td/utils/port/UdpSocketFd.cpp index 2cc7be2b6345..8ba885be00d6 100644 --- a/tdutils/td/utils/port/UdpSocketFd.cpp +++ b/tdutils/td/utils/port/UdpSocketFd.cpp @@ -477,7 +477,7 @@ class UdpSocketFdImpl { return info_.native_fd(); } Status get_pending_error() { - if (!get_poll_info().get_flags().has_pending_error()) { + if (!get_poll_info().get_flags_local().has_pending_error()) { return Status::OK(); } TRY_STATUS(detail::get_socket_pending_error(get_native_fd())); @@ -487,7 +487,7 @@ class UdpSocketFdImpl { Status receive_message(UdpSocketFd::InboundMessage &message, bool &is_received) { is_received = false; int flags = 0; - if (get_poll_info().get_flags().has_pending_error()) { + if (get_poll_info().get_flags_local().has_pending_error()) { #ifdef MSG_ERRQUEUE flags = MSG_ERRQUEUE; #else @@ -679,7 +679,7 @@ class UdpSocketFdImpl { #endif Status receive_messages_slow(MutableSpan messages, size_t &cnt) { cnt = 0; - while (cnt < messages.size() && get_poll_info().get_flags().can_read()) { + while (cnt < messages.size() && get_poll_info().get_flags_local().can_read()) { auto &message = messages[cnt]; CHECK(!message.data.empty()); bool is_received; @@ -694,7 +694,7 @@ class UdpSocketFdImpl { Status receive_messages_fast(MutableSpan messages, size_t &cnt) { int flags = 0; cnt = 0; - if (get_poll_info().get_flags().has_pending_error()) { + if (get_poll_info().get_flags_local().has_pending_error()) { #ifdef MSG_ERRQUEUE flags = MSG_ERRQUEUE; #else diff --git a/tdutils/td/utils/port/detail/EventFdBsd.cpp b/tdutils/td/utils/port/detail/EventFdBsd.cpp index 077e316534ef..25acf1edb834 100644 --- a/tdutils/td/utils/port/detail/EventFdBsd.cpp +++ b/tdutils/td/utils/port/detail/EventFdBsd.cpp @@ -81,8 +81,9 @@ void EventFdBsd::release() { } void EventFdBsd::acquire() { + sync_with_poll(out_); out_.get_poll_info().add_flags(PollFlags::Read()); - while (can_read(out_)) { + while (can_read_local(out_)) { uint8 value[1024]; auto result = out_.read(MutableSlice(value, sizeof(value))); if (result.is_error()) { diff --git a/tdutils/td/utils/port/detail/EventFdLinux.cpp b/tdutils/td/utils/port/detail/EventFdLinux.cpp index f399ca5b2e68..a050e0ec776d 100644 --- a/tdutils/td/utils/port/detail/EventFdLinux.cpp +++ b/tdutils/td/utils/port/detail/EventFdLinux.cpp @@ -85,7 +85,7 @@ void EventFdLinux::release() { } void EventFdLinux::acquire() { - impl_->info.get_flags(); + impl_->info.sync_with_poll(); SCOPE_EXIT { // Clear flags without EAGAIN and EWOULDBLOCK // Looks like it is safe thing to do with eventfd diff --git a/tdutils/td/utils/port/detail/PollableFd.h b/tdutils/td/utils/port/detail/PollableFd.h index c482b8834c32..eee56b4ed470 100644 --- a/tdutils/td/utils/port/detail/PollableFd.h +++ b/tdutils/td/utils/port/detail/PollableFd.h @@ -90,7 +90,10 @@ class PollableFdInfo : private ListNode { void clear_flags(PollFlags flags) { flags_.clear_flags(flags); } - PollFlags get_flags() const { + //PollFlags get_flags() const { + //return flags_.read_flags(); + //} + PollFlags sync_with_poll() const { return flags_.read_flags(); } PollFlags get_flags_local() const { @@ -208,18 +211,23 @@ inline const NativeFd &PollableFd::native_fd() const { } template -bool can_read(const FdT &fd) { - return fd.get_poll_info().get_flags().can_read() || fd.get_poll_info().get_flags().has_pending_error(); +void sync_with_poll(const FdT &fd) { + fd.get_poll_info().sync_with_poll(); } template -bool can_write(const FdT &fd) { - return fd.get_poll_info().get_flags().can_write(); +bool can_read_local(const FdT &fd) { + return fd.get_poll_info().get_flags_local().can_read() || fd.get_poll_info().get_flags_local().has_pending_error(); } template -bool can_close(const FdT &fd) { - return fd.get_poll_info().get_flags().can_close(); +bool can_write_local(const FdT &fd) { + return fd.get_poll_info().get_flags_local().can_write(); +} + +template +bool can_close_local(const FdT &fd) { + return fd.get_poll_info().get_flags_local().can_close(); } } // namespace td diff --git a/tdutils/test/misc.cpp b/tdutils/test/misc.cpp index bb40c5616631..144b5253505b 100644 --- a/tdutils/test/misc.cpp +++ b/tdutils/test/misc.cpp @@ -432,7 +432,8 @@ static void test_to_double() { TEST(Misc, to_double) { test_to_double(); - const char *locale_name = (std::setlocale(LC_ALL, "fr-FR") == nullptr ? "" : "fr-FR"); + const char *locale_name = (std::setlocale(LC_ALL, "fr-FR") == nullptr ? "C" : "fr-FR"); + LOG(ERROR) << locale_name; std::locale new_locale(locale_name); auto host_locale = std::locale::global(new_locale); test_to_double(); diff --git a/test/http.cpp b/test/http.cpp index 15dad9f81a27..b4c2a18d25e2 100644 --- a/test/http.cpp +++ b/test/http.cpp @@ -287,7 +287,7 @@ TEST(Http, aes_file_encryption) { fd.set_input_writer(&input_writer); fd.get_poll_info().add_flags(PollFlags::Read()); - while (can_read(fd)) { + while (can_read_local(fd)) { fd.flush_read(4096).ensure(); source.wakeup(); }