Skip to content

Commit

Permalink
treewide: use shared_sstable, make_sstable in place of lw_shared_ptr<…
Browse files Browse the repository at this point in the history
…sstable>

Since shared_sstable is going to be its own type soon, we can't use the old alias.
  • Loading branch information
avikivity committed Sep 12, 2017
1 parent 1a3cdff commit f702350
Show file tree
Hide file tree
Showing 16 changed files with 115 additions and 113 deletions.
8 changes: 5 additions & 3 deletions compaction_strategy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class sstable_set;
struct compaction_descriptor;
struct resharding_descriptor;

using shared_sstable = lw_shared_ptr<sstable>;

class compaction_strategy {
::shared_ptr<compaction_strategy_impl> _compaction_strategy_impl;
public:
Expand All @@ -54,13 +56,13 @@ public:
compaction_strategy& operator=(compaction_strategy&&);

// Return a list of sstables to be compacted after applying the strategy.
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<lw_shared_ptr<sstable>> candidates);
compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<shared_sstable> candidates);

std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<lw_shared_ptr<sstable>> candidates);
std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<shared_sstable> candidates);

// Some strategies may look at the compacted and resulting sstables to
// get some useful information for subsequent compactions.
void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added);
void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added);

// Return if parallel compaction is allowed by strategy.
bool parallel_compaction() const;
Expand Down
24 changes: 12 additions & 12 deletions database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ class single_key_sstable_reader final : public mutation_reader::impl {
}
auto candidates = filter_sstable_for_reader(_sstables->select(_pr), *_cf, _schema, _key, _slice);
return parallel_for_each(std::move(candidates),
[this](const lw_shared_ptr<sstables::sstable>& sstable) {
[this](const sstables::shared_sstable& sstable) {
tracing::trace(_trace_state, "Reading key {} from sstable {}", _pr, seastar::value_of([&sstable] { return sstable->get_filename(); }));
return sstable->read_row(_schema, _pr.start()->value(), _slice, _pc, _fwd).then([this](auto smo) {
if (smo) {
Expand Down Expand Up @@ -751,7 +751,7 @@ static bool belongs_to_other_shard(const std::vector<shard_id>& shards) {
future<sstables::shared_sstable>
column_family::open_sstable(sstables::foreign_sstable_open_info info, sstring dir, int64_t generation,
sstables::sstable::version_types v, sstables::sstable::format_types f) {
auto sst = make_lw_shared<sstables::sstable>(_schema, dir, generation, v, f);
auto sst = sstables::make_sstable(_schema, dir, generation, v, f);
if (!belongs_to_current_shard(info.owners)) {
dblog.debug("sstable {} not relevant for this shard, ignoring", sst->get_filename());
sst->mark_for_deletion();
Expand Down Expand Up @@ -798,7 +798,7 @@ void column_family::update_stats_for_new_sstable(uint64_t disk_space_used_by_sst
}
}

void column_family::add_sstable(lw_shared_ptr<sstables::sstable> sstable, std::vector<unsigned>&& shards_for_the_sstable) {
void column_family::add_sstable(sstables::shared_sstable sstable, std::vector<unsigned>&& shards_for_the_sstable) {
// allow in-progress reads to continue using old list
auto new_sstables = make_lw_shared(*_sstables);
new_sstables->insert(sstable);
Expand Down Expand Up @@ -866,7 +866,7 @@ column_family::seal_active_streaming_memtable_immediate(flush_permit&& permit) {
auto f = current_waiters.get_shared_future(); // for this seal

with_lock(_sstables_lock.for_read(), [this, old, permit = std::move(permit)] () mutable {
auto newtab = make_lw_shared<sstables::sstable>(_schema,
auto newtab = sstables::make_sstable(_schema,
_config.datadir, calculate_generation_for_new_table(),
sstables::sstable::version_types::ka,
sstables::sstable::format_types::big);
Expand Down Expand Up @@ -924,7 +924,7 @@ future<> column_family::seal_active_streaming_memtable_big(streaming_memtable_bi
return with_gate(_streaming_flush_gate, [this, old, &smb, permit = std::move(permit)] () mutable {
return with_gate(smb.flush_in_progress, [this, old, &smb, permit = std::move(permit)] () mutable {
return with_lock(_sstables_lock.for_read(), [this, old, &smb, permit = std::move(permit)] () mutable {
auto newtab = make_lw_shared<sstables::sstable>(_schema,
auto newtab = sstables::make_sstable(_schema,
_config.datadir, calculate_generation_for_new_table(),
sstables::sstable::version_types::ka,
sstables::sstable::format_types::big);
Expand Down Expand Up @@ -999,7 +999,7 @@ future<stop_iteration>
column_family::try_flush_memtable_to_sstable(lw_shared_ptr<memtable> old, sstable_write_permit&& permit) {
auto gen = calculate_generation_for_new_table();

auto newtab = make_lw_shared<sstables::sstable>(_schema,
auto newtab = sstables::make_sstable(_schema,
_config.datadir, gen,
sstables::sstable::version_types::ka,
sstables::sstable::format_types::big);
Expand Down Expand Up @@ -1098,7 +1098,7 @@ distributed_loader::flush_upload_dir(distributed<database>& db, sstring ks_name,
[ks_name, cf_name, comps = pair.second] (database& db) {
auto& cf = db.find_column_family(ks_name, cf_name);

auto sst = make_lw_shared<sstables::sstable>(cf.schema(), cf._config.datadir + "/upload", comps.generation,
auto sst = sstables::make_sstable(cf.schema(), cf._config.datadir + "/upload", comps.generation,
comps.version, comps.format, gc_clock::now(),
[] (disk_error_signal_type&) { return error_handler_for_upload_dir(); });
auto gen = cf.calculate_generation_for_new_table();
Expand Down Expand Up @@ -1151,7 +1151,7 @@ column_family::reshuffle_sstables(std::set<int64_t> all_generations, int64_t sta
if (work.all_generations.count(comps.generation) != 0) {
return make_ready_future<>();
}
auto sst = make_lw_shared<sstables::sstable>(_schema,
auto sst = sstables::make_sstable(_schema,
_config.datadir, comps.generation,
comps.version, comps.format);
work.sstables.emplace(comps.generation, std::move(sst));
Expand Down Expand Up @@ -1348,7 +1348,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool

auto create_sstable = [this] {
auto gen = this->calculate_generation_for_new_table();
auto sst = make_lw_shared<sstables::sstable>(_schema, _config.datadir, gen,
auto sst = sstables::make_sstable(_schema, _config.datadir, gen,
sstables::sstable::version_types::ka,
sstables::sstable::format_types::big);
sst->set_unshared();
Expand All @@ -1362,7 +1362,7 @@ column_family::compact_sstables(sstables::compaction_descriptor descriptor, bool
});
}

static bool needs_cleanup(const lw_shared_ptr<sstables::sstable>& sst,
static bool needs_cleanup(const sstables::shared_sstable& sst,
const dht::token_range_vector& owned_ranges,
schema_ptr s) {
auto first = sst->get_first_partition_key();
Expand Down Expand Up @@ -1592,7 +1592,7 @@ load_sstables_with_open_info(std::vector<sstables::foreign_sstable_open_info> ss
if (!pred(info)) {
return make_ready_future<>();
}
auto sst = make_lw_shared<sstables::sstable>(s, dir, info.generation, info.version, info.format);
auto sst = sstables::make_sstable(s, dir, info.generation, info.version, info.format);
return sst->load(std::move(info)).then([&ssts, sst] {
ssts.push_back(std::move(sst));
return make_ready_future<>();
Expand Down Expand Up @@ -1724,7 +1724,7 @@ void distributed_loader::reshard(distributed<database>& db, sstring ks_name, sst
return cf->calculate_generation_for_new_table();
}).get0();

auto sst = make_lw_shared<sstables::sstable>(cf->schema(), cf->dir(), gen,
auto sst = sstables::make_sstable(cf->schema(), cf->dir(), gen,
sstables::sstable::version_types::ka, sstables::sstable::format_types::big,
gc_clock::now(), default_io_error_handler_gen());
return sst;
Expand Down
6 changes: 3 additions & 3 deletions database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,11 @@ private:
// Cache must be synchronized atomically with this, otherwise write atomicity may not be respected.
// Doesn't trigger compaction.
// Strong exception guarantees.
void add_sstable(lw_shared_ptr<sstables::sstable> sstable, std::vector<unsigned>&& shards_for_the_sstable);
void add_sstable(sstables::shared_sstable sstable, std::vector<unsigned>&& shards_for_the_sstable);
// returns an empty pointer if sstable doesn't belong to current shard.
future<lw_shared_ptr<sstables::sstable>> open_sstable(sstables::foreign_sstable_open_info info, sstring dir,
future<sstables::shared_sstable> open_sstable(sstables::foreign_sstable_open_info info, sstring dir,
int64_t generation, sstables::sstable_version_types v, sstables::sstable_format_types f);
void load_sstable(lw_shared_ptr<sstables::sstable>& sstable, bool reset_level = false);
void load_sstable(sstables::shared_sstable& sstable, bool reset_level = false);
lw_shared_ptr<memtable> new_memtable();
lw_shared_ptr<memtable> new_streaming_memtable();
future<stop_iteration> try_flush_memtable_to_sstable(lw_shared_ptr<memtable> memt, sstable_write_permit&& permit);
Expand Down
4 changes: 2 additions & 2 deletions sstable_mutation_readers.hh
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
#include "mutation_reader.hh"

class sstable_range_wrapping_reader final : public mutation_reader::impl {
lw_shared_ptr<sstables::sstable> _sst;
sstables::shared_sstable _sst;
sstables::mutation_reader _smr;
public:
sstable_range_wrapping_reader(lw_shared_ptr<sstables::sstable> sst,
sstable_range_wrapping_reader(sstables::shared_sstable sst,
schema_ptr s,
const dht::partition_range& pr,
const query::partition_slice& slice,
Expand Down
2 changes: 1 addition & 1 deletion sstables/compaction_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ std::vector<resharding_descriptor> compaction_strategy::get_resharding_jobs(colu
return _compaction_strategy_impl->get_resharding_jobs(cf, std::move(candidates));
}

void compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
void compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
_compaction_strategy_impl->notify_completion(removed, added);
}

Expand Down
2 changes: 1 addition & 1 deletion sstables/compaction_strategy_impl.hh
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public:
virtual ~compaction_strategy_impl() {}
virtual compaction_descriptor get_sstables_for_compaction(column_family& cfs, std::vector<sstables::shared_sstable> candidates) = 0;
virtual std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<sstables::shared_sstable> candidates);
virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) { }
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) { }
virtual compaction_strategy_type type() const = 0;
virtual bool parallel_compaction() const {
return true;
Expand Down
4 changes: 2 additions & 2 deletions sstables/leveled_compaction_strategy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public:

virtual std::vector<resharding_descriptor> get_resharding_jobs(column_family& cf, std::vector<shared_sstable> candidates) override;

virtual void notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) override;
virtual void notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) override;

// for each level > 0, get newest sstable and use its last key as last
// compacted key for the previous level.
Expand Down Expand Up @@ -145,7 +145,7 @@ std::vector<resharding_descriptor> leveled_compaction_strategy::get_resharding_j
return descriptors;
}

void leveled_compaction_strategy::notify_completion(const std::vector<lw_shared_ptr<sstable>>& removed, const std::vector<lw_shared_ptr<sstable>>& added) {
void leveled_compaction_strategy::notify_completion(const std::vector<shared_sstable>& removed, const std::vector<shared_sstable>& added) {
if (removed.empty() || added.empty()) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion sstables/sstables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1380,7 +1380,7 @@ future<> sstable::load(sstables::foreign_sstable_open_info info) {

future<sstable_open_info> sstable::load_shared_components(const schema_ptr& s, sstring dir, int generation, version_types v, format_types f,
const io_priority_class& pc) {
auto sst = make_lw_shared<sstables::sstable>(s, dir, generation, v, f);
auto sst = sstables::make_sstable(s, dir, generation, v, f);
return sst->load(pc).then([sst] () mutable {
auto shards = sst->get_shards_for_this_sstable();
auto info = sstable_open_info{make_lw_shared<shareable_components>(std::move(*sst->_components)),
Expand Down
2 changes: 1 addition & 1 deletion tests/combined_mutation_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ struct sst_factory {
{}

sstables::shared_sstable operator()() {
auto sst = make_lw_shared<sstables::sstable>(s, path, gen, sstables::sstable::version_types::la, sstables::sstable::format_types::big);
auto sst = sstables::make_sstable(s, path, gen, sstables::sstable::version_types::la, sstables::sstable::format_types::big);
sst->set_unshared();

//TODO set sstable level, to make the test more interesting
Expand Down
2 changes: 1 addition & 1 deletion tests/memory_footprint.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ static sizes calculate_sizes(const mutation& m) {
result.query_result = m.query(partition_slice_builder(*s).build(), query::result_request::only_result).buf().size();

tmpdir sstable_dir;
auto sst = make_lw_shared<sstables::sstable>(s,
auto sst = sstables::make_sstable(s,
sstable_dir.path,
1 /* generation */,
sstables::sstable::version_types::la,
Expand Down
4 changes: 2 additions & 2 deletions tests/perf/perf_sstable.hh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private:
std::default_random_engine _generator;
std::uniform_int_distribution<char> _distribution;
lw_shared_ptr<memtable> _mt;
std::vector<lw_shared_ptr<sstable>> _sst;
std::vector<shared_sstable> _sst;

schema_ptr create_schema() {
std::vector<schema::column> columns;
Expand Down Expand Up @@ -116,7 +116,7 @@ public:
}

future<> load_sstables(unsigned iterations) {
_sst.push_back(make_lw_shared<sstable>(s, this->dir(), 0, sstable::version_types::ka, sstable::format_types::big));
_sst.push_back(make_sstable(s, this->dir(), 0, sstable::version_types::ka, sstable::format_types::big));
return _sst.back()->load();
}

Expand Down
Loading

0 comments on commit f702350

Please sign in to comment.