From 283ac7fefef05c9a3c78f50b4023a2f228c2e169 Mon Sep 17 00:00:00 2001 From: Kamil Braun Date: Tue, 14 Dec 2021 18:19:41 +0100 Subject: [PATCH] treewide: pass mutation timestamp from call sites into `migration_manager::prepare_*` functions The functions which prepare schema change mutations (such as `prepare_new_column_family_announcement`) would use internally generated timestamps for these mutations. When schema changes are managed by group 0 we want to ensure that timestamps of mutations applied through Raft are monotonic. We will generate these timestamps at call sites and pass them into the `prepare_` functions. This commit prepares the APIs. --- alternator/executor.cc | 16 ++--- auth/common.cc | 2 +- auth/service.cc | 2 +- cql3/statements/alter_keyspace_statement.cc | 4 +- cql3/statements/alter_keyspace_statement.hh | 2 +- cql3/statements/alter_table_statement.cc | 4 +- cql3/statements/alter_table_statement.hh | 2 +- cql3/statements/alter_type_statement.cc | 12 ++-- cql3/statements/alter_type_statement.hh | 4 +- cql3/statements/alter_view_statement.cc | 4 +- cql3/statements/alter_view_statement.hh | 2 +- cql3/statements/create_aggregate_statement.cc | 4 +- cql3/statements/create_aggregate_statement.hh | 2 +- cql3/statements/create_function_statement.cc | 4 +- cql3/statements/create_function_statement.hh | 2 +- cql3/statements/create_index_statement.cc | 4 +- cql3/statements/create_index_statement.hh | 2 +- cql3/statements/create_keyspace_statement.cc | 4 +- cql3/statements/create_keyspace_statement.hh | 2 +- cql3/statements/create_table_statement.cc | 4 +- cql3/statements/create_table_statement.hh | 2 +- cql3/statements/create_type_statement.cc | 4 +- cql3/statements/create_type_statement.hh | 2 +- cql3/statements/create_view_statement.cc | 4 +- cql3/statements/create_view_statement.hh | 2 +- cql3/statements/drop_aggregate_statement.cc | 4 +- cql3/statements/drop_aggregate_statement.hh | 2 +- cql3/statements/drop_function_statement.cc | 4 +- cql3/statements/drop_function_statement.hh | 2 +- cql3/statements/drop_index_statement.cc | 4 +- cql3/statements/drop_index_statement.hh | 2 +- cql3/statements/drop_keyspace_statement.cc | 4 +- cql3/statements/drop_keyspace_statement.hh | 2 +- cql3/statements/drop_table_statement.cc | 4 +- cql3/statements/drop_table_statement.hh | 2 +- cql3/statements/drop_type_statement.cc | 4 +- cql3/statements/drop_type_statement.hh | 2 +- cql3/statements/drop_view_statement.cc | 4 +- cql3/statements/drop_view_statement.hh | 2 +- cql3/statements/schema_altering_statement.cc | 2 +- cql3/statements/schema_altering_statement.hh | 2 +- db/system_distributed_keyspace.cc | 4 +- redis/keyspace_utils.cc | 4 +- service/migration_manager.cc | 71 +++++++++---------- service/migration_manager.hh | 34 +++++---- table_helper.cc | 4 +- test/boost/cql_query_test.cc | 2 +- test/boost/database_test.cc | 4 +- test/boost/memtable_test.cc | 2 +- test/boost/schema_change_test.cc | 14 ++-- test/lib/cql_test_env.cc | 2 +- thrift/handler.cc | 12 ++-- 52 files changed, 143 insertions(+), 152 deletions(-) diff --git a/alternator/executor.cc b/alternator/executor.cc index 35a6d564240f..c7cd4844612d 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -500,8 +500,8 @@ future executor::delete_table(client_state& clien throw api_error::resource_not_found(format("Requested resource not found: Table: {} not found", table_name)); } - auto m = co_await mm.prepare_column_family_drop_announcement(keyspace_name, table_name, service::migration_manager::drop_views::yes); - auto m2 = mm.prepare_keyspace_drop_announcement(keyspace_name); + auto m = co_await mm.prepare_column_family_drop_announcement(keyspace_name, table_name, api::new_timestamp(), service::migration_manager::drop_views::yes); + auto m2 = mm.prepare_keyspace_drop_announcement(keyspace_name, api::new_timestamp()); std::move(m2.begin(), m2.end(), std::back_inserter(m)); @@ -755,7 +755,7 @@ future<> update_tags(service::migration_manager& mm, schema_ptr schema, std::map schema_builder builder(s); builder.add_extension(tags_extension::NAME, ::make_shared(tags_map)); - auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector(), std::nullopt); + auto m = co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector(), api::new_timestamp()); co_await mm.announce(std::move(m)); }); @@ -1045,12 +1045,12 @@ static future create_table_on_shard0(tracing::tra // The code should be rewritten in a way that allows creating mutations // for all the changes in a single mutation array before announcing. // See https://github.com/scylladb/scylla/issues/9868 - co_await mm.announce(co_await mm.prepare_new_column_family_announcement(schema)); + co_await mm.announce(co_await mm.prepare_new_column_family_announcement(schema, api::new_timestamp())); std::vector m; co_await parallel_for_each(std::move(view_builders), [&mm, schema, &m] (schema_builder builder) -> future<> { - auto vm = co_await mm.prepare_new_view_announcement(view_ptr(builder.build())); + auto vm = co_await mm.prepare_new_view_announcement(view_ptr(builder.build()), api::new_timestamp()); std::move(vm.begin(), vm.end(), std::back_inserter(m)); }); @@ -1060,7 +1060,7 @@ static future create_table_on_shard0(tracing::tra schema_builder builder(schema); builder.add_extension(tags_extension::NAME, ::make_shared(tags_map)); - co_await mm.announce(co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector(), std::nullopt)); + co_await mm.announce(co_await mm.prepare_column_family_update_announcement(builder.build(), false, std::vector(), api::new_timestamp())); } co_await wait_for_schema_agreement(mm, db::timeout_clock::now() + 10s); @@ -1129,7 +1129,7 @@ future executor::update_table(client_state& clien auto schema = builder.build(); - auto m = co_await mm.prepare_column_family_update_announcement(schema, false, std::vector(), std::nullopt); + auto m = co_await mm.prepare_column_family_update_announcement(schema, false, std::vector(), api::new_timestamp()); co_await mm.announce(std::move(m)); @@ -4183,7 +4183,7 @@ static future> create_keyspace(std::string_view keyspace_n auto opts = get_network_topology_options(gossiper, rf); auto ksm = keyspace_metadata::new_keyspace(keyspace_name_str, "org.apache.cassandra.locator.NetworkTopologyStrategy", std::move(opts), true); - co_return mm.prepare_new_keyspace_announcement(ksm); + co_return mm.prepare_new_keyspace_announcement(ksm, api::new_timestamp()); } future<> executor::start() { diff --git a/auth/common.cc b/auth/common.cc index f8db0faed983..be9fedfa5d0b 100644 --- a/auth/common.cc +++ b/auth/common.cc @@ -70,7 +70,7 @@ static future<> create_metadata_table_if_missing_impl( if (!db.has_schema(table->ks_name(), table->cf_name())) { co_await mm.schema_read_barrier(); try { - co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table)); + co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(table, api::new_timestamp())); } catch (exceptions::already_exists_exception&) {} } } diff --git a/auth/service.cc b/auth/service.cc index aeaf3dd6f5c1..fcbc5efd1557 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -146,7 +146,7 @@ future<> service::create_keyspace_if_missing(::service::migration_manager& mm) c opts, true); - co_return co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm)); + co_return co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm, api::new_timestamp())); } } } diff --git a/cql3/statements/alter_keyspace_statement.cc b/cql3/statements/alter_keyspace_statement.cc index 2302658af6b5..a9335f4aeef6 100644 --- a/cql3/statements/alter_keyspace_statement.cc +++ b/cql3/statements/alter_keyspace_statement.cc @@ -63,12 +63,12 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c } future, std::vector>> -cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp) const { +cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { try { auto old_ksm = qp.db().find_keyspace(_name).metadata(); const auto& tm = *qp.proxy().get_token_metadata_ptr(); - auto m = qp.get_migration_manager().prepare_keyspace_update_announcement(_attrs->as_ks_metadata_update(old_ksm, tm)); + auto m = qp.get_migration_manager().prepare_keyspace_update_announcement(_attrs->as_ks_metadata_update(old_ksm, tm), ts); using namespace cql_transport; auto ret = ::make_shared( diff --git a/cql3/statements/alter_keyspace_statement.hh b/cql3/statements/alter_keyspace_statement.hh index 064082dfff14..bd04ea610a63 100644 --- a/cql3/statements/alter_keyspace_statement.hh +++ b/cql3/statements/alter_keyspace_statement.hh @@ -36,7 +36,7 @@ public: future<> check_access(query_processor& qp, const service::client_state& state) const override; void validate(query_processor& qp, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; virtual future<::shared_ptr> execute(query_processor& qp, service::query_state& state, const query_options& options) const override; }; diff --git a/cql3/statements/alter_table_statement.cc b/cql3/statements/alter_table_statement.cc index 534516cbd19b..908f12872621 100644 --- a/cql3/statements/alter_table_statement.cc +++ b/cql3/statements/alter_table_statement.cc @@ -385,11 +385,11 @@ std::pair> alter_table_statement::prepare_ } future, std::vector>> -alter_table_statement::prepare_schema_mutations(query_processor& qp) const { +alter_table_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { data_dictionary::database db = qp.db(); auto& mm = qp.get_migration_manager(); auto [cfm, view_updates] = prepare_schema_update(db); - auto m = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, std::move(view_updates), std::nullopt); + auto m = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, std::move(view_updates), ts); using namespace cql_transport; auto ret = ::make_shared( diff --git a/cql3/statements/alter_table_statement.hh b/cql3/statements/alter_table_statement.hh index ece01447ea69..d69018366d66 100644 --- a/cql3/statements/alter_table_statement.hh +++ b/cql3/statements/alter_table_statement.hh @@ -58,7 +58,7 @@ public: virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; virtual future<::shared_ptr> execute(query_processor& qp, service::query_state& state, const query_options& options) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; private: void add_column(const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const; void alter_column(const schema& schema, data_dictionary::table cf, schema_builder& cfm, std::vector& view_updates, const column_identifier& column_name, const cql3_type validator, const column_definition* def, bool is_static) const; diff --git a/cql3/statements/alter_type_statement.cc b/cql3/statements/alter_type_statement.cc index 21a77f6e39a7..40a94ea8e5e2 100644 --- a/cql3/statements/alter_type_statement.cc +++ b/cql3/statements/alter_type_statement.cc @@ -56,7 +56,7 @@ const sstring& alter_type_statement::keyspace() const return _name.get_keyspace(); } -future> alter_type_statement::prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm) const { +future> alter_type_statement::prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm, api::timestamp_type ts) const { std::vector m; auto&& ks = db.find_keyspace(keyspace()); auto&& all_types = ks.metadata()->user_types().get_all_types(); @@ -78,7 +78,7 @@ future> alter_type_statement::prepare_announcement_mutatio auto&& updated = make_updated_type(db, to_update->second); // Now, we need to announce the type update to basically change it for new tables using this type, // but we also need to find all existing user types and CF using it and change them. - auto res = co_await mm.prepare_update_type_announcement(updated); + auto res = co_await mm.prepare_update_type_announcement(updated, ts); std::move(res.begin(), res.end(), std::back_inserter(m)); for (auto&& schema : ks.metadata()->cf_meta_data() | boost::adaptors::map_values) { @@ -94,10 +94,10 @@ future> alter_type_statement::prepare_announcement_mutatio } if (modified) { if (schema->is_view()) { - auto res = co_await mm.prepare_view_update_announcement(view_ptr(cfm.build())); + auto res = co_await mm.prepare_view_update_announcement(view_ptr(cfm.build()), ts); std::move(res.begin(), res.end(), std::back_inserter(m)); } else { - auto res = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, {}, std::nullopt); + auto res = co_await mm.prepare_column_family_update_announcement(cfm.build(), false, {}, ts); std::move(res.begin(), res.end(), std::back_inserter(m)); } } @@ -107,9 +107,9 @@ future> alter_type_statement::prepare_announcement_mutatio } future, std::vector>> -alter_type_statement::prepare_schema_mutations(query_processor& qp) const { +alter_type_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { try { - auto m = co_await prepare_announcement_mutations(qp.db(), qp.get_migration_manager()); + auto m = co_await prepare_announcement_mutations(qp.db(), qp.get_migration_manager(), ts); using namespace cql_transport; auto ret = ::make_shared( diff --git a/cql3/statements/alter_type_statement.hh b/cql3/statements/alter_type_statement.hh index 0296524d115b..234a9370f2b2 100644 --- a/cql3/statements/alter_type_statement.hh +++ b/cql3/statements/alter_type_statement.hh @@ -44,7 +44,7 @@ public: virtual const sstring& keyspace() const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; class add_or_alter; class renames; @@ -57,7 +57,7 @@ private: virtual future<> operator()(schema_ptr cfm, bool from_thrift, std::vector&& view_updates, std::optional ts_opt) = 0; }; - future> prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm) const; + future> prepare_announcement_mutations(data_dictionary::database db, service::migration_manager& mm, api::timestamp_type) const; }; class alter_type_statement::add_or_alter : public alter_type_statement { diff --git a/cql3/statements/alter_view_statement.cc b/cql3/statements/alter_view_statement.cc index 755888463e72..d8ef2258ffda 100644 --- a/cql3/statements/alter_view_statement.cc +++ b/cql3/statements/alter_view_statement.cc @@ -84,8 +84,8 @@ view_ptr alter_view_statement::prepare_view(data_dictionary::database db) const return view_ptr(builder.build()); } -future, std::vector>> alter_view_statement::prepare_schema_mutations(query_processor& qp) const { - auto m = co_await qp.get_migration_manager().prepare_view_update_announcement(prepare_view(qp.db())); +future, std::vector>> alter_view_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { + auto m = co_await qp.get_migration_manager().prepare_view_update_announcement(prepare_view(qp.db()), ts); using namespace cql_transport; auto ret = ::make_shared( diff --git a/cql3/statements/alter_view_statement.hh b/cql3/statements/alter_view_statement.hh index 1fb82620e9c2..88c3d12ea1ba 100644 --- a/cql3/statements/alter_view_statement.hh +++ b/cql3/statements/alter_view_statement.hh @@ -39,7 +39,7 @@ public: virtual void validate(query_processor&, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; }; diff --git a/cql3/statements/create_aggregate_statement.cc b/cql3/statements/create_aggregate_statement.cc index 9e5812111335..140bd0ae7c2a 100644 --- a/cql3/statements/create_aggregate_statement.cc +++ b/cql3/statements/create_aggregate_statement.cc @@ -57,13 +57,13 @@ std::unique_ptr create_aggregate_statement::prepare(data_dic } future, std::vector>> -create_aggregate_statement::prepare_schema_mutations(query_processor& qp) const { +create_aggregate_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; auto aggregate = dynamic_pointer_cast(validate_while_executing(qp)); if (aggregate) { - m = co_await qp.get_migration_manager().prepare_new_aggregate_announcement(aggregate); + m = co_await qp.get_migration_manager().prepare_new_aggregate_announcement(aggregate, ts); ret = create_schema_change(*aggregate, true); } diff --git a/cql3/statements/create_aggregate_statement.hh b/cql3/statements/create_aggregate_statement.hh index 231d37bc4ddc..4a163150c8e8 100644 --- a/cql3/statements/create_aggregate_statement.hh +++ b/cql3/statements/create_aggregate_statement.hh @@ -24,7 +24,7 @@ namespace statements { class create_aggregate_statement final : public create_function_statement_base { virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual shared_ptr create(query_processor& qp, functions::function* old) const override; diff --git a/cql3/statements/create_function_statement.cc b/cql3/statements/create_function_statement.cc index 337f8dc445a4..9d317e7391bd 100644 --- a/cql3/statements/create_function_statement.cc +++ b/cql3/statements/create_function_statement.cc @@ -64,14 +64,14 @@ std::unique_ptr create_function_statement::prepare(data_dict } future, std::vector>> -create_function_statement::prepare_schema_mutations(query_processor& qp) const { +create_function_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; auto func = dynamic_pointer_cast(validate_while_executing(qp)); if (func) { - m = co_await qp.get_migration_manager().prepare_new_function_announcement(func); + m = co_await qp.get_migration_manager().prepare_new_function_announcement(func, ts); ret = create_schema_change(*func, true); } diff --git a/cql3/statements/create_function_statement.hh b/cql3/statements/create_function_statement.hh index 6c46334e4895..5db8c79aa99e 100644 --- a/cql3/statements/create_function_statement.hh +++ b/cql3/statements/create_function_statement.hh @@ -23,7 +23,7 @@ namespace statements { class create_function_statement final : public create_function_statement_base { virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual shared_ptr create(query_processor& qp, functions::function* old) const override; sstring _language; diff --git a/cql3/statements/create_index_statement.cc b/cql3/statements/create_index_statement.cc index a96fc029b0ef..305370484c2f 100644 --- a/cql3/statements/create_index_statement.cc +++ b/cql3/statements/create_index_statement.cc @@ -299,7 +299,7 @@ schema_ptr create_index_statement::build_index_schema(query_processor& qp) const } future, std::vector>> -create_index_statement::prepare_schema_mutations(query_processor& qp) const { +create_index_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { using namespace cql_transport; auto schema = build_index_schema(qp); @@ -307,7 +307,7 @@ create_index_statement::prepare_schema_mutations(query_processor& qp) const { std::vector m; if (schema) { - m = co_await qp.get_migration_manager().prepare_column_family_update_announcement(std::move(schema), false, {}, std::nullopt); + m = co_await qp.get_migration_manager().prepare_column_family_update_announcement(std::move(schema), false, {}, ts); ret = ::make_shared( event::schema_change::change_type::UPDATED, diff --git a/cql3/statements/create_index_statement.hh b/cql3/statements/create_index_statement.hh index ef459f89b048..f84e6a514318 100644 --- a/cql3/statements/create_index_statement.hh +++ b/cql3/statements/create_index_statement.hh @@ -50,7 +50,7 @@ public: future<> check_access(query_processor& qp, const service::client_state& state) const override; void validate(query_processor&, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; diff --git a/cql3/statements/create_keyspace_statement.cc b/cql3/statements/create_keyspace_statement.cc index b04426d6a8a9..964661c69be3 100644 --- a/cql3/statements/create_keyspace_statement.cc +++ b/cql3/statements/create_keyspace_statement.cc @@ -85,14 +85,14 @@ void create_keyspace_statement::validate(query_processor&, const service::client #endif } -future, std::vector>> create_keyspace_statement::prepare_schema_mutations(query_processor& qp) const { +future, std::vector>> create_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { using namespace cql_transport; const auto& tm = *qp.proxy().get_token_metadata_ptr(); ::shared_ptr ret; std::vector m; try { - m = qp.get_migration_manager().prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm)); + m = qp.get_migration_manager().prepare_new_keyspace_announcement(_attrs->as_ks_metadata(_name, tm), ts); ret = ::make_shared( event::schema_change::change_type::CREATED, diff --git a/cql3/statements/create_keyspace_statement.hh b/cql3/statements/create_keyspace_statement.hh index b53944e6a3ec..e1638583e9ce 100644 --- a/cql3/statements/create_keyspace_statement.hh +++ b/cql3/statements/create_keyspace_statement.hh @@ -67,7 +67,7 @@ public: virtual void validate(query_processor&, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; diff --git a/cql3/statements/create_table_statement.cc b/cql3/statements/create_table_statement.cc index 9dfdf3d885f1..b0ded2fb73e8 100644 --- a/cql3/statements/create_table_statement.cc +++ b/cql3/statements/create_table_statement.cc @@ -77,12 +77,12 @@ std::vector create_table_statement::get_columns() const } future, std::vector>> -create_table_statement::prepare_schema_mutations(query_processor& qp) const { +create_table_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; try { - m = co_await qp.get_migration_manager().prepare_new_column_family_announcement(get_cf_meta_data(qp.db())); + m = co_await qp.get_migration_manager().prepare_new_column_family_announcement(get_cf_meta_data(qp.db()), ts); using namespace cql_transport; ret = ::make_shared( diff --git a/cql3/statements/create_table_statement.hh b/cql3/statements/create_table_statement.hh index 291f402d34ac..8666cb8d0e20 100644 --- a/cql3/statements/create_table_statement.hh +++ b/cql3/statements/create_table_statement.hh @@ -75,7 +75,7 @@ public: virtual void validate(query_processor&, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; diff --git a/cql3/statements/create_type_statement.cc b/cql3/statements/create_type_statement.cc index b64d4b57289f..1ea6cd609b4f 100644 --- a/cql3/statements/create_type_statement.cc +++ b/cql3/statements/create_type_statement.cc @@ -121,13 +121,13 @@ std::optional create_type_statement::make_type(query_processor& qp) c return type; } -future, std::vector>> create_type_statement::prepare_schema_mutations(query_processor& qp) const { +future, std::vector>> create_type_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; try { auto t = make_type(qp); if (t) { - m = co_await qp.get_migration_manager().prepare_new_type_announcement(*t); + m = co_await qp.get_migration_manager().prepare_new_type_announcement(*t, ts); using namespace cql_transport; ret = ::make_shared( diff --git a/cql3/statements/create_type_statement.hh b/cql3/statements/create_type_statement.hh index 06ddbb1751dd..a8a0805730f6 100644 --- a/cql3/statements/create_type_statement.hh +++ b/cql3/statements/create_type_statement.hh @@ -40,7 +40,7 @@ public: virtual const sstring& keyspace() const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; diff --git a/cql3/statements/create_view_statement.cc b/cql3/statements/create_view_statement.cc index feb5bcdf8a60..b1ae8af83c38 100644 --- a/cql3/statements/create_view_statement.cc +++ b/cql3/statements/create_view_statement.cc @@ -322,12 +322,12 @@ view_ptr create_view_statement::prepare_view(data_dictionary::database db) const } future, std::vector>> -create_view_statement::prepare_schema_mutations(query_processor& qp) const { +create_view_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; auto definition = prepare_view(qp.db()); try { - m = co_await qp.get_migration_manager().prepare_new_view_announcement(std::move(definition)); + m = co_await qp.get_migration_manager().prepare_new_view_announcement(std::move(definition), ts); using namespace cql_transport; ret = ::make_shared( event::schema_change::change_type::CREATED, diff --git a/cql3/statements/create_view_statement.hh b/cql3/statements/create_view_statement.hh index a715ec55a7d8..4590028e30a1 100644 --- a/cql3/statements/create_view_statement.hh +++ b/cql3/statements/create_view_statement.hh @@ -57,7 +57,7 @@ public: // Functions we need to override to subclass schema_altering_statement virtual future<> check_access(query_processor& qp, const service::client_state& state) const override; virtual void validate(query_processor&, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; diff --git a/cql3/statements/drop_aggregate_statement.cc b/cql3/statements/drop_aggregate_statement.cc index 45d77556548e..ad9bafeb8cff 100644 --- a/cql3/statements/drop_aggregate_statement.cc +++ b/cql3/statements/drop_aggregate_statement.cc @@ -24,7 +24,7 @@ std::unique_ptr drop_aggregate_statement::prepare(data_dicti } future, std::vector>> -drop_aggregate_statement::prepare_schema_mutations(query_processor& qp) const { +drop_aggregate_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; @@ -34,7 +34,7 @@ drop_aggregate_statement::prepare_schema_mutations(query_processor& qp) const { if (!user_aggr) { throw exceptions::invalid_request_exception(format("'{}' is not a user defined aggregate", func)); } - m = co_await qp.get_migration_manager().prepare_aggregate_drop_announcement(user_aggr); + m = co_await qp.get_migration_manager().prepare_aggregate_drop_announcement(user_aggr, ts); ret = create_schema_change(*func, false); } diff --git a/cql3/statements/drop_aggregate_statement.hh b/cql3/statements/drop_aggregate_statement.hh index dd91715b1a64..c000b2cab5b7 100644 --- a/cql3/statements/drop_aggregate_statement.hh +++ b/cql3/statements/drop_aggregate_statement.hh @@ -15,7 +15,7 @@ class query_processor; namespace statements { class drop_aggregate_statement final : public drop_function_statement_base { virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; public: drop_aggregate_statement(functions::function_name name, std::vector> arg_types, diff --git a/cql3/statements/drop_function_statement.cc b/cql3/statements/drop_function_statement.cc index d779d7f6ff43..05898c9cb146 100644 --- a/cql3/statements/drop_function_statement.cc +++ b/cql3/statements/drop_function_statement.cc @@ -24,7 +24,7 @@ std::unique_ptr drop_function_statement::prepare(data_dictio } future, std::vector>> -drop_function_statement::prepare_schema_mutations(query_processor& qp) const { +drop_function_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; @@ -38,7 +38,7 @@ drop_function_statement::prepare_schema_mutations(query_processor& qp) const { if (auto aggregate = functions::functions::used_by_user_aggregate(user_func->name()); bool(aggregate)) { throw exceptions::invalid_request_exception(format("Cannot delete function {}, as it is used by user-defined aggregate {}", func, *aggregate)); } - m = co_await qp.get_migration_manager().prepare_function_drop_announcement(user_func); + m = co_await qp.get_migration_manager().prepare_function_drop_announcement(user_func, ts); ret = create_schema_change(*func, false); } diff --git a/cql3/statements/drop_function_statement.hh b/cql3/statements/drop_function_statement.hh index f6f37d84c65c..52c47f2aa924 100644 --- a/cql3/statements/drop_function_statement.hh +++ b/cql3/statements/drop_function_statement.hh @@ -15,7 +15,7 @@ class query_processor; namespace statements { class drop_function_statement final : public drop_function_statement_base { virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; public: drop_function_statement(functions::function_name name, std::vector> arg_types, diff --git a/cql3/statements/drop_index_statement.cc b/cql3/statements/drop_index_statement.cc index cb5095da9fdd..a45013df6444 100644 --- a/cql3/statements/drop_index_statement.cc +++ b/cql3/statements/drop_index_statement.cc @@ -76,13 +76,13 @@ schema_ptr drop_index_statement::make_drop_idex_schema(query_processor& qp) cons } future, std::vector>> -drop_index_statement::prepare_schema_mutations(query_processor& qp) const { +drop_index_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; auto cfm = make_drop_idex_schema(qp); if (cfm) { - m = co_await qp.get_migration_manager().prepare_column_family_update_announcement(cfm, false, {}, std::nullopt); + m = co_await qp.get_migration_manager().prepare_column_family_update_announcement(cfm, false, {}, ts); using namespace cql_transport; ret = ::make_shared(event::schema_change::change_type::UPDATED, diff --git a/cql3/statements/drop_index_statement.hh b/cql3/statements/drop_index_statement.hh index e9a93f648e72..a35d30212954 100644 --- a/cql3/statements/drop_index_statement.hh +++ b/cql3/statements/drop_index_statement.hh @@ -47,7 +47,7 @@ public: virtual void validate(query_processor&, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; private: diff --git a/cql3/statements/drop_keyspace_statement.cc b/cql3/statements/drop_keyspace_statement.cc index e856a0a29f97..43a10847a402 100644 --- a/cql3/statements/drop_keyspace_statement.cc +++ b/cql3/statements/drop_keyspace_statement.cc @@ -50,12 +50,12 @@ const sstring& drop_keyspace_statement::keyspace() const } future, std::vector>> -drop_keyspace_statement::prepare_schema_mutations(query_processor& qp) const { +drop_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { std::vector m; ::shared_ptr ret; try { - m = qp.get_migration_manager().prepare_keyspace_drop_announcement(_keyspace); + m = qp.get_migration_manager().prepare_keyspace_drop_announcement(_keyspace, ts); using namespace cql_transport; ret = ::make_shared( diff --git a/cql3/statements/drop_keyspace_statement.hh b/cql3/statements/drop_keyspace_statement.hh index a91d1cb04ea9..653c1e9de183 100644 --- a/cql3/statements/drop_keyspace_statement.hh +++ b/cql3/statements/drop_keyspace_statement.hh @@ -33,7 +33,7 @@ public: virtual const sstring& keyspace() const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; }; diff --git a/cql3/statements/drop_table_statement.cc b/cql3/statements/drop_table_statement.cc index d5474a8ec15e..63108cac70e1 100644 --- a/cql3/statements/drop_table_statement.cc +++ b/cql3/statements/drop_table_statement.cc @@ -48,12 +48,12 @@ void drop_table_statement::validate(query_processor&, const service::client_stat } future, std::vector>> -drop_table_statement::prepare_schema_mutations(query_processor& qp) const { +drop_table_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; try { - m = co_await qp.get_migration_manager().prepare_column_family_drop_announcement(keyspace(), column_family()); + m = co_await qp.get_migration_manager().prepare_column_family_drop_announcement(keyspace(), column_family(), ts); using namespace cql_transport; ret = ::make_shared( diff --git a/cql3/statements/drop_table_statement.hh b/cql3/statements/drop_table_statement.hh index 338698b677e2..1626faacb34a 100644 --- a/cql3/statements/drop_table_statement.hh +++ b/cql3/statements/drop_table_statement.hh @@ -31,7 +31,7 @@ public: virtual void validate(query_processor&, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; diff --git a/cql3/statements/drop_type_statement.cc b/cql3/statements/drop_type_statement.cc index 2592f2ef0834..88c0d2b5a8f5 100644 --- a/cql3/statements/drop_type_statement.cc +++ b/cql3/statements/drop_type_statement.cc @@ -124,7 +124,7 @@ const sstring& drop_type_statement::keyspace() const } future, std::vector>> -drop_type_statement::prepare_schema_mutations(query_processor& qp) const { +drop_type_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { validate_while_executing(qp); data_dictionary::database db = qp.db(); @@ -140,7 +140,7 @@ drop_type_statement::prepare_schema_mutations(query_processor& qp) const { // Can happen with if_exists if (to_drop != all_types.end()) { - m = co_await qp.get_migration_manager().prepare_type_drop_announcement(to_drop->second); + m = co_await qp.get_migration_manager().prepare_type_drop_announcement(to_drop->second, ts); using namespace cql_transport; ret = ::make_shared( diff --git a/cql3/statements/drop_type_statement.hh b/cql3/statements/drop_type_statement.hh index 5a356d16ff40..949a936a32a1 100644 --- a/cql3/statements/drop_type_statement.hh +++ b/cql3/statements/drop_type_statement.hh @@ -34,7 +34,7 @@ public: virtual const sstring& keyspace() const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; diff --git a/cql3/statements/drop_view_statement.cc b/cql3/statements/drop_view_statement.cc index f21742f145ec..9f70502e87fd 100644 --- a/cql3/statements/drop_view_statement.cc +++ b/cql3/statements/drop_view_statement.cc @@ -50,12 +50,12 @@ void drop_view_statement::validate(query_processor&, const service::client_state } future, std::vector>> -drop_view_statement::prepare_schema_mutations(query_processor& qp) const { +drop_view_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const { ::shared_ptr ret; std::vector m; try { - m = co_await qp.get_migration_manager().prepare_view_drop_announcement(keyspace(), column_family()); + m = co_await qp.get_migration_manager().prepare_view_drop_announcement(keyspace(), column_family(), ts); using namespace cql_transport; ret = ::make_shared( diff --git a/cql3/statements/drop_view_statement.hh b/cql3/statements/drop_view_statement.hh index 3bc85502cc3f..3e9698a23a1e 100644 --- a/cql3/statements/drop_view_statement.hh +++ b/cql3/statements/drop_view_statement.hh @@ -37,7 +37,7 @@ public: virtual void validate(query_processor&, const service::client_state& state) const override; - future, std::vector>> prepare_schema_mutations(query_processor& qp) const override; + future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const override; virtual std::unique_ptr prepare(data_dictionary::database db, cql_stats& stats) override; diff --git a/cql3/statements/schema_altering_statement.cc b/cql3/statements/schema_altering_statement.cc index f37c3e3da341..a460265b1241 100644 --- a/cql3/statements/schema_altering_statement.cc +++ b/cql3/statements/schema_altering_statement.cc @@ -78,7 +78,7 @@ schema_altering_statement::execute0(query_processor& qp, service::query_state& s co_await mm.schema_read_barrier(); - auto [ret, m] = co_await prepare_schema_mutations(qp); + auto [ret, m] = co_await prepare_schema_mutations(qp, api::new_timestamp()); if (!m.empty()) { co_await mm.announce(std::move(m)); diff --git a/cql3/statements/schema_altering_statement.hh b/cql3/statements/schema_altering_statement.hh index abc6a110f5e4..474e970c5ede 100644 --- a/cql3/statements/schema_altering_statement.hh +++ b/cql3/statements/schema_altering_statement.hh @@ -61,7 +61,7 @@ protected: virtual void prepare_keyspace(const service::client_state& state) override; - virtual future, std::vector>> prepare_schema_mutations(query_processor& qp) const = 0; + virtual future, std::vector>> prepare_schema_mutations(query_processor& qp, api::timestamp_type) const = 0; virtual future<::shared_ptr> execute(query_processor& qp, service::query_state& state, const query_options& options) const override; diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 89d05b1779d4..f2cc560c7b84 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -253,7 +253,7 @@ future<> system_distributed_keyspace::start() { "org.apache.cassandra.locator.SimpleStrategy", {{"replication_factor", "3"}}, true /* durable_writes */); - co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm)); + co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm, api::new_timestamp())); } catch (exceptions::already_exists_exception&) {} } else { dlogger.info("{} keyspase is already present. Not creating", NAME); @@ -268,7 +268,7 @@ future<> system_distributed_keyspace::start() { "org.apache.cassandra.locator.EverywhereStrategy", {}, true /* durable_writes */); - co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm)); + co_await _mm.announce(_mm.prepare_new_keyspace_announcement(ksm, api::new_timestamp())); } catch (exceptions::already_exists_exception&) {} } else { dlogger.info("{} keyspase is already present. Not creating", NAME_EVERYWHERE); diff --git a/redis/keyspace_utils.cc b/redis/keyspace_utils.cc index b371d7cdb905..da35cdcf18df 100644 --- a/redis/keyspace_utils.cc +++ b/redis/keyspace_utils.cc @@ -196,7 +196,7 @@ future<> create_keyspace_if_not_exists_impl(seastar::sharded create_keyspace_if_not_exists_impl(seastar::sharded migration_manager::prepare_keyspace_update_announcement(lw_shared_ptr ksm) { +std::vector migration_manager::prepare_keyspace_update_announcement(lw_shared_ptr ksm, api::timestamp_type ts) { auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); db.validate_keyspace_update(*ksm); mlogger.info("Update Keyspace: {}", ksm); - return db::schema_tables::make_create_keyspace_mutations(ksm, api::new_timestamp()); + return db::schema_tables::make_create_keyspace_mutations(ksm, ts); } -std::vector migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr ksm) { +std::vector migration_manager::prepare_new_keyspace_announcement(lw_shared_ptr ksm, api::timestamp_type timestamp) { auto& proxy = _storage_proxy; auto& db = proxy.get_db().local(); db.validate_new_keyspace(*ksm); mlogger.info("Create new Keyspace: {}", ksm); - return db::schema_tables::make_create_keyspace_mutations(ksm, api::new_timestamp()); -} - -future> migration_manager::prepare_new_column_family_announcement(schema_ptr cfm) -{ - return prepare_new_column_family_announcement(std::move(cfm), api::new_timestamp()); + return db::schema_tables::make_create_keyspace_mutations(ksm, timestamp); } future> migration_manager::include_keyspace( @@ -652,13 +647,12 @@ future> migration_manager::prepare_new_column_family_annou } } -future> migration_manager::prepare_column_family_update_announcement(schema_ptr cfm, bool from_thrift, std::vector view_updates, std::optional ts_opt) { +future> migration_manager::prepare_column_family_update_announcement(schema_ptr cfm, bool from_thrift, std::vector view_updates, api::timestamp_type ts) { warn(unimplemented::cause::VALIDATION); #if 0 cfm.validate(); #endif try { - auto ts = ts_opt.value_or(api::new_timestamp()); auto& db = _storage_proxy.get_db().local(); auto&& old_schema = db.find_column_family(cfm->ks_name(), cfm->cf_name()).schema(); // FIXME: Should we lookup by id? #if 0 @@ -685,63 +679,63 @@ future> migration_manager::prepare_column_family_update_an } } -future> migration_manager::do_prepare_new_type_announcement(user_type new_type) { +future> migration_manager::do_prepare_new_type_announcement(user_type new_type, api::timestamp_type ts) { auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(new_type->_keyspace); - auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, api::new_timestamp()); + auto mutations = db::schema_tables::make_create_type_mutations(keyspace.metadata(), new_type, ts); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } -future> migration_manager::prepare_new_type_announcement(user_type new_type) { +future> migration_manager::prepare_new_type_announcement(user_type new_type, api::timestamp_type ts) { mlogger.info("Prepare Create new User Type: {}", new_type->get_name_as_string()); - return do_prepare_new_type_announcement(std::move(new_type)); + return do_prepare_new_type_announcement(std::move(new_type), ts); } -future> migration_manager::prepare_update_type_announcement(user_type updated_type) { +future> migration_manager::prepare_update_type_announcement(user_type updated_type, api::timestamp_type ts) { mlogger.info("Prepare Update User Type: {}", updated_type->get_name_as_string()); - return do_prepare_new_type_announcement(updated_type); + return do_prepare_new_type_announcement(updated_type, ts); } -future> migration_manager::prepare_new_function_announcement(shared_ptr func) { +future> migration_manager::prepare_new_function_announcement(shared_ptr func, api::timestamp_type ts) { auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(func->name().keyspace); - auto mutations = db::schema_tables::make_create_function_mutations(func, api::new_timestamp()); + auto mutations = db::schema_tables::make_create_function_mutations(func, ts); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } -future> migration_manager::prepare_function_drop_announcement(shared_ptr func) { +future> migration_manager::prepare_function_drop_announcement(shared_ptr func, api::timestamp_type ts) { auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(func->name().keyspace); - auto mutations = db::schema_tables::make_drop_function_mutations(func, api::new_timestamp()); + auto mutations = db::schema_tables::make_drop_function_mutations(func, ts); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } -future> migration_manager::prepare_new_aggregate_announcement(shared_ptr aggregate) { +future> migration_manager::prepare_new_aggregate_announcement(shared_ptr aggregate, api::timestamp_type ts) { auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(aggregate->name().keyspace); - auto mutations = db::schema_tables::make_create_aggregate_mutations(aggregate, api::new_timestamp()); + auto mutations = db::schema_tables::make_create_aggregate_mutations(aggregate, ts); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } -future> migration_manager::prepare_aggregate_drop_announcement(shared_ptr aggregate) { +future> migration_manager::prepare_aggregate_drop_announcement(shared_ptr aggregate, api::timestamp_type ts) { auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(aggregate->name().keyspace); - auto mutations = db::schema_tables::make_drop_aggregate_mutations(aggregate, api::new_timestamp()); + auto mutations = db::schema_tables::make_drop_aggregate_mutations(aggregate, ts); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } -std::vector migration_manager::prepare_keyspace_drop_announcement(const sstring& ks_name) { +std::vector migration_manager::prepare_keyspace_drop_announcement(const sstring& ks_name, api::timestamp_type ts) { auto& db = _storage_proxy.get_db().local(); if (!db.has_keyspace(ks_name)) { throw exceptions::configuration_exception(format("Cannot drop non existing keyspace '{}'.", ks_name)); } auto& keyspace = db.find_keyspace(ks_name); mlogger.info("Drop Keyspace '{}'", ks_name); - return db::schema_tables::make_drop_keyspace_mutations(keyspace.metadata(), api::new_timestamp()); + return db::schema_tables::make_drop_keyspace_mutations(keyspace.metadata(), ts); } future> migration_manager::prepare_column_family_drop_announcement(const sstring& ks_name, - const sstring& cf_name, drop_views drop_views) { + const sstring& cf_name, api::timestamp_type ts, drop_views drop_views) { try { auto& db = _storage_proxy.get_db().local(); auto& old_cfm = db.find_column_family(ks_name, cf_name); @@ -767,15 +761,14 @@ future> migration_manager::prepare_column_family_drop_anno std::vector drop_si_mutations; if (!schema->all_indices().empty()) { auto builder = schema_builder(schema).without_indexes(); - drop_si_mutations = db::schema_tables::make_update_table_mutations(db, keyspace, schema, builder.build(), api::new_timestamp(), false); + drop_si_mutations = db::schema_tables::make_update_table_mutations(db, keyspace, schema, builder.build(), ts, false); } - auto ts = api::new_timestamp(); auto mutations = db::schema_tables::make_drop_table_mutations(keyspace, schema, ts); mutations.insert(mutations.end(), std::make_move_iterator(drop_si_mutations.begin()), std::make_move_iterator(drop_si_mutations.end())); for (auto& v : views) { if (!old_cfm.get_index_manager().is_index(v)) { mlogger.info("Drop view '{}.{}' of table '{}'", v->ks_name(), v->cf_name(), schema->cf_name()); - auto m = db::schema_tables::make_drop_view_mutations(keyspace, v, api::new_timestamp()); + auto m = db::schema_tables::make_drop_view_mutations(keyspace, v, ts); mutations.insert(mutations.end(), std::make_move_iterator(m.begin()), std::make_move_iterator(m.end())); } } @@ -790,16 +783,16 @@ future> migration_manager::prepare_column_family_drop_anno } } -future> migration_manager::prepare_type_drop_announcement(user_type dropped_type) { +future> migration_manager::prepare_type_drop_announcement(user_type dropped_type, api::timestamp_type ts) { auto& db = _storage_proxy.get_db().local(); auto&& keyspace = db.find_keyspace(dropped_type->_keyspace); mlogger.info("Drop User Type: {}", dropped_type->get_name_as_string()); auto mutations = - db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, api::new_timestamp()); + db::schema_tables::make_drop_type_mutations(keyspace.metadata(), dropped_type, ts); return include_keyspace(*keyspace.metadata(), std::move(mutations)); } -future> migration_manager::prepare_new_view_announcement(view_ptr view) { +future> migration_manager::prepare_new_view_announcement(view_ptr view, api::timestamp_type ts) { #if 0 view.metadata.validate(); #endif @@ -810,14 +803,14 @@ future> migration_manager::prepare_new_view_announcement(v throw exceptions::already_exists_exception(view->ks_name(), view->cf_name()); } mlogger.info("Create new view: {}", view); - auto mutations = db::schema_tables::make_create_view_mutations(keyspace, std::move(view), api::new_timestamp()); + auto mutations = db::schema_tables::make_create_view_mutations(keyspace, std::move(view), ts); co_return co_await include_keyspace(*keyspace, std::move(mutations)); } catch (const replica::no_such_keyspace& e) { co_return coroutine::make_exception(exceptions::configuration_exception(format("Cannot add view '{}' to non existing keyspace '{}'.", view->cf_name(), view->ks_name()))); } } -future> migration_manager::prepare_view_update_announcement(view_ptr view) { +future> migration_manager::prepare_view_update_announcement(view_ptr view, api::timestamp_type ts) { #if 0 view.metadata.validate(); #endif @@ -832,7 +825,7 @@ future> migration_manager::prepare_view_update_announcemen oldCfm.validateCompatility(cfm); #endif mlogger.info("Update view '{}.{}' From {} To {}", view->ks_name(), view->cf_name(), *old_view, *view); - auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), api::new_timestamp(), true); + auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(old_view), std::move(view), ts, true); co_return co_await include_keyspace(*keyspace, std::move(mutations)); } catch (const std::out_of_range& e) { co_return coroutine::make_exception(exceptions::configuration_exception(format("Cannot update non existing materialized view '{}' in keyspace '{}'.", @@ -840,7 +833,7 @@ future> migration_manager::prepare_view_update_announcemen } } -future> migration_manager::prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name) { +future> migration_manager::prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type ts) { auto& db = _storage_proxy.get_db().local(); try { auto& view = db.find_column_family(ks_name, cf_name).schema(); @@ -852,7 +845,7 @@ future> migration_manager::prepare_view_drop_announcement( } auto keyspace = db.find_keyspace(ks_name).metadata(); mlogger.info("Drop view '{}.{}'", view->ks_name(), view->cf_name()); - auto mutations = db::schema_tables::make_drop_view_mutations(keyspace, view_ptr(std::move(view)), api::new_timestamp()); + auto mutations = db::schema_tables::make_drop_view_mutations(keyspace, view_ptr(std::move(view)), ts); return include_keyspace(*keyspace, std::move(mutations)); } catch (const replica::no_such_column_family& e) { throw exceptions::configuration_exception(format("Cannot drop non existing materialized view '{}' in keyspace '{}'.", diff --git a/service/migration_manager.hh b/service/migration_manager.hh index cac05685c023..35c8110e1055 100644 --- a/service/migration_manager.hh +++ b/service/migration_manager.hh @@ -101,44 +101,42 @@ public: bool should_pull_schema_from(const gms::inet_address& endpoint); bool has_compatible_schema_tables_version(const gms::inet_address& endpoint); - std::vector prepare_keyspace_update_announcement(lw_shared_ptr ksm); + std::vector prepare_keyspace_update_announcement(lw_shared_ptr ksm, api::timestamp_type); - std::vector prepare_new_keyspace_announcement(lw_shared_ptr ksm); + std::vector prepare_new_keyspace_announcement(lw_shared_ptr ksm, api::timestamp_type); // The timestamp parameter can be used to ensure that all nodes update their internal tables' schemas // with identical timestamps, which can prevent an undeeded schema exchange - future> prepare_column_family_update_announcement(schema_ptr cfm, bool from_thrift, std::vector view_updates, std::optional ts_opt); - - future> prepare_new_column_family_announcement(schema_ptr cfm); + future> prepare_column_family_update_announcement(schema_ptr cfm, bool from_thrift, std::vector view_updates, api::timestamp_type ts); future> prepare_new_column_family_announcement(schema_ptr cfm, api::timestamp_type timestamp); - future> prepare_new_type_announcement(user_type new_type); + future> prepare_new_type_announcement(user_type new_type, api::timestamp_type); - future> prepare_new_function_announcement(shared_ptr func); + future> prepare_new_function_announcement(shared_ptr func, api::timestamp_type); - future> prepare_new_aggregate_announcement(shared_ptr aggregate); + future> prepare_new_aggregate_announcement(shared_ptr aggregate, api::timestamp_type); - future> prepare_function_drop_announcement(shared_ptr func); + future> prepare_function_drop_announcement(shared_ptr func, api::timestamp_type); - future> prepare_aggregate_drop_announcement(shared_ptr aggregate); + future> prepare_aggregate_drop_announcement(shared_ptr aggregate, api::timestamp_type); - future> prepare_update_type_announcement(user_type updated_type); + future> prepare_update_type_announcement(user_type updated_type, api::timestamp_type); - std::vector prepare_keyspace_drop_announcement(const sstring& ks_name); + std::vector prepare_keyspace_drop_announcement(const sstring& ks_name, api::timestamp_type); class drop_views_tag; using drop_views = bool_class; - future> prepare_column_family_drop_announcement(const sstring& ks_name, const sstring& cf_name, drop_views drop_views = drop_views::no); + future> prepare_column_family_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type, drop_views drop_views = drop_views::no); - future> prepare_type_drop_announcement(user_type dropped_type); + future> prepare_type_drop_announcement(user_type dropped_type, api::timestamp_type); - future> prepare_new_view_announcement(view_ptr view); + future> prepare_new_view_announcement(view_ptr view, api::timestamp_type); - future> prepare_view_update_announcement(view_ptr view); + future> prepare_view_update_announcement(view_ptr view, api::timestamp_type); - future> prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name); + future> prepare_view_drop_announcement(const sstring& ks_name, const sstring& cf_name, api::timestamp_type); // the function need to be called if a user wants to access most up-to-date schema state future<> schema_read_barrier(); @@ -168,7 +166,7 @@ private: future<> uninit_messaging_service(); future> include_keyspace(const keyspace_metadata& keyspace, std::vector mutations); - future> do_prepare_new_type_announcement(user_type new_type); + future> do_prepare_new_type_announcement(user_type new_type, api::timestamp_type); future<> push_schema_mutation(const gms::inet_address& endpoint, const std::vector& schema); diff --git a/table_helper.cc b/table_helper.cc index feb540178796..cdd35598e54e 100644 --- a/table_helper.cc +++ b/table_helper.cc @@ -52,7 +52,7 @@ future<> table_helper::setup_table(cql3::query_processor& qp, const sstring& cre // The important thing is that it will converge eventually (some traces may // be lost in a process but that's ok). try { - co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(b.build())); + co_return co_await mm.announce(co_await mm.prepare_new_column_family_announcement(b.build(), api::new_timestamp())); } catch (...) {} } @@ -135,7 +135,7 @@ future<> table_helper::setup_keyspace(cql3::query_processor& qp, std::string_vie std::map opts; opts["replication_factor"] = replication_factor; auto ksm = keyspace_metadata::new_keyspace(keyspace_name, "org.apache.cassandra.locator.SimpleStrategy", std::move(opts), true); - co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm)); + co_await mm.announce(mm.prepare_new_keyspace_announcement(ksm, api::new_timestamp())); } } diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index 892029fb0014..3ca1f58976f6 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -3842,7 +3842,7 @@ SEASTAR_TEST_CASE(test_view_with_two_regular_base_columns_in_key) { schema_ptr view_schema = view_builder.build(); auto& mm = e.migration_manager().local(); - mm.announce(mm.prepare_new_view_announcement(view_ptr(view_schema)).get()).get(); + mm.announce(mm.prepare_new_view_announcement(view_ptr(view_schema), api::new_timestamp()).get()).get(); // Verify that deleting and restoring columns behaves as expected - i.e. the row is deleted and regenerated cquery_nofail(e, "INSERT INTO t (p, c, v1, v2) VALUES (1, 2, 3, 4)"); diff --git a/test/boost/database_test.cc b/test/boost/database_test.cc index 77afd6123316..7982a7490e57 100644 --- a/test/boost/database_test.cc +++ b/test/boost/database_test.cc @@ -144,11 +144,11 @@ static void test_database(void (*run_tests)(populate_fn_ex, bool)) { auto& mm = e.migration_manager().local(); try { e.local_db().find_column_family(s->ks_name(), s->cf_name()); - mm.announce(mm.prepare_column_family_drop_announcement(s->ks_name(), s->cf_name()).get()).get(); + mm.announce(mm.prepare_column_family_drop_announcement(s->ks_name(), s->cf_name(), api::new_timestamp()).get()).get(); } catch (const replica::no_such_column_family&) { // expected } - mm.announce(mm.prepare_new_column_family_announcement(s).get()).get(); + mm.announce(mm.prepare_new_column_family_announcement(s, api::new_timestamp()).get()).get(); replica::column_family& cf = e.local_db().find_column_family(s); for (auto&& m : partitions) { e.local_db().apply(cf.schema(), freeze(m), tracing::trace_state_ptr(), db::commitlog::force_sync::no, db::no_timeout).get(); diff --git a/test/boost/memtable_test.cc b/test/boost/memtable_test.cc index eac4b554e38c..47fda6516b7b 100644 --- a/test/boost/memtable_test.cc +++ b/test/boost/memtable_test.cc @@ -710,7 +710,7 @@ SEASTAR_TEST_CASE(sstable_compaction_does_not_resurrect_data) { .with_column(to_bytes("id"), int32_type) .set_gc_grace_seconds(1) .build(); - mm.announce(mm.prepare_new_column_family_announcement(s).get()).get(); + mm.announce(mm.prepare_new_column_family_announcement(s, api::new_timestamp()).get()).get(); replica::table& t = db.find_column_family(ks_name, table_name); diff --git a/test/boost/schema_change_test.cc b/test/boost/schema_change_test.cc index dee093df20bd..5c0870a89641 100644 --- a/test/boost/schema_change_test.cc +++ b/test/boost/schema_change_test.cc @@ -44,7 +44,7 @@ SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) { auto& mm = e.migration_manager().local(); - mm.announce(mm.prepare_new_column_family_announcement(old_schema).get()).get(); + mm.announce(mm.prepare_new_column_family_announcement(old_schema, api::new_timestamp()).get()).get(); auto old_table_version = e.db().local().find_schema(old_schema->id())->version(); auto old_node_version = e.db().local().get_version(); @@ -52,7 +52,7 @@ SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) { auto new_schema = partial.build(); BOOST_REQUIRE_NE(new_schema->version(), old_schema->version()); - mm.announce(mm.prepare_column_family_update_announcement(new_schema, false, std::vector(), std::nullopt).get()).get(); + mm.announce(mm.prepare_column_family_update_announcement(new_schema, false, std::vector(), api::new_timestamp()).get()).get(); BOOST_REQUIRE_NE(e.db().local().find_schema(old_schema->id())->version(), old_table_version); BOOST_REQUIRE_NE(e.db().local().get_version(), old_node_version); @@ -72,7 +72,7 @@ SEASTAR_TEST_CASE(test_schema_is_updated_in_keyspace) { auto old_schema = builder.build(); auto& mm = e.migration_manager().local(); - mm.announce(mm.prepare_new_column_family_announcement(old_schema).get()).get(); + mm.announce(mm.prepare_new_column_family_announcement(old_schema, api::new_timestamp()).get()).get(); auto s = e.local_db().find_schema(old_schema->id()); BOOST_REQUIRE_EQUAL(*old_schema, *s); @@ -82,7 +82,7 @@ SEASTAR_TEST_CASE(test_schema_is_updated_in_keyspace) { builder.set_gc_grace_seconds(1); auto new_schema = builder.build(); - mm.announce(mm.prepare_column_family_update_announcement(new_schema, false, std::vector(), std::nullopt).get()).get(); + mm.announce(mm.prepare_column_family_update_announcement(new_schema, false, std::vector(), api::new_timestamp()).get()).get(); s = e.local_db().find_schema(old_schema->id()); BOOST_REQUIRE_NE(*old_schema, *s); @@ -104,7 +104,7 @@ SEASTAR_TEST_CASE(test_tombstones_are_ignored_in_version_calculation) { .build(); auto& mm = e.migration_manager().local(); - mm.announce(mm.prepare_new_column_family_announcement(table_schema).get()).get(); + mm.announce(mm.prepare_new_column_family_announcement(table_schema, api::new_timestamp()).get()).get(); auto old_table_version = e.db().local().find_schema(table_schema->id())->version(); auto old_node_version = e.db().local().get_version(); @@ -152,7 +152,7 @@ SEASTAR_TEST_CASE(test_concurrent_column_addition) { .with_column("v2", bytes_type) .build(); - mm.announce(mm.prepare_new_column_family_announcement(s1).get()).get(); + mm.announce(mm.prepare_new_column_family_announcement(s1, api::new_timestamp()).get()).get(); auto old_version = e.db().local().find_schema(s1->id())->version(); // Apply s0 -> s2 change. @@ -295,7 +295,7 @@ SEASTAR_TEST_CASE(test_combined_column_add_and_drop) { .with_column("v1", bytes_type) .build(); - mm.announce(mm.prepare_new_column_family_announcement(s1).get()).get(); + mm.announce(mm.prepare_new_column_family_announcement(s1, api::new_timestamp()).get()).get(); auto&& keyspace = e.db().local().find_keyspace(s1->ks_name()).metadata(); diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index 536f02977e6d..6d7fbaa7726b 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -271,7 +271,7 @@ class single_node_cql_env : public cql_test_env { schema_builder builder(make_lw_shared(schema_maker(ks_name))); builder.set_uuid(id); auto s = builder.build(schema_builder::compact_storage::no); - co_return co_await _mm.local().announce(co_await _mm.local().prepare_new_column_family_announcement(s)); + co_return co_await _mm.local().announce(co_await _mm.local().prepare_new_column_family_announcement(s, api::new_timestamp())); } virtual future<> require_keyspace_exists(const sstring& ks_name) override { diff --git a/thrift/handler.cc b/thrift/handler.cc index acfd012b24ac..14d886829563 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -889,7 +889,7 @@ class thrift_handler : public CassandraCobSvIf { } auto s = schema_from_thrift(cf_def, cf_def.keyspace); - co_return co_await mm.prepare_new_column_family_announcement(std::move(s)); + co_return co_await mm.prepare_new_column_family_announcement(std::move(s), api::new_timestamp()); }); }); } @@ -910,7 +910,7 @@ class thrift_handler : public CassandraCobSvIf { throw make_exception("Cannot drop table with Materialized Views {}", column_family); } - co_return co_await mm.prepare_column_family_drop_announcement(current_keyspace, column_family); + co_return co_await mm.prepare_column_family_drop_announcement(current_keyspace, column_family, api::new_timestamp()); }); }); } @@ -924,7 +924,7 @@ class thrift_handler : public CassandraCobSvIf { co_await t._query_state.get_client_state().has_all_keyspaces_access(auth::permission::CREATE); co_return co_await t.execute_schema_command([&ks_def] (service::migration_manager& mm, data_dictionary::database db) -> future> { - co_return mm.prepare_new_keyspace_announcement(keyspace_from_thrift(ks_def)); + co_return mm.prepare_new_keyspace_announcement(keyspace_from_thrift(ks_def), api::new_timestamp()); }); }); } @@ -943,7 +943,7 @@ class thrift_handler : public CassandraCobSvIf { throw NotFoundException(); } - co_return mm.prepare_keyspace_drop_announcement(keyspace); + co_return mm.prepare_keyspace_drop_announcement(keyspace, api::new_timestamp()); }); }); } @@ -966,7 +966,7 @@ class thrift_handler : public CassandraCobSvIf { } auto ksm = keyspace_from_thrift(ks_def); - co_return mm.prepare_keyspace_update_announcement(std::move(ksm)); + co_return mm.prepare_keyspace_update_announcement(std::move(ksm), api::new_timestamp()); }); }); } @@ -1001,7 +1001,7 @@ class thrift_handler : public CassandraCobSvIf { if (schema->thrift().is_dynamic() != s->thrift().is_dynamic()) { fail(unimplemented::cause::MIXED_CF); } - co_return co_await mm.prepare_column_family_update_announcement(std::move(s), true, std::vector(), std::nullopt); + co_return co_await mm.prepare_column_family_update_announcement(std::move(s), true, std::vector(), api::new_timestamp()); }); }); }