Skip to content

Commit

Permalink
large_bitset: use a chunked_vector internally and simplify API
Browse files Browse the repository at this point in the history
save and load functions for the large_bitset were introduced by Avi with
d590e32.

In that commit, Avi says:

"... providing iterator-based load() and save() methods.  The methods
support partial load/save so that access to very large bitmaps can be
split over multiple tasks."

The only user of this interface is SSTables. And it turns out we don't really
split the access like that. What we do instead is to create a chunked vector
and then pass its begin() iterator with position = 0 and let it write everything.

The problem here is that this requires the chunked vector to be fully
initialized, not just reserved. If the bitmap is large enough, that in itself
can take a long time without yielding (up to 16ms seen in my setup).

We can simplify things considerably by moving the large_bitset to use a
chunked vector internally: it already uses a poor man's version of it
by allocating chunks internally (it predates the chunked_vector).

By doing that, we can turn save() into a simple copy operation, and do
away with load altogether by adding a new constructor that will just
copy an existing chunked_vector.

Fixes scylladb#3341
Tests: unit (release)

Signed-off-by: Glauber Costa <[email protected]>
Message-Id: <[email protected]>
  • Loading branch information
Glauber Costa authored and avikivity committed Apr 10, 2018
1 parent 252c5df commit b2f9958
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 139 deletions.
7 changes: 7 additions & 0 deletions sstables/disk_types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ struct disk_array {
utils::chunked_vector<Members> elements;
};

// Non-owning counterpart of disk_array: holds a reference to an existing
// chunked_vector so a component can be serialized without copying its
// (potentially large) element storage.
//
// NOTE(lifetime): the referenced vector must outlive this object; it is a
// plain reference, not an owner.
template <typename Size, typename Members>
struct disk_array_ref {
    // Fixed message: the check demands an integral type, not merely a type
    // convertible to an integer.
    static_assert(std::is_integral<Size>::value, "Length type must be an integral type");
    const utils::chunked_vector<Members>& elements;
    disk_array_ref(const utils::chunked_vector<Members>& elements) : elements(elements) {}
};

template <typename Size, typename Key, typename Value>
struct disk_hash {
std::unordered_map<Key, Value, std::hash<Key>> map;
Expand Down
17 changes: 11 additions & 6 deletions sstables/sstables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,14 @@ inline void write(file_writer& out, const disk_array<Size, Members>& arr) {
write(out, arr.elements);
}

// Serialize a disk_array_ref: the element count is truncation-checked into
// the on-disk Size type and written first, followed by the elements
// themselves. Mirrors the disk_array overload above, but reads through the
// non-owning reference so no copy of the element storage is made.
template <typename Size, typename Members>
inline void write(file_writer& out, const disk_array_ref<Size, Members>& arr) {
    Size count = 0;
    check_truncate_and_assign(count, arr.elements.size());
    write(out, count);
    write(out, arr.elements);
}

template <typename Size, typename Key, typename Value>
future<> parse(random_access_reader& in, Size& len, std::unordered_map<Key, Value>& map) {
return do_with(Size(), [&in, len, &map] (Size& count) {
Expand Down Expand Up @@ -1425,8 +1433,7 @@ future<> sstable::read_filter(const io_priority_class& pc) {
return seastar::async([this, &pc] () mutable {
sstables::filter filter;
read_simple<sstable::component_type::Filter>(filter, pc).get();
large_bitset bs(filter.buckets.elements.size() * 64);
bs.load(filter.buckets.elements.begin(), filter.buckets.elements.end());
large_bitset bs(filter.buckets.elements.size() * sizeof(decltype(filter.buckets.elements)::value_type), std::move(filter.buckets.elements));
_components->filter = utils::filter::create_filter(filter.hashes, std::move(bs));
});
}
Expand All @@ -1439,10 +1446,8 @@ void sstable::write_filter(const io_priority_class& pc) {
auto f = static_cast<utils::filter::murmur3_bloom_filter *>(_components->filter.get());

auto&& bs = f->bits();
utils::chunked_vector<uint64_t> v(align_up(bs.size(), size_t(64)) / 64);
bs.save(v.begin());
auto filter = sstables::filter(f->num_hashes(), std::move(v));
write_simple<sstable::component_type::Filter>(filter, pc);
auto filter_ref = sstables::filter_ref(f->num_hashes(), bs.get_storage());
write_simple<sstable::component_type::Filter>(filter_ref, pc);
}

// This interface is only used during tests, snapshot loading and early initialization.
Expand Down
10 changes: 10 additions & 0 deletions sstables/types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ struct filter {
explicit filter(int hashes, utils::chunked_vector<uint64_t> buckets) : hashes(hashes), buckets({std::move(buckets)}) {}
};

// Write-side view of 'filter': keeps a reference to the bucket storage so
// that serializing a bloom filter at write time does not have to copy the
// (potentially large) bucket vector. The referenced vector must outlive
// this object.
struct filter_ref {
// Number of hash functions used by the bloom filter.
uint32_t hashes;
// Non-owning view of the filter's 64-bit bucket words.
disk_array_ref<uint32_t, uint64_t> buckets;

// Field order here defines the on-disk serialization order; it must match
// struct filter above.
template <typename Describer>
auto describe_type(Describer f) { return f(hashes, buckets); }
// Takes 'int hashes' to match struct filter's constructor; the value is
// narrowed into the uint32_t member (assumed non-negative).
explicit filter_ref(int hashes, const utils::chunked_vector<uint64_t>& buckets) : hashes(hashes), buckets(buckets) {}
};

enum class indexable_element {
partition,
cell
Expand Down
18 changes: 5 additions & 13 deletions utils/large_bitset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@ using namespace seastar;
large_bitset::large_bitset(size_t nr_bits) : _nr_bits(nr_bits) {
assert(thread::running_in_thread());

auto nr_blocks = align_up(nr_bits, bits_per_block()) / bits_per_block();
_storage.reserve(nr_blocks);
size_t nr_ints = align_up(nr_bits, bits_per_int()) / bits_per_int();
_storage.reserve(nr_ints);
while (nr_ints) {
auto now = std::min(ints_per_block(), nr_ints);
_storage.push_back(std::make_unique<int_type[]>(now));
std::fill_n(_storage.back().get(), now, 0);
nr_ints -= now;
_storage.push_back(0);
--nr_ints;
if (need_preempt()) {
thread::yield();
}
Expand All @@ -47,13 +44,8 @@ large_bitset::large_bitset(size_t nr_bits) : _nr_bits(nr_bits) {
void
large_bitset::clear() {
assert(thread::running_in_thread());

size_t nr_ints = align_up(_nr_bits, bits_per_int()) / bits_per_int();
auto bp = _storage.begin();
while (nr_ints) {
auto now = std::min(ints_per_block(), nr_ints);
std::fill_n(bp++->get(), now, 0);
nr_ints -= now;
for (auto&& pos: _storage) {
pos = 0;
if (need_preempt()) {
thread::yield();
}
Expand Down
137 changes: 17 additions & 120 deletions utils/large_bitset.hh
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,20 @@
#include <algorithm>
#include <seastar/core/thread.hh>
#include <seastar/core/preempt.hh>
#include "utils/chunked_vector.hh"

using namespace seastar;

class large_bitset {
static constexpr size_t block_size() { return 128 * 1024; }
using int_type = unsigned long;
using int_type = uint64_t;
static constexpr size_t bits_per_int() {
return std::numeric_limits<int_type>::digits;
}
static constexpr size_t ints_per_block() {
return block_size() / sizeof(int_type);
}
static constexpr size_t bits_per_block() {
return ints_per_block() * bits_per_int();
}
size_t _nr_bits = 0;
std::vector<std::unique_ptr<int_type[]>> _storage;
utils::chunked_vector<int_type> _storage;
public:
explicit large_bitset(size_t nr_bits);
explicit large_bitset(size_t nr_bits, utils::chunked_vector<int_type> storage) : _nr_bits(nr_bits), _storage(std::move(storage)) {}
large_bitset(large_bitset&&) = default;
large_bitset(const large_bitset&) = delete;
large_bitset& operator=(const large_bitset&) = delete;
Expand All @@ -58,128 +53,30 @@ public:
}

size_t memory_size() const {
return block_size() * _storage.size() + sizeof(_nr_bits);
return _storage.size() * sizeof(int_type);
}

bool test(size_t idx) const {
auto idx1 = idx / bits_per_block();
idx %= bits_per_block();
auto idx2 = idx / bits_per_int();
auto idx1 = idx / bits_per_int();
idx %= bits_per_int();
auto idx3 = idx;
return (_storage[idx1][idx2] >> idx3) & 1;
auto idx2 = idx;
return (_storage[idx1] >> idx2) & 1;
}
void set(size_t idx) {
auto idx1 = idx / bits_per_block();
idx %= bits_per_block();
auto idx2 = idx / bits_per_int();
auto idx1 = idx / bits_per_int();
idx %= bits_per_int();
auto idx3 = idx;
_storage[idx1][idx2] |= int_type(1) << idx3;
auto idx2 = idx;
_storage[idx1] |= int_type(1) << idx2;
}
void clear(size_t idx) {
auto idx1 = idx / bits_per_block();
idx %= bits_per_block();
auto idx2 = idx / bits_per_int();
auto idx1 = idx / bits_per_int();
idx %= bits_per_int();
auto idx3 = idx;
_storage[idx1][idx2] &= ~(int_type(1) << idx3);
auto idx2 = idx;
_storage[idx1] &= ~(int_type(1) << idx2);
}
void clear();
// load data from host bitmap (in host byte order); returns end bit position
template <typename IntegerIterator>
size_t load(IntegerIterator start, IntegerIterator finish, size_t position = 0);
template <typename IntegerIterator>
IntegerIterator save(IntegerIterator out, size_t position = 0, size_t n = std::numeric_limits<size_t>::max());
};

template <typename IntegerIterator>
size_t
large_bitset::load(IntegerIterator start, IntegerIterator finish, size_t position) {
assert(thread::running_in_thread());

using input_int_type = typename std::iterator_traits<IntegerIterator>::value_type;
if (position % bits_per_int() == 0 && sizeof(input_int_type) == sizeof(int_type)) {
auto idx = position;
auto idx1 = idx / bits_per_block();
idx %= bits_per_block();
auto idx2 = idx / bits_per_int();
while (start != finish) {
auto now = std::min<size_t>(ints_per_block() - idx2, std::distance(start, finish));
std::copy_n(start, now, _storage[idx1].get() + idx2);
start += now;
++idx1;
idx2 = 0;
if (need_preempt()) {
thread::yield();
}
}
} else {
while (start != finish) {
auto bitmask = *start++;
for (size_t i = 0; i < std::numeric_limits<input_int_type>::digits; ++i) {
if (bitmask & 1) {
set(position);
} else {
clear(position);
}
bitmask >>= 1;
++position;
if (need_preempt()) {
thread::yield();
}
}
}
const utils::chunked_vector<int_type>& get_storage() const {
return _storage;
}
return position;
}

template <typename IntegerIterator>
IntegerIterator
large_bitset::save(IntegerIterator out, size_t position, size_t n) {
assert(thread::running_in_thread());
n = std::min(n, size() - position);
using output_int_type = typename std::iterator_traits<IntegerIterator>::value_type;
if (position % bits_per_int() == 0
&& n % bits_per_int() == 0
&& sizeof(output_int_type) == sizeof(int_type)) {
auto idx = position;
auto idx1 = idx / bits_per_block();
idx %= bits_per_block();
auto idx2 = idx / bits_per_int();
auto n_ints = n / bits_per_int();
while (n_ints) {
auto now = std::min(ints_per_block() - idx2, n_ints);
out = std::copy_n(_storage[idx1].get() + idx2, now, out);
++idx1;
idx2 = 0;
n_ints -= now;
if (need_preempt()) {
thread::yield();
}
}
} else {
output_int_type result = 0;
unsigned bitpos = 0;
while (n) {
result |= output_int_type(test(position)) << bitpos;
++position;
++bitpos;
--n;
if (bitpos == std::numeric_limits<output_int_type>::digits) {
*out = result;
++out;
result = 0;
bitpos = 0;
}
if (need_preempt()) {
thread::yield();
}
}
if (bitpos) {
*out = result;
++out;
}
}
return out;
}
};

0 comments on commit b2f9958

Please sign in to comment.