Skip to content

Commit

Permalink
Merge pull request Kong#2145 from Mashape/feat/single-cass-peer-migra…
Browse files Browse the repository at this point in the history
…tions

feat(c*) run migrations from a single coordinator
  • Loading branch information
subnetmarco authored Mar 4, 2017
2 parents aa57a13 + 126191a commit 756f46a
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 2 deletions.
2 changes: 1 addition & 1 deletion kong-0.10.0rc4-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dependencies = {
"multipart == 0.4",
"version == 0.2",
"lapis == 1.5.1",
"lua-cassandra == 1.1.0",
"lua-cassandra == 1.1.1",
"pgmoon-mashape == 2.0.1",
"luatz == 0.3",
"lua_system_constants == 0.1.1",
Expand Down
51 changes: 51 additions & 0 deletions kong/dao/db/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,64 @@ local function deserialize_rows(rows, schema)
end
end

local coordinator

function _M:first_coordinator()
local peer, err = self.cluster:first_coordinator()
if not peer then
return nil, err
end

coordinator = peer

return true
end

function _M:coordinator_change_keyspace(keyspace)
if not coordinator then
return nil, "no coordinator"
end

return coordinator:change_keyspace(keyspace)
end

function _M:close_coordinator()
if not coordinator then
return nil, "no coordinator"
end

local ok, err = coordinator:close()
if not ok then
return nil, err
end

coordinator = nil

return ok
end

function _M:wait_for_schema_consensus()
if not coordinator then
return nil, "no coordinator"
end

return self.cluster:wait_schema_consensus(coordinator)
end

function _M:query(query, args, options, schema, no_keyspace)
local opts = self:clone_query_options(options)
local coordinator_opts = {}
if no_keyspace then
coordinator_opts.no_keyspace = true
end

if coordinator then
local res, err = coordinator:execute(query, args, coordinator_opts)
if not res then return nil, Errors.db(err) end

return res
end

local res, err = self.cluster:execute(query, args, opts, coordinator_opts)
if not res then return nil, Errors.db(err) end

Expand Down
27 changes: 26 additions & 1 deletion kong/dao/factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ local function migrate(self, identifier, migrations_modules, cur_migrations, on_
if on_success then
on_success(identifier, migration.name, self.db:infos())
end
end
end

return true, nil, #to_run
end
Expand All @@ -270,6 +270,14 @@ function _M:run_migrations(on_migrate, on_success)

log.verbose("running datastore migrations")

if self.db.name == "cassandra" then
local ok, err = self.db:first_coordinator()
if not ok then
return nil, ret_error_string(self.db.name, nil,
"could not find coordinator: " .. err)
end
end

local migrations_modules = self:migrations_modules()
local cur_migrations, err = self:current_migrations()
if err then return nil, err end
Expand All @@ -287,6 +295,23 @@ function _M:run_migrations(on_migrate, on_success)
end
end

if self.db.name == "cassandra" then
log.verbose("now waiting for schema consensus (%dms) timeout",
self.db.cluster.max_schema_consensus_wait)

local ok, err = self.db:wait_for_schema_consensus()
if not ok then
return nil, ret_error_string(self.db.name, nil,
"could not wait for schema consensus: " .. err)
end

ok, err = self.db:close_coordinator()
if not ok then
return nil, ret_error_string(self.db.name, nil,
"could not close coordinator: " .. err)
end
end

if migrations_ran > 0 then
log("%d migrations ran", migrations_ran)
end
Expand Down
5 changes: 5 additions & 0 deletions kong/dao/migrations/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ return {
return err
end

local ok, err = db:coordinator_change_keyspace(keyspace_name)
if not ok then
return err
end

local res, err = db:query [[
CREATE TABLE IF NOT EXISTS schema_migrations(
id text PRIMARY KEY,
Expand Down

0 comments on commit 756f46a

Please sign in to comment.