Skip to content

Commit

Permalink
fix(db) polishing after merge with next
Browse files Browse the repository at this point in the history
- make db instance accessible from Factory for `init_worker()` call
- update to new CLI flag
- use the new no_keyspace option from coordinator_options
- correcly instanciate Factory in new quiet_spec tests
- replace left over `unset`
- fix rate-limiting and response-ratelimiting tests (renamed dao_spec to
policies_spec)
  • Loading branch information
thibaultcha committed Oct 17, 2016
1 parent fa67cc2 commit e9b9613
Show file tree
Hide file tree
Showing 20 changed files with 270 additions and 254 deletions.
2 changes: 2 additions & 0 deletions kong-0.9.3-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ build = {
["kong.plugins.rate-limiting.migrations.postgres"] = "kong/plugins/rate-limiting/migrations/postgres.lua",
["kong.plugins.rate-limiting.handler"] = "kong/plugins/rate-limiting/handler.lua",
["kong.plugins.rate-limiting.schema"] = "kong/plugins/rate-limiting/schema.lua",
["kong.plugins.rate-limiting.daos"] = "kong/plugins/rate-limiting/daos.lua",
["kong.plugins.rate-limiting.policies"] = "kong/plugins/rate-limiting/policies/init.lua",
["kong.plugins.rate-limiting.policies.cluster"] = "kong/plugins/rate-limiting/policies/cluster.lua",

Expand All @@ -176,6 +177,7 @@ build = {
["kong.plugins.response-ratelimiting.header_filter"] = "kong/plugins/response-ratelimiting/header_filter.lua",
["kong.plugins.response-ratelimiting.log"] = "kong/plugins/response-ratelimiting/log.lua",
["kong.plugins.response-ratelimiting.schema"] = "kong/plugins/response-ratelimiting/schema.lua",
["kong.plugins.response-ratelimiting.daos"] = "kong/plugins/response-ratelimiting/daos.lua",
["kong.plugins.response-ratelimiting.policies"] = "kong/plugins/response-ratelimiting/policies/init.lua",
["kong.plugins.response-ratelimiting.policies.cluster"] = "kong/plugins/response-ratelimiting/policies/cluster.lua",

Expand Down
7 changes: 3 additions & 4 deletions kong/dao/db/cassandra.lua
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ function _M.new(kong_config)
self.query_options = query_opts
self.cluster_options = cluster_options

if ngx.RESTY_CLI then
if ngx.IS_CLI then
-- we must manually call our init phase (usually called from `init_by_lua`)
-- to refresh the cluster.
local ok, err = self:init()
Expand Down Expand Up @@ -159,8 +159,7 @@ function _M:query(query, args, options, schema, no_keyspace)
local opts = self:clone_query_options(options)
local coordinator_opts = {}
if no_keyspace then
-- defaults to the system keyspace, always present
coordinator_opts.keyspace = "system"
coordinator_opts.no_keyspace = true
end

local res, err = self.cluster:execute(query, args, opts, coordinator_opts)
Expand Down Expand Up @@ -427,7 +426,7 @@ function _M:update(table_name, schema, constraints, filter_keys, values, nils, f
if full then
for col in pairs(nils) do
sets[#sets + 1] = col.." = ?"
args[#args + 1] = cassandra.unset
args[#args + 1] = cassandra.null
end
end

Expand Down
1 change: 1 addition & 0 deletions kong/dao/db/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ function _M:init_worker()
if not ok then
log(ERR, "could not create TTL timer: ", err)
end
return true
end

--- TTL utils
Expand Down
95 changes: 55 additions & 40 deletions kong/dao/factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ local utils = require "kong.tools.utils"
local ModelFactory = require "kong.dao.model_factory"

local CORE_MODELS = {"apis", "consumers", "plugins", "nodes"}
local _db

-- returns db errors as strings, including the initial `nil`
local function ret_error_string(db_name, res, err)
Expand Down Expand Up @@ -73,87 +72,103 @@ local function load_daos(self, schemas, constraints, events_handler)
end

for m_name, schema in pairs(schemas) do
self.daos[m_name] = DAO(_db, ModelFactory(schema), schema, constraints[m_name], events_handler)
self.daos[m_name] = DAO(self.db, ModelFactory(schema), schema, constraints[m_name], events_handler)
end
end

function _M.new(kong_config, events_handler)
local factory = {
local self = {
db_type = kong_config.database,
daos = {},
additional_tables = {},
kong_config = kong_config,
plugin_names = kong_config.plugins or {}
}

local DB = require("kong.dao.db."..factory.db_type)
local DB = require("kong.dao.db."..self.db_type)
local db, err = DB.new(kong_config)
if not db then return ret_error_string(factory.db_type, nil, err) end
if not db then return ret_error_string(self.db_type, nil, err) end

_db = db -- avoid setting a previous upvalue to `nil` in case `DB.new()` fails
self.db = db

local schemas = {}
for _, m_name in ipairs(CORE_MODELS) do
schemas[m_name] = require("kong.dao.schemas."..m_name)
end

for plugin_name in pairs(factory.plugin_names) do
local has_dao, plugin_daos = utils.load_module_if_exists("kong.plugins."..plugin_name..".dao."..factory.db_type)
if has_dao then
for k, v in pairs(plugin_daos) do
factory.daos[k] = v(kong_config)
end
end

for plugin_name in pairs(self.plugin_names) do
local has_schema, plugin_schemas = utils.load_module_if_exists("kong.plugins."..plugin_name..".daos")
if has_schema then
for k, v in pairs(plugin_schemas) do
schemas[k] = v
if plugin_schemas.tables then
for _, v in ipairs(plugin_schemas.tables) do
table.insert(self.additional_tables, v)
end
else
for k, v in pairs(plugin_schemas) do
schemas[k] = v
end
end
end
end

local constraints = build_constraints(schemas)
load_daos(factory, schemas, constraints, events_handler)
load_daos(self, schemas, constraints, events_handler)

return setmetatable(factory, _M)
return setmetatable(self, _M)
end

function _M:init()
return _db:init()
return self.db:init()
end

function _M:init_worker()
return self.db:init_worker()
end

-- Migrations

function _M:infos()
return _db:infos()
return self.db:infos()
end

function _M:drop_schema()
for _, dao in pairs(self.daos) do
_db:drop_table(dao.table)
self.db:drop_table(dao.table)
end

if self.additional_tables then
for _, v in ipairs(self.additional_tables) do
self.db:drop_table(v)
end
end

if _db.additional_tables then
for _, v in ipairs(_db.additional_tables) do
_db:drop_table(v)
if self.db.additional_tables then
for _, v in ipairs(self.db.additional_tables) do
self.db:drop_table(v)
end
end

_db:drop_table("schema_migrations")
self.db:drop_table("schema_migrations")
end

function _M:truncate_table(dao_name)
_db:truncate_table(self.daos[dao_name].table)
self.db:truncate_table(self.daos[dao_name].table)
end

function _M:truncate_tables()
for _, dao in pairs(self.daos) do
_db:truncate_table(dao.table)
self.db:truncate_table(dao.table)
end

if self.db.additional_tables then
for _, v in ipairs(self.db.additional_tables) do
self.db:truncate_table(v)
end
end

if _db.additional_tables then
for _, v in ipairs(_db.additional_tables) do
_db:truncate_table(v)
if self.additional_tables then
for _, v in ipairs(self.additional_tables) do
self.db:truncate_table(v)
end
end
end
Expand All @@ -174,8 +189,8 @@ function _M:migrations_modules()
end

function _M:current_migrations()
local rows, err = _db:current_migrations()
if err then return ret_error_string(_db.name, nil, err) end
local rows, err = self.db:current_migrations()
if err then return ret_error_string(self.db.name, nil, err) end

local cur_migrations = {}
for _, row in ipairs(rows) do
Expand All @@ -196,30 +211,30 @@ local function migrate(self, identifier, migrations_modules, cur_migrations, on_

if #to_run > 0 and on_migrate then
-- we have some migrations to run
on_migrate(identifier, _db:infos())
on_migrate(identifier, self.db:infos())
end

for _, migration in ipairs(to_run) do
local err
local mig_type = type(migration.up)
if mig_type == "string" then
err = _db:queries(migration.up)
err = self.db:queries(migration.up)
elseif mig_type == "function" then
err = migration.up(_db, self.kong_config, self)
err = migration.up(self.db, self.kong_config, self)
end

if err then
return nil, string.format("Error during migration %s: %s", migration.name, err)
end

-- record success
local ok, err = _db:record_migration(identifier, migration.name)
local ok, err = self.db:record_migration(identifier, migration.name)
if not ok then
return nil, string.format("Error recording migration %s: %s", migration.name, err)
end

if on_success then
on_success(identifier, migration.name, _db:infos())
on_success(identifier, migration.name, self.db:infos())
end
end

Expand Down Expand Up @@ -248,15 +263,15 @@ function _M:run_migrations(on_migrate, on_success)

local migrations_modules = self:migrations_modules()
local cur_migrations, err = self:current_migrations()
if err then return ret_error_string(_db.name, nil, err) end
if err then return ret_error_string(self.db.name, nil, err) end

local ok, err, migrations_ran = migrate(self, "core", migrations_modules, cur_migrations, on_migrate, on_success)
if not ok then return ret_error_string(_db.name, nil, err) end
if not ok then return ret_error_string(self.db.name, nil, err) end

for identifier in pairs(migrations_modules) do
if identifier ~= "core" then
local ok, err, n_ran = migrate(self, identifier, migrations_modules, cur_migrations, on_migrate, on_success)
if not ok then return ret_error_string(_db.name, nil, err)
if not ok then return ret_error_string(self.db.name, nil, err)
else
migrations_ran = migrations_ran + n_ran
end
Expand Down
5 changes: 4 additions & 1 deletion kong/kong.lua
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,10 @@ function Kong.init_worker()

core.init_worker.before()

singletons.dao:init_worker()
local ok, err = singletons.dao:init_worker()
if not ok then
ngx.log(ngx.ERR, "could not init DB: ", err)
end

for _, plugin in ipairs(singletons.loaded_plugins) do
plugin.handler:init_worker()
Expand Down
3 changes: 3 additions & 0 deletions kong/plugins/rate-limiting/daos.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
return {
tables = {"ratelimiting_metrics"}
}
5 changes: 3 additions & 2 deletions kong/plugins/rate-limiting/policies/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ return {
db.cassandra.timestamp(periods[period]),
period,
})
if not rows then return nil, err
elseif #rows > 0 then return rows[1] end
if not rows then return nil, err
elseif #rows <= 1 then return rows[1]
else return nil, "bad rows result" end
end,
},
["postgres"] = {
Expand Down
8 changes: 5 additions & 3 deletions kong/plugins/rate-limiting/policies/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ return {
ngx_log(ngx.ERR, "[rate-limiting] cluster policy: could not increment ",
db.name, " counter: ", err)
end

return ok, err
end,
usage = function(conf, api_id, identifier, current_timestamp, name)
local db = singletons.dao.db
local rows, err = policy_cluster[db.name].find(db, api_id, identifier,
local row, err = policy_cluster[db.name].find(db, api_id, identifier,
current_timestamp, name)
if not rows then return nil, err end
if err then return nil, err end

return rows and rows.value or 0
return row and row.value or 0
end
},
["redis"] = {
Expand Down
3 changes: 3 additions & 0 deletions kong/plugins/response-ratelimiting/daos.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
return {
tables = {"response_ratelimiting_metrics"}
}
5 changes: 3 additions & 2 deletions kong/plugins/response-ratelimiting/policies/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ return {
db.cassandra.timestamp(periods[period]),
name.."_"..period,
})
if not rows then return nil, err
elseif #rows > 0 then return rows[1] end
if not rows then return nil, err
elseif #rows <= 1 then return rows[1]
else return nil, "bad rows result" end
end,
},
["postgres"] = {
Expand Down
4 changes: 3 additions & 1 deletion kong/plugins/response-ratelimiting/policies/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ return {
ngx_log(ngx.ERR, "[response-ratelimiting] cluster policy: could not increment ",
db.name, " counter: ", err)
end

return ok, err
end,
usage = function(conf, api_id, identifier, current_timestamp, period, name)
local db = singletons.dao.db
local rows, err = policy_cluster[db.name].find(db, api_id, identifier,
current_timestamp, period,
name)
if not rows then return nil, err end
if err then return nil, err end

return rows and rows.value or 0
end
Expand Down
21 changes: 21 additions & 0 deletions spec/01-unit/13-db/01-init_spec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
local db = require "kong.dao.db"

describe("kong.dao.db.init", function()
it("has __index set to the init module so we can call base functions", function()
local my_db_module = db.new_db("cassandra")

function my_db_module.new()
local self = my_db_module.super.new()
self.foo = "bar"
return self
end

local my_db = my_db_module.new()
assert.equal("bar", my_db.foo)

assert.has_no_error(function()
my_db:init()
my_db:init_worker()
end)
end)
end)
File renamed without changes.
2 changes: 1 addition & 1 deletion spec/02-integration/02-dao/08-quiet_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ helpers.for_each_dao(function(kong_config)
describe("Quiet with #"..kong_config.database, function()
local factory
setup(function()
factory = Factory(kong_config, events)
factory = Factory.new(kong_config, events)
assert(factory:run_migrations())

factory:truncate_tables()
Expand Down
Loading

0 comments on commit e9b9613

Please sign in to comment.