fix(hybrid) control plane nodes now subscribe to cluster-level `invalidations` events to be notified of CRUD operations

This change fixes a bug which manifested when:

* Kong is running in Hybrid mode.
* The CP role is performed by several Kong nodes in a cluster.
* A change (any successful CRUD operation) happens on one of these
  nodes.

The desired behaviour is that when this happens, all the CPs in the cluster
trigger a DP config push. The bug is that only the workers in the same Kong node
as the one that received the CRUD operation execute such a push.

Example setup: two Kong nodes in CP role. Node A has workers A1 and A2, and Node
B has workers B1 and B2. An admin uses A1's Admin API to create a new Service.

Before this fix, A1 would trigger the CP reload, then A2 would "see" the change
(via a worker event) and refresh its DPs. But B1 and B2 would not perform such a
refresh.
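
For reference, here is a sketch of the scope of each eventing primitive involved
(source/event names follow the code in the diff below; the payloads are made up
for illustration):

  -- 1. post_local: delivered only within the current worker process.
  --    "dao:crud" events are emitted this way, which is why other
  --    workers never see them directly. (Payload is illustrative.)
  kong.worker_events.post_local("dao:crud", "create", { name = "example" })

  -- 2. post: delivered to every worker of the current Kong node via a
  --    shared-memory queue; it never crosses node boundaries.
  kong.worker_events.post("clustering", "push_config")

  -- 3. cluster events: persisted in the database and polled by every
  --    node, so this is the only primitive that reaches other Kong
  --    nodes (B1 and B2 in the example above). (Payload illustrative.)
  kong.cluster_events:broadcast("invalidations", "services:example")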

This fix makes every CP node subscribe to the `invalidations` cluster event
generated by a CRUD operation and broadcast a `push_config` event to all workers
on that node. As a result, when a CRUD operation happens, all CP nodes in the
cluster promptly push the updated config to the DPs connected to them.

Co-authored-by: Datong Sun <[email protected]>
kikito and dndx authored Oct 14, 2020
1 parent 5d58c40 commit d6126d8
Showing 2 changed files with 99 additions and 6 deletions.
25 changes: 19 additions & 6 deletions kong/clustering.lua
@@ -455,17 +455,30 @@ function _M.init_worker(conf)

   -- ROLE = "control_plane"

-  kong.worker_events.register(function(data)
-    -- we have to re-broadcast event using `post` because the dao
-    -- events were sent using `post_local` which means not all workers
-    -- can receive it
+  -- Sends "clustering", "push_config" to all workers in the same node, including self
+  local function post_push_config_event_to_node_workers(_)
     local res, err = kong.worker_events.post("clustering", "push_config")
     if not res then
       ngx_log(ngx_ERR, "unable to broadcast event: " .. err)
     end
-  end, "dao:crud")
+  end
+
+  -- The "invalidations" cluster event gets inserted in the cluster when there's a CRUD change
+  -- (like an insertion or deletion). Only one worker per Kong node receives this callback.
+  -- This makes such a node post push_config events to all the CP workers on its node.
+  kong.cluster_events:subscribe("invalidations", post_push_config_event_to_node_workers)

-  kong.worker_events.register(function(data)
+  -- The "dao:crud" event is triggered using post_local, which eventually generates an
+  -- "invalidations" cluster event. It is assumed that the workers in the
+  -- same node where the dao:crud event originated will "know" about the update mostly via
+  -- changes in the cache shared dict. Since DPs don't use the cache, workers in the same
+  -- Kong node where the event originated still need to be notified so they push config to
+  -- their DPs.
+  kong.worker_events.register(post_push_config_event_to_node_workers, "dao:crud")
+
+  -- When the "clustering", "push_config" worker event is received by a worker,
+  -- it loads and pushes the config to the DPs connected to it.
+  kong.worker_events.register(function(_)
     local res, err = declarative.export_config()
     if not res then
       ngx_log(ngx_ERR, "unable to export config from database: " .. err)
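
For context, `kong.cluster_events` is Kong's database-backed pub/sub: `broadcast()`
inserts an event row, and each node's poller picks new rows up and invokes the
callbacks subscribed to that channel on one worker of that node. The originating
node is expected to learn of the change locally instead (hence the `dao:crud`
registration above). A minimal usage sketch, with a hypothetical channel name and
payload that are not part of this commit:

  -- Hypothetical channel and payload, for illustration only.
  local ok, err = kong.cluster_events:broadcast("my_channel", "service updated")
  if not ok then
    ngx.log(ngx.ERR, "failed to broadcast cluster event: ", err)
  end

  -- Runs on the other nodes once their pollers fetch the event from
  -- the database.
  kong.cluster_events:subscribe("my_channel", function(data)
    ngx.log(ngx.NOTICE, "received cluster event: ", data)
  end)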
80 changes: 80 additions & 0 deletions spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua
@@ -0,0 +1,80 @@
local helpers = require "spec.helpers"

local function find_in_file(filepath, pat)
  local f = assert(io.open(filepath, "r"))

  local line = f:read("*l")

  local found = false
  while line and not found do
    if line:find(pat, 1, true) then
      found = true
    end

    line = f:read("*l")
  end

  f:close()

  return found
end

for _, strategy in helpers.each_strategy() do
  describe("CP/CP sync works with #" .. strategy .. " backend", function()
    lazy_setup(function()
      helpers.get_db_utils(strategy, { "routes", "services" })

      assert(helpers.start_kong({
        prefix = "servroot",
        admin_listen = "127.0.0.1:9000",
        cluster_listen = "127.0.0.1:9005",

        role = "control_plane",
        cluster_cert = "spec/fixtures/kong_clustering.crt",
        cluster_cert_key = "spec/fixtures/kong_clustering.key",
        lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering.crt",
        database = strategy,
      }))

      assert(helpers.start_kong({
        prefix = "servroot2",
        admin_listen = "127.0.0.1:9001",
        cluster_listen = "127.0.0.1:9006",

        role = "control_plane",
        cluster_cert = "spec/fixtures/kong_clustering.crt",
        cluster_cert_key = "spec/fixtures/kong_clustering.key",
        lua_ssl_trusted_certificate = "spec/fixtures/kong_clustering.crt",
        database = strategy,
      }))
    end)

    lazy_teardown(function()
      assert(helpers.stop_kong("servroot"))
      assert(helpers.stop_kong("servroot2"))
    end)

    it("syncs across other nodes in the cluster", function()
      local admin_client_2 = assert(helpers.http_client("127.0.0.1", 9001))

      local res = admin_client_2:post("/services", {
        body = { name = "example", url = "http://example.dev" },
        headers = { ["Content-Type"] = "application/json" }
      })
      assert.res_status(201, res)

      assert(admin_client_2:close())

      local cfg = helpers.test_conf
      local filepath = cfg.prefix .. "/" .. cfg.proxy_error_log
      helpers.wait_until(function()
        return find_in_file(filepath,
                 -- this line is only found on the other CP (the one not receiving the Admin API call)
                 "[cluster_events] new event (channel: 'invalidations')") and
               find_in_file(filepath,
                 "worker-events: handling event; source=clustering, event=push_config")
      end, 10)
    end)
  end)
end
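
Note the assertion strategy: the Admin API call goes to the second node
(`servroot2`, listening on 9001), while the scraped error log belongs to the
first node (`helpers.test_conf.prefix`, i.e. `servroot`), so the test only passes
when the event has actually crossed node boundaries through the database. To run
the spec locally, Kong's busted wrapper should work, along the lines of
`bin/busted spec/02-integration/09-hybrid_mode/04-cp_cluster_sync_spec.lua`
(the exact invocation may vary by checkout), with a test database available for
the chosen strategy.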
