From b9f2ba862bc076f0d300f099c617db640642d3a9 Mon Sep 17 00:00:00 2001 From: Thibault Charbonnier Date: Thu, 13 Dec 2018 18:20:32 +0100 Subject: [PATCH] fix(db) store connection in request-aware context Calling `db:connect()` used to store the connection in as an instance attribute in `self`, therefore in a worker singleton (`kong.db`), which would produce `bad request` errors when used at runtime (i.e. non-migration contexts). This error presents itself when within the `plugins:select_by_cache_key` special DAO method, in which the schema_state module performs a `connect()` and subsequent operations to determine the database schema state. Storing the connection object in the request aware context (`ngx.ctx`) prevents this issue. We still store the connection as an instance attribute in non-request contexts. From #4081 --- kong/db/strategies/cassandra/connector.lua | 95 +++++++++++++--------- kong/db/strategies/connector.lua | 33 ++++++++ kong/db/strategies/init.lua | 11 ++- kong/db/strategies/postgres/connector.lua | 70 +++++++++------- 4 files changed, 141 insertions(+), 68 deletions(-) diff --git a/kong/db/strategies/cassandra/connector.lua b/kong/db/strategies/cassandra/connector.lua index 6d1025a4e5d..a60258bf0b0 100644 --- a/kong/db/strategies/cassandra/connector.lua +++ b/kong/db/strategies/cassandra/connector.lua @@ -9,14 +9,15 @@ CassandraConnector.__index = CassandraConnector local function wait_for_schema_consensus(self) - if not self.connection then + local conn = self:get_stored_connection() + if not conn then error("no connection") end log.verbose("waiting for Cassandra schema consensus (%dms timeout)...", self.cluster.max_schema_consensus_wait) - local ok, err = self.cluster:wait_schema_consensus(self.connection) + local ok, err = self.cluster:wait_schema_consensus(conn) log.verbose("Cassandra schema consensus: %s", ok and "reached" or "not reached") @@ -172,8 +173,9 @@ end function CassandraConnector:connect() - if self.connection then - return true + local conn = self:get_stored_connection() + if conn then + return conn end local peer, err = self.cluster:next_coordinator() @@ -181,17 +183,18 @@ function CassandraConnector:connect() return nil, err end - self.connection = peer + self:store_connection(peer) - return true + return peer end -- open a connection from the first available contact point, -- without a keyspace function CassandraConnector:connect_migrations(opts) - if self.connection then - return self.connection + local conn = self:get_stored_connection() + if conn then + return conn end opts = opts or {} @@ -208,20 +211,21 @@ function CassandraConnector:connect_migrations(opts) end end - self.connection = peer + self:store_connection(peer) return peer end function CassandraConnector:setkeepalive() - if not self.connection then + local conn = self:get_stored_connection() + if not conn then return end - local ok, err = self.connection:setkeepalive() + local ok, err = conn:setkeepalive() - self.connection = nil + self:store_connection(nil) if not ok then return nil, err @@ -232,13 +236,14 @@ end function CassandraConnector:close() - if not self.connection then + local conn = self:get_stored_connection() + if not conn then return end - local ok, err = self.connection:close() + local ok, err = conn:close() - self.connection = nil + self:store_connection(nil) if not ok then return nil, err @@ -266,8 +271,11 @@ function CassandraConnector:query(query, args, opts, operation) opts.serial_consistency = self.opts.serial_consistency - local coordinator = self.connection - if not coordinator then + local conn = self:get_stored_connection() + + local coordinator = conn + + if not conn then local err coordinator, err = self.cluster:next_coordinator() if not coordinator then @@ -295,7 +303,7 @@ function CassandraConnector:query(query, args, opts, operation) end end - if not self.connection then + if not conn then coordinator:setkeepalive() end @@ -308,7 +316,8 @@ end local function select_keyspaces(self) - if not self.connection then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -327,12 +336,13 @@ local function select_keyspaces(self) WHERE keyspace_name = ?]] end - return self.connection:execute(cql, { self.keyspace }) + return conn:execute(cql, { self.keyspace }) end local function select_tables(self) - if not self.connection then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -351,7 +361,7 @@ local function select_tables(self) WHERE keyspace_name = ?]] end - return self.connection:execute(cql, { self.keyspace }) + return conn:execute(cql, { self.keyspace }) end @@ -545,8 +555,9 @@ do function CassandraConnector:schema_migrations() - if not self.connection then - error("no connection") + local conn, err = self:connect() + if not conn then + error(err) end do @@ -598,7 +609,7 @@ do end end - local ok, err = self.connection:change_keyspace(self.keyspace) + local ok, err = conn:change_keyspace(self.keyspace) if not ok then return nil, err end @@ -606,7 +617,7 @@ do do -- has migrations? - local rows, err = self.connection:execute([[ + local rows, err = conn:execute([[ SELECT * FROM schema_meta WHERE key = ? ]], { SCHEMA_META_KEY, @@ -657,7 +668,8 @@ do -- get a contact point connection (no keyspace set) - if not self.connection then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -666,7 +678,7 @@ do log.debug("creating '%s' keyspace if not existing...", kong_config.cassandra_keyspace) - local res, err = self.connection:execute(string.format([[ + local res, err = conn:execute(string.format([[ CREATE KEYSPACE IF NOT EXISTS %q WITH REPLICATION = %s ]], kong_config.cassandra_keyspace, cql_replication)) @@ -677,7 +689,7 @@ do log.debug("successfully created '%s' keyspace", kong_config.cassandra_keyspace) - local ok, err = self.connection:change_keyspace(kong_config.cassandra_keyspace) + local ok, err = conn:change_keyspace(kong_config.cassandra_keyspace) if not ok then return nil, err end @@ -686,7 +698,7 @@ do log.debug("creating 'schema_meta' table if not existing...") - local res, err = self.connection:execute([[ + local res, err = conn:execute([[ CREATE TABLE IF NOT EXISTS schema_meta( key text, subsystem text, @@ -718,11 +730,12 @@ do function CassandraConnector:schema_reset() - if not self.connection then + local conn = self:get_stored_connection() + if not conn then error("no connection") end - local ok, err = self.connection:execute(string.format([[ + local ok, err = conn:execute(string.format([[ DROP KEYSPACE IF EXISTS %q ]], self.keyspace)) if not ok then @@ -749,7 +762,8 @@ do error("up_cql must be a string", 2) end - if not self.connection then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -758,7 +772,7 @@ do for i = 1, #t_cql do local cql = pl_stringx.strip(t_cql[i]) if cql ~= "" then - local res, err = self.connection:execute(cql) + local res, err = conn:execute(cql) if not res then if string.find(err, "Column .- was not found in table") or string.find(err, "[Ii]nvalid column name") then @@ -785,7 +799,8 @@ do error("name must be a string", 2) end - if not self.connection then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -827,7 +842,7 @@ do table.insert(args, SCHEMA_META_KEY) table.insert(args, subsystem) - local res, err = self.connection:execute(cql, args) + local res, err = conn:execute(cql, args) if not res then return nil, err end @@ -974,7 +989,9 @@ do ]] end - local rows, err = self.connection:execute(cql, { + local conn = self:get_stored_connection() + + local rows, err = conn:execute(cql, { self.keyspace, "schema_migrations", }) @@ -987,12 +1004,12 @@ do return res end - local ok, err = self.connection:change_keyspace(self.keyspace) + local ok, err = conn:change_keyspace(self.keyspace) if not ok then return nil, err end - local schema_migrations_rows, err = self.connection:execute([[ + local schema_migrations_rows, err = conn:execute([[ SELECT id, migrations FROM schema_migrations ]]) if err then diff --git a/kong/db/strategies/connector.lua b/kong/db/strategies/connector.lua index 056d0566e7a..b67244213b8 100644 --- a/kong/db/strategies/connector.lua +++ b/kong/db/strategies/connector.lua @@ -16,6 +16,39 @@ function Connector:init_worker() end +do + local past_init + local ngx = ngx + + + function Connector:store_connection(conn) + if not past_init and ngx and ngx.get_phase() ~= "init" then + past_init = true + end + + if ngx and past_init then + ngx.ctx.connection = conn + + else + self.connection = conn + end + end + + + function Connector:get_stored_connection() + if not past_init and ngx and ngx.get_phase() ~= "init" then + past_init = true + end + + if ngx and past_init then + return ngx.ctx.connection + end + + return self.connection + end +end + + function Connector:infos() error(fmt("infos() not implemented for '%s' strategy", self.database)) end diff --git a/kong/db/strategies/init.lua b/kong/db/strategies/init.lua index 65134856c1c..3c7f021638d 100644 --- a/kong/db/strategies/init.lua +++ b/kong/db/strategies/init.lua @@ -31,7 +31,16 @@ function _M.new(kong_config, strategy, schemas, errors) do local base_connector = require "kong.db.strategies.connector" local mt = getmetatable(connector) - setmetatable(mt, { __index = base_connector }) + setmetatable(mt, { + __index = function(t, k) + -- explicit parent + if k == "super" then + return base_connector + end + + return base_connector[k] + end + }) end local strategies = {} diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 2b44ae5d20b..acb29992b97 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -223,6 +223,14 @@ local _mt = {} _mt.__index = _mt +function _mt:get_stored_connection() + local conn = self.super.get_stored_connection(self) + if conn and conn.sock then + return conn + end +end + + function _mt:init() local res, err = self:query("SHOW server_version_num;") local ver = tonumber(res and res[1] and res[1].server_version_num) @@ -326,8 +334,9 @@ end function _mt:connect() - if self.connection and self.connection.sock then - return true + local conn = self:get_stored_connection() + if conn then + return conn end local connection, err = connect(self.config) @@ -335,32 +344,26 @@ function _mt:connect() return nil, err end - self.connection = connection + self:store_connection(connection) - return true + return connection end -function _mt:connect_migrations(_) - if self.connection and self.connection.sock then - return self.connection - end - - local connection, err = connect(self.config) - if not connection then - return nil, err - end - - self.connection = connection - - return connection +function _mt:connect_migrations() + return self:connect() end function _mt:close() - local ok, err = close(self.connection) + local conn = self:get_stored_connection() + if not conn then + return true + end + + local ok, err = close(conn) - self.connection = nil + self:store_connection(nil) if not ok then return nil, err @@ -371,9 +374,14 @@ end function _mt:setkeepalive() - local ok, err = setkeepalive(self.connection) + local conn = self:get_stored_connection() + if not conn then + return true + end + + local ok, err = setkeepalive(conn) - self.connection = nil + self:store_connection(nil) if not ok then return nil, err @@ -386,8 +394,9 @@ end function _mt:query(sql) local res, err, partial, num_queries - if self.connection and self.connection.sock then - res, err, partial, num_queries = self.connection:query(sql) + local conn = self:get_stored_connection() + if conn then + res, err, partial, num_queries = conn:query(sql) else local connection @@ -584,7 +593,8 @@ end function _mt:schema_migrations() - if not self.connection or not self.connection.sock then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -625,7 +635,8 @@ end function _mt:schema_bootstrap(kong_config, default_locks_ttl) - if not self.connection or not self.connection.sock then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -661,7 +672,8 @@ end function _mt:schema_reset() - if not self.connection or not self.connection.sock then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -690,7 +702,8 @@ function _mt:run_up_migration(name, up_sql) error("up_sql must be a string", 2) end - if not self.connection or not self.connection.sock then + local conn = self:get_stored_connection() + if not conn then error("no connection") end @@ -724,7 +737,8 @@ function _mt:record_migration(subsystem, name, state) error("name must be a string", 2) end - if not self.connection or not self.connection.sock then + local conn = self:get_stored_connection() + if not conn then error("no connection") end