From 126191ade8d2285ec80d8607bb4d48c2cd4034c1 Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Tue, 28 Feb 2017 16:53:33 -0800 Subject: [PATCH] feat(c*) run migrations from a single coordinator --- kong-0.10.0rc4-0.rockspec | 2 +- kong/dao/db/cassandra.lua | 51 +++++++++++++++++++++++++++++++ kong/dao/factory.lua | 27 +++++++++++++++- kong/dao/migrations/cassandra.lua | 5 +++ 4 files changed, 83 insertions(+), 2 deletions(-) diff --git a/kong-0.10.0rc4-0.rockspec b/kong-0.10.0rc4-0.rockspec index 6813e86cd1ab..92bd9f3308fd 100644 --- a/kong-0.10.0rc4-0.rockspec +++ b/kong-0.10.0rc4-0.rockspec @@ -20,7 +20,7 @@ dependencies = { "multipart == 0.4", "version == 0.2", "lapis == 1.5.1", - "lua-cassandra == 1.1.0", + "lua-cassandra == 1.1.1", "pgmoon-mashape == 2.0.1", "luatz == 0.3", "lua_system_constants == 0.1.1", diff --git a/kong/dao/db/cassandra.lua b/kong/dao/db/cassandra.lua index 61844b9e699a..6aad7857a244 100644 --- a/kong/dao/db/cassandra.lua +++ b/kong/dao/db/cassandra.lua @@ -169,6 +169,50 @@ local function deserialize_rows(rows, schema) end end +local coordinator + +function _M:first_coordinator() + local peer, err = self.cluster:first_coordinator() + if not peer then + return nil, err + end + + coordinator = peer + + return true +end + +function _M:coordinator_change_keyspace(keyspace) + if not coordinator then + return nil, "no coordinator" + end + + return coordinator:change_keyspace(keyspace) +end + +function _M:close_coordinator() + if not coordinator then + return nil, "no coordinator" + end + + local ok, err = coordinator:close() + if not ok then + return nil, err + end + + coordinator = nil + + return ok +end + +function _M:wait_for_schema_consensus() + if not coordinator then + return nil, "no coordinator" + end + + return self.cluster:wait_schema_consensus(coordinator) +end + function _M:query(query, args, options, schema, no_keyspace) local opts = self:clone_query_options(options) local coordinator_opts = {} @@ -176,6 +220,13 @@ function _M:query(query, args, options, schema, no_keyspace) coordinator_opts.no_keyspace = true end + if coordinator then + local res, err = coordinator:execute(query, args, coordinator_opts) + if not res then return nil, Errors.db(err) end + + return res + end + local res, err = self.cluster:execute(query, args, opts, coordinator_opts) if not res then return nil, Errors.db(err) end diff --git a/kong/dao/factory.lua b/kong/dao/factory.lua index c00e9a15b2c2..3d989a43a2bd 100644 --- a/kong/dao/factory.lua +++ b/kong/dao/factory.lua @@ -245,7 +245,7 @@ local function migrate(self, identifier, migrations_modules, cur_migrations, on_ if on_success then on_success(identifier, migration.name, self.db:infos()) end -end + end return true, nil, #to_run end @@ -270,6 +270,14 @@ function _M:run_migrations(on_migrate, on_success) log.verbose("running datastore migrations") + if self.db.name == "cassandra" then + local ok, err = self.db:first_coordinator() + if not ok then + return nil, ret_error_string(self.db.name, nil, + "could not find coordinator: " .. err) + end + end + local migrations_modules = self:migrations_modules() local cur_migrations, err = self:current_migrations() if err then return nil, err end @@ -287,6 +295,23 @@ function _M:run_migrations(on_migrate, on_success) end end + if self.db.name == "cassandra" then + log.verbose("now waiting for schema consensus (%dms) timeout", + self.db.cluster.max_schema_consensus_wait) + + local ok, err = self.db:wait_for_schema_consensus() + if not ok then + return nil, ret_error_string(self.db.name, nil, + "could not wait for schema consensus: " .. err) + end + + ok, err = self.db:close_coordinator() + if not ok then + return nil, ret_error_string(self.db.name, nil, + "could not close coordinator: " .. err) + end + end + if migrations_ran > 0 then log("%d migrations ran", migrations_ran) end diff --git a/kong/dao/migrations/cassandra.lua b/kong/dao/migrations/cassandra.lua index aad373da730f..41e6900be476 100644 --- a/kong/dao/migrations/cassandra.lua +++ b/kong/dao/migrations/cassandra.lua @@ -37,6 +37,11 @@ return { return err end + local ok, err = db:coordinator_change_keyspace(keyspace_name) + if not ok then + return err + end + local res, err = db:query [[ CREATE TABLE IF NOT EXISTS schema_migrations( id text PRIMARY KEY,