forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.cc
88 lines (80 loc) · 4.09 KB
/
consumer.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#include <seastar/core/coroutine.hh>
#include "consumer.hh"
#include "replica/database.hh"
#include "mutation/mutation_source_metadata.hh"
#include "service/priority_manager.hh"
#include "db/view/view_update_generator.hh"
#include "db/view/view_update_checks.hh"
#include "sstables/sstables.hh"
#include "sstables/sstables_manager.hh"
namespace streaming {
// Builds a callable that consumes one flat_mutation_reader_v2 and writes its
// contents into a new sstable of the table identified by the reader's schema.
//
// Parameters:
//   origin               - passed to the sstable writer configuration and compared
//                          against sstables::repair_origin to decide whether the
//                          automatic off-strategy trigger is enabled.
//   db, sys_dist_ks, vug - captured by reference by the returned callable, so they
//                          must outlive it and every future it returns.
//   estimated_partitions - partition estimate; adjusted by the table's compaction
//                          strategy before being handed to the sstable writer.
//   reason               - forwarded to db::view::check_needs_view_update_path().
//   offstrategy          - when set, the compaction strategy's interposer consumer is
//                          bypassed and the flag is forwarded to
//                          add_sstable_and_update_cache().
//
// Returns a std::function taking the reader by value. The future it returns
// resolves once the consumer chain completes; on failure the reader is closed
// and the original exception is propagated.
//
// NOTE(review): the returned lambda is a coroutine and reads its captures after
// the first co_await, so the callable object itself has to stay alive until the
// returned future resolves - confirm that all callers guarantee this.
std::function<future<> (flat_mutation_reader_v2)> make_streaming_consumer(sstring origin,
        sharded<replica::database>& db,
        sharded<db::system_distributed_keyspace>& sys_dist_ks,
        sharded<db::view::view_update_generator>& vug,
        uint64_t estimated_partitions,
        stream_reason reason,
        sstables::offstrategy offstrategy) {
    return [&db, &sys_dist_ks, &vug, estimated_partitions, reason, offstrategy, origin = std::move(origin)] (flat_mutation_reader_v2 reader) -> future<> {
        // co_await is not permitted inside a catch handler, so the exception is
        // stashed here and handled after the try block.
        std::exception_ptr ex;
        try {
            // Hold a shared pointer to the table for the duration of the write.
            auto cf = db.local().find_column_family(reader.schema()).shared_from_this();
            // Decides between the staging path (with view update registration) and the regular path.
            auto use_view_update_path = co_await db::view::check_needs_view_update_path(sys_dist_ks.local(), db.local().get_token_metadata(), *cf, reason);
            //FIXME: for better estimations this should be transmitted from remote
            auto metadata = mutation_source_metadata{};
            auto& cs = cf->get_compaction_strategy();
            const auto adjusted_estimated_partitions = cs.adjust_partition_estimate(metadata, estimated_partitions);
            auto make_interposer_consumer = [&cs, offstrategy] (const mutation_source_metadata& ms_meta, reader_consumer_v2 end_consumer) mutable {
                // postpone data segregation to off-strategy compaction if enabled
                if (offstrategy) {
                    return end_consumer;
                }
                return cs.make_interposer_consumer(ms_meta, std::move(end_consumer));
            };

            // `origin` is captured by plain copy on purpose: the enclosing lambda is
            // not `mutable`, so its captures are const here and a std::move() would
            // silently degrade to a copy anyway. Keeping the enclosing lambda's copy
            // intact also matters because a std::function can be invoked more than once.
            auto consumer = make_interposer_consumer(metadata,
                    [cf = std::move(cf), adjusted_estimated_partitions, use_view_update_path, &vug, origin, offstrategy] (flat_mutation_reader_v2 reader) {
                sstables::shared_sstable sst;
                try {
                    sst = use_view_update_path ? cf->make_streaming_staging_sstable() : cf->make_streaming_sstable_for_write();
                } catch (...) {
                    // Creating the sstable failed: close the reader before
                    // propagating the error through the returned future.
                    return current_exception_as_future().finally([reader = std::move(reader)] () mutable {
                        return reader.close();
                    });
                }
                schema_ptr s = reader.schema();
                auto& pc = service::get_local_streaming_priority();

                // Write the sstable, open it, add it to the table, then (on the
                // view update path only) register it as a staging sstable.
                return sst->write_components(std::move(reader), adjusted_estimated_partitions, s,
                                             cf->get_sstables_manager().configure_writer(origin),
                                             encoding_stats{}, pc).then([sst] {
                    return sst->open_data();
                }).then([cf, sst, offstrategy, origin] {
                    if (offstrategy && sstables::repair_origin == origin) {
                        sstables::sstlog.debug("Enabled automatic off-strategy trigger for table {}.{}",
                                cf->schema()->ks_name(), cf->schema()->cf_name());
                        cf->enable_off_strategy_trigger();
                    }
                    return cf->add_sstable_and_update_cache(sst, offstrategy);
                // NOTE(review): `s` is captured below but not referenced in the body;
                // presumably it keeps the schema alive across the chain - confirm before removing.
                }).then([cf, s, sst, use_view_update_path, &vug]() mutable -> future<> {
                    if (!use_view_update_path) {
                        return make_ready_future<>();
                    }
                    return vug.local().register_staging_sstable(sst, std::move(cf));
                });
            });
            co_return co_await consumer(std::move(reader));
        } catch (...) {
            ex = std::current_exception();
        }
        if (ex) {
            // Close the reader on the failure path (it may already have been
            // moved into the consumer), then propagate the original error.
            co_await reader.close();
            std::rethrow_exception(std::move(ex));
        }
    };
}
}