Skip to content

Commit

Permalink
fix(migrations) prevent cassandra migrations on creating multiple def…
Browse files Browse the repository at this point in the history
…ault workspaces

### Summary

It has been reported on multiple issues that migrations with Cassandra may lead to
multiple default workspaces to be created in database. Most recently by
@jeremyjpj0916 on Kong#6403.

This commit tries to tackle the issue by setting consistency of INSERT (that adds
default workspace) query to QUORUM and consistency of SELECT (that tries to determine
if there is a default workspace already) to SERIAL.

The 212 to 213 uses same migration as 200 to 210 (which we fixed), so we can just
reuse those on 212 to 213 too.

We had in multiple places where we tried to ensure cassandra default workspace.
This commit also moves that to single place.
  • Loading branch information
bungle committed Oct 9, 2020
1 parent 4e56d2a commit 99d0f73
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 127 deletions.
15 changes: 12 additions & 3 deletions kong/db/migrations/core/009_200_to_210.lua
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ local function pg_ca_certificates_migration(connector)
end


local function c_ca_certificates_migration(connector)
local coordinator = assert(connector:get_stored_connection())
local function c_ca_certificates_migration(coordinator)
local cassandra = require "cassandra"
for rows, err in coordinator:iterate("SELECT id, cert, cert_digest FROM ca_certificates") do
if err then
Expand Down Expand Up @@ -242,13 +241,23 @@ return {
CREATE INDEX IF NOT EXISTS upstreams_client_certificate_id_idx ON upstreams(client_certificate_id);
]] .. ws_migration_up(operations.cassandra.up),
teardown = function(connector)
local coordinator = assert(connector:get_stored_connection())
local default_ws, err = operations.cassandra_ensure_default_ws(coordinator)
if err then
return nil, err
end

if not default_ws then
return nil, "unable to find a default workspace"
end

local _, err = ws_migration_teardown(operations.cassandra.teardown)(connector)
if err then
return nil, err
end

-- add `cert_digest` field for `ca_certificates` table
_, err = c_ca_certificates_migration(connector)
_, err = c_ca_certificates_migration(coordinator)
if err then
return nil, err
end
Expand Down
15 changes: 11 additions & 4 deletions kong/db/migrations/operations/200_to_210.lua
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ end


local function cassandra_get_default_ws(coordinator)
local rows, err = coordinator:execute("SELECT id FROM workspaces WHERE name='default'")
local rows, err = coordinator:execute("SELECT id FROM workspaces WHERE name='default'", nil, {
consistency = cassandra.consistencies.serial,
})
if err then
return nil, err
end
Expand All @@ -43,12 +45,14 @@ local function cassandra_create_default_ws(coordinator)
local _, err = coordinator:execute("INSERT INTO workspaces(id, name, created_at) VALUES (?, 'default', ?)", {
cassandra.uuid(default_ws_id),
cassandra.timestamp(created_at),
}, {
consistency = cassandra.consistencies.quorum,
})
if err then
return nil, err
end

return cassandra_get_default_ws(coordinator) or default_ws_id
return cassandra_get_default_ws(coordinator)
end


Expand Down Expand Up @@ -354,7 +358,7 @@ local cassandra = {
-- Update composite cache keys to workspace-aware formats
ws_update_composite_cache_key = function(_, connector, table_name, is_partitioned)
local coordinator = assert(connector:get_stored_connection())
local default_ws, err = cassandra_ensure_default_ws(coordinator)
local default_ws, err = cassandra_get_default_ws(coordinator)
if err then
return nil, err
end
Expand Down Expand Up @@ -395,7 +399,7 @@ local cassandra = {
-- Update keys to workspace-aware formats
ws_update_keys = function(_, connector, table_name, unique_keys, is_partitioned)
local coordinator = assert(connector:get_stored_connection())
local default_ws, err = cassandra_ensure_default_ws(coordinator)
local default_ws, err = cassandra_get_default_ws(coordinator)
if err then
return nil, err
end
Expand Down Expand Up @@ -580,4 +584,7 @@ return {
postgres = postgres,
cassandra = cassandra,
ws_migrate_plugin = ws_migrate_plugin,
cassandra_get_default_ws = cassandra_get_default_ws,
cassandra_create_default_ws = cassandra_create_default_ws,
cassandra_ensure_default_ws = cassandra_ensure_default_ws,
}
127 changes: 7 additions & 120 deletions kong/db/migrations/operations/212_to_213.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,63 +7,7 @@
-- copy the functions over to a new versioned module.


local ngx = ngx
local uuid = require "resty.jit-uuid"
local cassandra = require "cassandra"


local default_ws_id = uuid.generate_v4()


local function render(template, keys)
return (template:gsub("$%(([A-Z_]+)%)", keys))
end


local function cassandra_get_default_ws(coordinator)
local rows, err = coordinator:execute("SELECT id FROM workspaces WHERE name='default'")
if err then
return nil, err
end

if not rows
or not rows[1]
or not rows[1].id
then
return nil
end

return rows[1].id
end


local function cassandra_create_default_ws(coordinator)
local created_at = ngx.time() * 1000

local _, err = coordinator:execute("INSERT INTO workspaces(id, name, created_at) VALUES (?, 'default', ?)", {
cassandra.uuid(default_ws_id),
cassandra.timestamp(created_at)
})
if err then
return nil, err
end

return cassandra_get_default_ws(coordinator) or default_ws_id
end


local function cassandra_ensure_default_ws(coordinator)
local default_ws, err = cassandra_get_default_ws(coordinator)
if err then
return nil, err
end

if default_ws then
return default_ws
end

return cassandra_create_default_ws(coordinator)
end
local operations_200_210 = require "kong.db.migrations.operations.200_to_210"


--------------------------------------------------------------------------------
Expand All @@ -77,24 +21,9 @@ local postgres = {
]],

teardown = {

------------------------------------------------------------------------------
-- Update composite cache keys to workspace-aware formats
ws_update_composite_cache_key = function(_, connector, table_name, is_partitioned)
local _, err = connector:query(render([[
UPDATE "$(TABLE)"
SET cache_key = CONCAT(cache_key, ':',
(SELECT id FROM workspaces WHERE name = 'default'))
WHERE cache_key LIKE '%:';
]], {
TABLE = table_name,
}))
if err then
return nil, err
end

return true
end,
-- These migrations were fixed since they were originally released,
-- thus those that have updated already, need to re-run it.
ws_update_composite_cache_key = operations_200_210.postgres.teardown.ws_update_composite_cache_key,
},

}
Expand All @@ -111,50 +40,9 @@ local cassandra = {
]],

teardown = {

------------------------------------------------------------------------------
-- Update composite cache keys to workspace-aware formats
ws_update_composite_cache_key = function(_, connector, table_name, is_partitioned)
local coordinator = assert(connector:get_stored_connection())
local default_ws, err = cassandra_ensure_default_ws(coordinator)
if err then
return nil, err
end

if not default_ws then
return nil, "unable to find a default workspace"
end

for rows, err in coordinator:iterate("SELECT id, cache_key FROM " .. table_name) do
if err then
return nil, err
end

for i = 1, #rows do
local row = rows[i]
if row.cache_key:match(":$") then
local cql = render([[
UPDATE $(TABLE) SET cache_key = '$(CACHE_KEY)' WHERE $(PARTITION) id = $(ID)
]], {
TABLE = table_name,
CACHE_KEY = row.cache_key .. ":" .. default_ws,
PARTITION = is_partitioned
and "partition = '" .. table_name .. "' AND"
or "",
ID = row.id,
})

local _, err = coordinator:execute(cql)
if err then
return nil, err
end
end
end
end

return true
end,

-- These migrations were fixed since they were originally released,
-- thus those that have updated already, need to re-run it.
ws_update_composite_cache_key = operations_200_210.cassandra.teardown.ws_update_composite_cache_key,
}

}
Expand All @@ -166,7 +54,6 @@ local cassandra = {

local function ws_adjust_data(ops, connector, entities)
for _, entity in ipairs(entities) do

if entity.cache_key and #entity.cache_key > 1 then
local _, err = ops:ws_update_composite_cache_key(connector, entity.name, entity.partitioned)
if err then
Expand Down

0 comments on commit 99d0f73

Please sign in to comment.