Skip to content

Commit

Permalink
fix(db) store connection in request-aware context
Browse files Browse the repository at this point in the history
Calling `db:connect()` used to store the connection in as an instance
attribute in `self`, therefore in a worker singleton (`kong.db`), which
would produce `bad request` errors when used at runtime (i.e.
non-migration contexts).

This error presents itself when within the `plugins:select_by_cache_key`
special DAO method, in which the schema_state module performs a
`connect()` and subsequent operations to determine the database schema
state.

Storing the connection object in the request aware context (`ngx.ctx`)
prevents this issue. We still store the connection as an instance
attribute in non-request contexts.

From Kong#4081
  • Loading branch information
thibaultcha authored Dec 13, 2018
1 parent 4ef9df5 commit b9f2ba8
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 68 deletions.
95 changes: 56 additions & 39 deletions kong/db/strategies/cassandra/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@ CassandraConnector.__index = CassandraConnector


local function wait_for_schema_consensus(self)
if not self.connection then
local conn = self:get_stored_connection()
if not conn then
error("no connection")
end

log.verbose("waiting for Cassandra schema consensus (%dms timeout)...",
self.cluster.max_schema_consensus_wait)

local ok, err = self.cluster:wait_schema_consensus(self.connection)
local ok, err = self.cluster:wait_schema_consensus(conn)

log.verbose("Cassandra schema consensus: %s",
ok and "reached" or "not reached")
Expand Down Expand Up @@ -172,26 +173,28 @@ end


function CassandraConnector:connect()
if self.connection then
return true
local conn = self:get_stored_connection()
if conn then
return conn
end

local peer, err = self.cluster:next_coordinator()
if not peer then
return nil, err
end

self.connection = peer
self:store_connection(peer)

return true
return peer
end


-- open a connection from the first available contact point,
-- without a keyspace
function CassandraConnector:connect_migrations(opts)
if self.connection then
return self.connection
local conn = self:get_stored_connection()
if conn then
return conn
end

opts = opts or {}
Expand All @@ -208,20 +211,21 @@ function CassandraConnector:connect_migrations(opts)
end
end

self.connection = peer
self:store_connection(peer)

return peer
end


function CassandraConnector:setkeepalive()
if not self.connection then
local conn = self:get_stored_connection()
if not conn then
return
end

local ok, err = self.connection:setkeepalive()
local ok, err = conn:setkeepalive()

self.connection = nil
self:store_connection(nil)

if not ok then
return nil, err
Expand All @@ -232,13 +236,14 @@ end


function CassandraConnector:close()
if not self.connection then
local conn = self:get_stored_connection()
if not conn then
return
end

local ok, err = self.connection:close()
local ok, err = conn:close()

self.connection = nil
self:store_connection(nil)

if not ok then
return nil, err
Expand Down Expand Up @@ -266,8 +271,11 @@ function CassandraConnector:query(query, args, opts, operation)

opts.serial_consistency = self.opts.serial_consistency

local coordinator = self.connection
if not coordinator then
local conn = self:get_stored_connection()

local coordinator = conn

if not conn then
local err
coordinator, err = self.cluster:next_coordinator()
if not coordinator then
Expand Down Expand Up @@ -295,7 +303,7 @@ function CassandraConnector:query(query, args, opts, operation)
end
end

if not self.connection then
if not conn then
coordinator:setkeepalive()
end

Expand All @@ -308,7 +316,8 @@ end


local function select_keyspaces(self)
if not self.connection then
local conn = self:get_stored_connection()
if not conn then
error("no connection")
end

Expand All @@ -327,12 +336,13 @@ local function select_keyspaces(self)
WHERE keyspace_name = ?]]
end

return self.connection:execute(cql, { self.keyspace })
return conn:execute(cql, { self.keyspace })
end


local function select_tables(self)
if not self.connection then
local conn = self:get_stored_connection()
if not conn then
error("no connection")
end

Expand All @@ -351,7 +361,7 @@ local function select_tables(self)
WHERE keyspace_name = ?]]
end

return self.connection:execute(cql, { self.keyspace })
return conn:execute(cql, { self.keyspace })
end


Expand Down Expand Up @@ -545,8 +555,9 @@ do


function CassandraConnector:schema_migrations()
if not self.connection then
error("no connection")
local conn, err = self:connect()
if not conn then
error(err)
end

do
Expand Down Expand Up @@ -598,15 +609,15 @@ do
end
end

local ok, err = self.connection:change_keyspace(self.keyspace)
local ok, err = conn:change_keyspace(self.keyspace)
if not ok then
return nil, err
end

do
-- has migrations?

local rows, err = self.connection:execute([[
local rows, err = conn:execute([[
SELECT * FROM schema_meta WHERE key = ?
]], {
SCHEMA_META_KEY,
Expand Down Expand Up @@ -657,7 +668,8 @@ do

-- get a contact point connection (no keyspace set)

if not self.connection then
local conn = self:get_stored_connection()
if not conn then
error("no connection")
end

Expand All @@ -666,7 +678,7 @@ do
log.debug("creating '%s' keyspace if not existing...",
kong_config.cassandra_keyspace)

local res, err = self.connection:execute(string.format([[
local res, err = conn:execute(string.format([[
CREATE KEYSPACE IF NOT EXISTS %q
WITH REPLICATION = %s
]], kong_config.cassandra_keyspace, cql_replication))
Expand All @@ -677,7 +689,7 @@ do
log.debug("successfully created '%s' keyspace",
kong_config.cassandra_keyspace)

local ok, err = self.connection:change_keyspace(kong_config.cassandra_keyspace)
local ok, err = conn:change_keyspace(kong_config.cassandra_keyspace)
if not ok then
return nil, err
end
Expand All @@ -686,7 +698,7 @@ do

log.debug("creating 'schema_meta' table if not existing...")

local res, err = self.connection:execute([[
local res, err = conn:execute([[
CREATE TABLE IF NOT EXISTS schema_meta(
key text,
subsystem text,
Expand Down Expand Up @@ -718,11 +730,12 @@ do


function CassandraConnector:schema_reset()
if not self.connection then
local conn = self:get_stored_connection()
if not conn then
error("no connection")
end

local ok, err = self.connection:execute(string.format([[
local ok, err = conn:execute(string.format([[
DROP KEYSPACE IF EXISTS %q
]], self.keyspace))
if not ok then
Expand All @@ -749,7 +762,8 @@ do
error("up_cql must be a string", 2)
end

if not self.connection then
local conn = self:get_stored_connection()
if not conn then
error("no connection")
end

Expand All @@ -758,7 +772,7 @@ do
for i = 1, #t_cql do
local cql = pl_stringx.strip(t_cql[i])
if cql ~= "" then
local res, err = self.connection:execute(cql)
local res, err = conn:execute(cql)
if not res then
if string.find(err, "Column .- was not found in table")
or string.find(err, "[Ii]nvalid column name") then
Expand All @@ -785,7 +799,8 @@ do
error("name must be a string", 2)
end

if not self.connection then
local conn = self:get_stored_connection()
if not conn then
error("no connection")
end

Expand Down Expand Up @@ -827,7 +842,7 @@ do
table.insert(args, SCHEMA_META_KEY)
table.insert(args, subsystem)

local res, err = self.connection:execute(cql, args)
local res, err = conn:execute(cql, args)
if not res then
return nil, err
end
Expand Down Expand Up @@ -974,7 +989,9 @@ do
]]
end

local rows, err = self.connection:execute(cql, {
local conn = self:get_stored_connection()

local rows, err = conn:execute(cql, {
self.keyspace,
"schema_migrations",
})
Expand All @@ -987,12 +1004,12 @@ do
return res
end

local ok, err = self.connection:change_keyspace(self.keyspace)
local ok, err = conn:change_keyspace(self.keyspace)
if not ok then
return nil, err
end

local schema_migrations_rows, err = self.connection:execute([[
local schema_migrations_rows, err = conn:execute([[
SELECT id, migrations FROM schema_migrations
]])
if err then
Expand Down
33 changes: 33 additions & 0 deletions kong/db/strategies/connector.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,39 @@ function Connector:init_worker()
end


do
local past_init
local ngx = ngx


function Connector:store_connection(conn)
if not past_init and ngx and ngx.get_phase() ~= "init" then
past_init = true
end

if ngx and past_init then
ngx.ctx.connection = conn

else
self.connection = conn
end
end


function Connector:get_stored_connection()
if not past_init and ngx and ngx.get_phase() ~= "init" then
past_init = true
end

if ngx and past_init then
return ngx.ctx.connection
end

return self.connection
end
end


function Connector:infos()
error(fmt("infos() not implemented for '%s' strategy", self.database))
end
Expand Down
11 changes: 10 additions & 1 deletion kong/db/strategies/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,16 @@ function _M.new(kong_config, strategy, schemas, errors)
do
local base_connector = require "kong.db.strategies.connector"
local mt = getmetatable(connector)
setmetatable(mt, { __index = base_connector })
setmetatable(mt, {
__index = function(t, k)
-- explicit parent
if k == "super" then
return base_connector
end

return base_connector[k]
end
})
end

local strategies = {}
Expand Down
Loading

0 comments on commit b9f2ba8

Please sign in to comment.