Skip to content

Commit

Permalink
treewide: pass mutation timestamp from call sites into `migration_man…
Browse files Browse the repository at this point in the history
…ager::prepare_*` functions

The functions which prepare schema change mutations (such as
`prepare_new_column_family_announcement`) would use internally
generated timestamps for these mutations. When schema changes are
managed by group 0 we want to ensure that timestamps of mutations
applied through Raft are monotonic. We will generate these timestamps at
call sites and pass them into the `prepare_` functions. This commit
prepares the APIs.
  • Loading branch information
kbr-scylla committed Jan 24, 2022
1 parent f97edb1 commit 283ac7f
Show file tree
Hide file tree
Showing 52 changed files with 143 additions and 152 deletions.
16 changes: 8 additions & 8 deletions alternator/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,8 @@ future<executor::request_return_type> executor::delete_table(client_state& clien
throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name));
}

auto m = co_await mm.prepare_column_family_drop_announcement(keyspace_name, table_name, service::migration_manager::drop_views::yes);
auto m2 = mm.prepare_keyspace_drop_announcement(keyspace_name);
auto m = co_await mm.prepare_column_family_drop_announcement(keyspace_name, table_name, api::new_timestamp(), service::migration_manager::drop_views::yes);
auto m2 = mm.prepare_keyspace_drop_announcement(keyspace_name, api::new_timestamp());

std::move(m2.begin(), m2.end(), std::back_inserter(m));

Expand Down Expand Up @@ -755,7 +755,7 @@ future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map
schema_builder builder(s);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));

auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), std::nullopt);
auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), api::new_timestamp());

co_await mm.announce(std::move(m));
});
Expand Down Expand Up @@ -1045,12 +1045,12 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
// The code should be rewritten in a way that allows creating mutations
// for all the changes in a single mutation array before announcing.
// See https://github.com/scylladb/scylla/issues/9868
co_await mm.announce(co_await mm.prepare_new_column_family_announcement(schema));
co_await mm.announce(co_await mm.prepare_new_column_family_announcement(schema, api::new_timestamp()));

std::vector<mutation> m;

co_await parallel_for_each(std::move(view_builders), [&mm, schema, &m] (schema_builder builder) -> future<> {
auto vm = co_await mm.prepare_new_view_announcement(view_ptr(builder.build()));
auto vm = co_await mm.prepare_new_view_announcement(view_ptr(builder.build()), api::new_timestamp());
std::move(vm.begin(), vm.end(), std::back_inserter(m));
});

Expand All @@ -1060,7 +1060,7 @@ static future<executor::request_return_type> create_table_on_shard0(tracing::tra
schema_builder builder(schema);
builder.add_extension(tags_extension::NAME, ::make_shared<tags_extension>(tags_map));

co_await mm.announce(co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), std::nullopt));
co_await mm.announce(co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector<view_ptr>(), api::new_timestamp()));
}

co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s);
Expand Down Expand Up @@ -1129,7 +1129,7 @@ future<executor::request_return_type> executor::update_table(client_state& clien

auto schema = builder.build();

auto m = co_await mm.prepare_column_family_update_announcement(schema, false, std::vector<view_ptr>(), std::nullopt);
auto m = co_await mm.prepare_column_family_update_announcement(schema, false, std::vector<view_ptr>(), api::new_timestamp());

co_await mm.announce(std::move(m));

Expand Down Expand Up @@ -4183,7 +4183,7 @@ static future<std::vector<mutation>> create_keyspace(std::string_view keyspace_n
auto opts = get_network_topology_options(gossiper, rf);
auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true);

co_return mm.prepare_new_keyspace_announcement(ksm);
co_return mm.prepare_new_keyspace_announcement(ksm, api::new_timestamp());
}

future<> executor::start() {
Expand Down
2 changes: 1 addition & 1 deletion auth/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ static future<> create_metadata_table_if_missing_impl(
if (!db.has_schema(table->ks_name(), table->cf_name())) {
co_await mm.schema_read_barrier();
try {
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table));
co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table, api::new_timestamp()));
} catch (exceptions::already_exists_exception&) {}
}
}
Expand Down
2 changes: 1 addition & 1 deletion auth/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ future<> service::create_keyspace_if_missing(::service::migration_manager& mm) c
opts,
true);

co_return co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm));
co_return co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm, api::new_timestamp()));
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/alter_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,12 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
}

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp) const {
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
try {
auto old_ksm = qp.db().find_keyspace(_name).metadata();
const auto& tm = *qp.proxy().get_token_metadata_ptr();

auto m = qp.get_migration_manager().prepare_keyspace_update_announcement(_attrs->as_ks_metadata_update(old_ksm, tm));
auto m = qp.get_migration_manager().prepare_keyspace_update_announcement(_attrs->as_ks_metadata_update(old_ksm, tm), ts);

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/alter_keyspace_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public:

future<> check_access(query_processor& qp, const service::client_state& state) const override;
void validate(query_processor& qp, const service::client_state& state) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
virtual future<::shared_ptr<messages::result_message>> execute(query_processor& qp, service::query_state& state, const query_options& options) const override;
};
Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/alter_table_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,11 @@ std::pair<schema_builder, std::vector<view_ptr>> alter_table_statement::prepare_
}

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>>
alter_table_statement::prepare_schema_mutations(query_processor& qp) const {
alter_table_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
data_dictionary::database db = qp.db();
auto& mm = qp.get_migration_manager();
auto [cfm, view_updates] = prepare_schema_update(db);
auto m = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, std::move(view_updates), std::nullopt);
auto m = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, std::move(view_updates), ts);

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/alter_table_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public:
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
virtual future<::shared_ptr<messages::result_message>> execute(query_processor& qp, service::query_state& state, const query_options& options) const override;

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;
private:
void add_column(const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector<view_ptr>& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const;
void alter_column(const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector<view_ptr>& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const;
Expand Down
12 changes: 6 additions & 6 deletions cql3/statements/alter_type_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const sstring& alter_type_statement::keyspace() const
return _name.get_keyspace();
}

future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm) const {
future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm, api::timestamp_type ts) const {
std::vector<mutation> m;
auto&& ks = db.find_keyspace(keyspace());
auto&& all_types = ks.metadata()->user_types().get_all_types();
Expand All @@ -78,7 +78,7 @@ future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutatio
auto&& updated = make_updated_type(db, to_update->second);
// Now, we need to announce the type update to basically change it for new tables using this type,
// but we also need to find all existing user types and CF using it and change them.
auto res = co_await mm.prepare_update_type_announcement(updated);
auto res = co_await mm.prepare_update_type_announcement(updated, ts);
std::move(res.begin(), res.end(), std::back_inserter(m));

for (auto&& schema : ks.metadata()->cf_meta_data() | boost::adaptors::map_values) {
Expand All @@ -94,10 +94,10 @@ future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutatio
}
if (modified) {
if (schema->is_view()) {
auto res = co_await mm.prepare_view_update_announcement(view_ptr(cfm.build()));
auto res = co_await mm.prepare_view_update_announcement(view_ptr(cfm.build()), ts);
std::move(res.begin(), res.end(), std::back_inserter(m));
} else {
auto res = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, {}, std::nullopt);
auto res = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, {}, ts);
std::move(res.begin(), res.end(), std::back_inserter(m));
}
}
Expand All @@ -107,9 +107,9 @@ future<std::vector<mutation>> alter_type_statement::prepare_announcement_mutatio
}

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>>
alter_type_statement::prepare_schema_mutations(query_processor& qp) const {
alter_type_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
try {
auto m = co_await prepare_announcement_mutations(qp.db(), qp.get_migration_manager());
auto m = co_await prepare_announcement_mutations(qp.db(), qp.get_migration_manager(), ts);

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/alter_type_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public:
virtual const sstring& keyspace() const override;


future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;

class add_or_alter;
class renames;
Expand All @@ -57,7 +57,7 @@ private:
virtual future<> operator()(schema_ptr cfm, bool from_thrift, std::vector<view_ptr>&& view_updates, std::optional<api::timestamp_type> ts_opt) = 0;
};

future<std::vector<mutation>> prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm) const;
future<std::vector<mutation>> prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm, api::timestamp_type) const;
};

class alter_type_statement::add_or_alter : public alter_type_statement {
Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/alter_view_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const
return view_ptr(builder.build());
}

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> alter_view_statement::prepare_schema_mutations(query_processor& qp) const {
auto m = co_await qp.get_migration_manager().prepare_view_update_announcement(prepare_view(qp.db()));
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> alter_view_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
auto m = co_await qp.get_migration_manager().prepare_view_update_announcement(prepare_view(qp.db()), ts);

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/alter_view_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public:
virtual void validate(query_processor&, const service::client_state& state) const override;


future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;

virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
};
Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/create_aggregate_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ std::unique_ptr<prepared_statement> create_aggregate_statement::prepare(data_dic
}

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>>
create_aggregate_statement::prepare_schema_mutations(query_processor& qp) const {
create_aggregate_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;

auto aggregate = dynamic_pointer_cast<functions::user_aggregate>(validate_while_executing(qp));
if (aggregate) {
m = co_await qp.get_migration_manager().prepare_new_aggregate_announcement(aggregate);
m = co_await qp.get_migration_manager().prepare_new_aggregate_announcement(aggregate, ts);
ret = create_schema_change(*aggregate, true);
}

Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/create_aggregate_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace statements {

class create_aggregate_statement final : public create_function_statement_base {
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;

virtual shared_ptr<functions::function> create(query_processor& qp, functions::function* old) const override;

Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/create_function_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ std::unique_ptr<prepared_statement> create_function_statement::prepare(data_dict
}

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>>
create_function_statement::prepare_schema_mutations(query_processor& qp) const {
create_function_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
::shared_ptr<cql_transport::event::schema_change> ret;
std::vector<mutation> m;

auto func = dynamic_pointer_cast<functions::user_function>(validate_while_executing(qp));

if (func) {
m = co_await qp.get_migration_manager().prepare_new_function_announcement(func);
m = co_await qp.get_migration_manager().prepare_new_function_announcement(func, ts);
ret = create_schema_change(*func, true);
}

Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/create_function_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace statements {

class create_function_statement final : public create_function_statement_base {
virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;

virtual shared_ptr<functions::function> create(query_processor& qp, functions::function* old) const override;
sstring _language;
Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/create_index_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -299,15 +299,15 @@ schema_ptr create_index_statement::build_index_schema(query_processor& qp) const
}

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>>
create_index_statement::prepare_schema_mutations(query_processor& qp) const {
create_index_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
using namespace cql_transport;
auto schema = build_index_schema(qp);

::shared_ptr<event::schema_change> ret;
std::vector<mutation> m;

if (schema) {
m = co_await qp.get_migration_manager().prepare_column_family_update_announcement(std::move(schema), false, {}, std::nullopt);
m = co_await qp.get_migration_manager().prepare_column_family_update_announcement(std::move(schema), false, {}, ts);

ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,
Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/create_index_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public:

future<> check_access(query_processor& qp, const service::client_state& state) const override;
void validate(query_processor&, const service::client_state& state) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;


virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;
Expand Down
4 changes: 2 additions & 2 deletions cql3/statements/create_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,14 @@ void create_keyspace_statement::validate(query_processor&, const service::client
#endif
}

future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> create_keyspace_statement::prepare_schema_mutations(query_processor& qp) const {
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> create_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
using namespace cql_transport;
const auto& tm = *qp.proxy().get_token_metadata_ptr();
::shared_ptr<event::schema_change> ret;
std::vector<mutation> m;

try {
m = qp.get_migration_manager().prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm));
m = qp.get_migration_manager().prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm), ts);

ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::CREATED,
Expand Down
2 changes: 1 addition & 1 deletion cql3/statements/create_keyspace_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public:
virtual void validate(query_processor&, const service::client_state& state) const override;


future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp) const override;
future<std::pair<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override;

virtual std::unique_ptr<prepared_statement> prepare(data_dictionary::database db, cql_stats& stats) override;

Expand Down
Loading

0 comments on commit 283ac7f

Please sign in to comment.