Skip to content

Commit

Permalink
core: Introduce expiring_fifo extracted from semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
tgrabiec committed Nov 22, 2016
1 parent d90ecab commit 80195db
Show file tree
Hide file tree
Showing 5 changed files with 383 additions and 58 deletions.
3 changes: 3 additions & 0 deletions configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ def sanitize_vptr_flag(compiler):
'tests/distributed_test',
'tests/rpc',
'tests/semaphore_test',
'tests/expiring_fifo_test',
'tests/packet_test',
'tests/tls_test',
'tests/fair_queue_test',
Expand Down Expand Up @@ -395,6 +396,7 @@ def have_xen():
'tests/alloc_test': ['tests/alloc_test.cc'] + core,
'tests/foreign_ptr_test': ['tests/foreign_ptr_test.cc'] + core,
'tests/semaphore_test': ['tests/semaphore_test.cc'] + core,
'tests/expiring_fifo_test': ['tests/expiring_fifo_test.cc'] + core,
'tests/smp_test': ['tests/smp_test.cc'] + core,
'tests/thread_test': ['tests/thread_test.cc'] + core,
'tests/thread_context_switch': ['tests/thread_context_switch.cc'] + core,
Expand Down Expand Up @@ -437,6 +439,7 @@ def have_xen():
'tests/alloc_test',
'tests/foreign_ptr_test',
'tests/semaphore_test',
'tests/expiring_fifo_test',
'tests/thread_test',
'tests/tls_test',
'tests/fair_queue_test',
Expand Down
167 changes: 167 additions & 0 deletions core/expiring_fifo.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* This file is open source software, licensed to you under the terms
* of the Apache License, Version 2.0 (the "License"). See the NOTICE file
* distributed with this work for additional information regarding copyright
* ownership. You may not use this file except in compliance with the License.
*
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Copyright (C) 2016 ScyllaDB
*/

#pragma once

#include "future.hh"
#include "chunked_fifo.hh"
#include <stdexcept>
#include <exception>
#include "timer.hh"
#include "future-util.hh"
#include "lowres_clock.hh"

template<typename T>
struct dummy_expiry {
void operator()(T&) noexcept {};
};

template<typename... T>
struct promise_expiry {
void operator()(promise<T...>& pr) noexcept {
pr.set_exception(std::make_exception_ptr(timed_out_error()));
};
};

/// Container for elements with support for expiration of entries.
///
/// OnExpiry is a functor which will be called with a reference to T right before it expires.
/// T is removed and destroyed from the container immediately after OnExpiry returns.
/// OnExpiry callback must not modify the container, it can only modify its argument.
///
/// The container can only be moved before any elements are pushed.
///
template <typename T, typename OnExpiry = dummy_expiry<T>, typename Clock = lowres_clock>
class expiring_fifo {
public:
using clock = Clock;
using time_point = typename Clock::time_point;
private:
struct entry {
std::experimental::optional<T> payload; // disengaged means that it's expired
timer<Clock> tr;
entry(T&& payload_) : payload(std::move(payload_)) {}
entry(const T& payload_) : payload(payload_) {}
entry(T payload_, expiring_fifo& ef, time_point timeout)
: payload(std::move(payload_))
, tr([this, &ef] {
ef._on_expiry(*payload);
payload = std::experimental::nullopt;
--ef._size;
ef.drop_expired_front();
})
{
tr.arm(timeout);
}
entry(entry&& x) = delete;
entry(const entry& x) = delete;
};

// There is an invariant that the front element is never expired.
chunked_fifo<entry> _list;
OnExpiry _on_expiry;
size_t _size = 0;

// Ensures that front() is not expired by dropping expired elements from the front.
void drop_expired_front() {
while (!_list.empty() && !_list.front().payload) {
_list.pop_front();
}
}
public:
expiring_fifo() = default;
expiring_fifo(OnExpiry on_expiry) : _on_expiry(std::move(on_expiry)) {}

/// Checks if container contains any elements
///
/// \note Inside OnExpiry callback, the expired element is still contained.
///
/// \return true if and only if there are any elements contained.
bool empty() const {
return _size == 0;
}

/// Equivalent to !empty()
explicit operator bool() const {
return !empty();
}

/// Returns a reference to the element in the front.
/// Valid only when !empty().
T& front() {
return *_list.front().payload;
}

/// Returns a reference to the element in the front.
/// Valid only when !empty().
const T& front() const {
return *_list.front().payload;
}

/// Returns the number of elements contained.
///
/// \note Expired elements are not contained. Expiring element is still contained when OnExpiry is called.
size_t size() const {
return _size;
}

/// Reserves storage in the container for at least 'size' elements.
/// Note that expired elements may also take space when they are not in the front of the queue.
///
/// Doesn't give any guarantees about exception safety of subsequent push_back().
void reserve(size_t size) {
return _list.reserve(size);
}

/// Adds element to the back of the queue.
/// The element will never expire.
void push_back(const T& payload) {
_list.emplace_back(payload);
++_size;
}

/// Adds element to the back of the queue.
/// The element will never expire.
void push_back(T&& payload) {
_list.emplace_back(std::move(payload));
++_size;
}

/// Adds element to the back of the queue.
/// The element will expire when timeout is reached, unless it is time_point::max(), in which
/// case it never expires.
void push_back(T payload, time_point timeout) {
if (timeout < time_point::max()) {
_list.emplace_back(std::move(payload), *this, timeout);
} else {
_list.emplace_back(std::move(payload));
}
++_size;
}

/// Removes the element at the front.
/// Can be called only if !empty().
void pop_front() {
_list.pop_front();
--_size;
drop_expired_front();
}
};
81 changes: 23 additions & 58 deletions core/semaphore.hh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <stdexcept>
#include <exception>
#include "timer.hh"
#include "expiring_fifo.hh"

/// \addtogroup fiber-module
/// @{
Expand Down Expand Up @@ -86,41 +87,31 @@ struct semaphore_default_exception_factory {
/// exception object.
template<typename ExceptionFactory>
class basic_semaphore {
public:
using duration = timer<>::duration;
using clock = timer<>::clock;
using time_point = timer<>::time_point;
private:
ssize_t _count;
std::exception_ptr _ex;
struct entry {
promise<> pr;
size_t nr;
timer<> tr;
// points at pointer back to this, to track the entry object as it moves
std::unique_ptr<entry*> tracker;
entry(promise<>&& pr_, size_t nr_) : pr(std::move(pr_)), nr(nr_) {}
entry(entry&& x) noexcept
: pr(std::move(x.pr)), nr(x.nr), tr(std::move(x.tr)), tracker(std::move(x.tracker)) {
if (tracker) {
*tracker = this;
}
}
entry** track() {
tracker = std::make_unique<entry*>(this);
return tracker.get();
};
struct expiry_handler {
void operator()(entry& e) noexcept {
e.pr.set_exception(ExceptionFactory::timeout());
}
entry& operator=(entry&&) noexcept = delete;
};
chunked_fifo<entry> _wait_list;

expiring_fifo<entry, expiry_handler, clock> _wait_list;
bool has_available_units(size_t nr) const {
return _count >= 0 && (static_cast<size_t>(_count) >= nr);
}
bool may_proceed(size_t nr) const {
return has_available_units(nr) && _wait_list.empty();
}
public:
using duration = timer<>::duration;
using clock = timer<>::clock;
using time_point = timer<>::time_point;

/// Returns the maximum number of units the semaphore counter can hold
static constexpr size_t max_counter() {
return std::numeric_limits<decltype(_count)>::max();
Expand All @@ -143,17 +134,7 @@ public:
/// to satisfy the request. If the semaphore was \ref broken(), may
/// contain an exception.
future<> wait(size_t nr = 1) {
if (may_proceed(nr)) {
_count -= nr;
return make_ready_future<>();
}
if (_ex) {
return make_exception_future(_ex);
}
promise<> pr;
auto fut = pr.get_future();
_wait_list.push_back(entry(std::move(pr), nr));
return fut;
return wait(time_point::max(), nr);
}
/// Waits until at least a specific number of units are available in the
/// counter, and reduces the counter by that amount of units. If the request
Expand All @@ -169,29 +150,17 @@ public:
/// \ref semaphore_timed_out exception. If the semaphore was
/// \ref broken(), may contain an exception.
future<> wait(time_point timeout, size_t nr = 1) {
auto fut = wait(nr);
if (!fut.available()) {
auto cancel = [this] (entry** e) {
(*e)->nr = 0;
(*e)->tracker = nullptr;
signal(0);
};

// Since circular_buffer<> can cause objects to move around,
// track them via entry::tracker
entry** e = _wait_list.back().track();
try {
(*e)->tr.set_callback([e, cancel] {
(*e)->pr.set_exception(ExceptionFactory::timeout());
cancel(e);
});
(*e)->tr.arm(timeout);
} catch (...) {
(*e)->pr.set_exception(std::current_exception());
cancel(e);
}
if (may_proceed(nr)) {
_count -= nr;
return make_ready_future<>();
}
return std::move(fut);
if (_ex) {
return make_exception_future(_ex);
}
promise<> pr;
auto fut = pr.get_future();
_wait_list.push_back(entry(std::move(pr), nr), timeout);
return fut;
}

/// Waits until at least a specific number of units are available in the
Expand Down Expand Up @@ -226,11 +195,8 @@ public:
_count += nr;
while (!_wait_list.empty() && has_available_units(_wait_list.front().nr)) {
auto& x = _wait_list.front();
if (x.nr) {
_count -= x.nr;
x.pr.set_value();
x.tr.cancel();
}
_count -= x.nr;
x.pr.set_value();
_wait_list.pop_front();
}
}
Expand Down Expand Up @@ -308,7 +274,6 @@ basic_semaphore<ExceptionFactory>::broken(std::exception_ptr xp) {
while (!_wait_list.empty()) {
auto& x = _wait_list.front();
x.pr.set_exception(xp);
x.tr.cancel();
_wait_list.pop_front();
}
}
Expand Down
1 change: 1 addition & 0 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
'fstream_test',
'foreign_ptr_test',
'semaphore_test',
'expiring_fifo_test',
'shared_ptr_test',
'weak_ptr_test',
'fileiotest',
Expand Down
Loading

0 comments on commit 80195db

Please sign in to comment.