Skip to content

Commit

Permalink
feat: Adds leave rate limit to muc_rate_limit.
Browse files Browse the repository at this point in the history
  • Loading branch information
damencho committed Oct 31, 2023
1 parent e2de06f commit 73b3309
Showing 1 changed file with 106 additions and 23 deletions.
129 changes: 106 additions & 23 deletions resources/prosody-plugins/mod_muc_rate_limit.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
local queue = require "util.queue";
local new_throttle = require "util.throttle".create;
local timer = require "util.timer";
local st = require "util.stanza";

-- we max to 500 participants per meeting so this should be enough, we are not suppose to handle all
-- participants in one meeting
local PRESENCE_QUEUE_MAX_SIZE = 1000;

-- default to 5 participants per second
local join_rate_per_conference = module:get_option_number("muc_rate_joins", 5);
-- default to 3 participants per second
local join_rate_per_conference = module:get_option_number("muc_rate_joins", 3);
local leave_rate_per_conference = module:get_option_number("muc_rate_leaves", 5);

-- Measure/monitor the room rate limiting queue
local measure = require "core.statsmanager".measure;
Expand All @@ -31,16 +33,20 @@ local stat_longest_queue = 0;

-- Adds item to the queue
-- @returns false if queue is full and item was not added, true otherwise
local function add_item_to_queue(joining_queue, item, room, from)
if not joining_queue:push(item) then
module:log('error', 'Error pushing presence in queue for %s in %s', from, room.jid);
local function add_item_to_queue(queue, item, room, from, send_stats)
if not queue:push(item) then
module:log('error',
'Error pushing item in %s queue for %s in %s', send_stats and 'join' or 'leave', from, room.jid);

if send_stats then
measure_full_queue();
end

measure_full_queue();
return false;
else
-- check is this the longest queue and if so throws a stat
if joining_queue:count() > stat_longest_queue then
stat_longest_queue = joining_queue:count();
if send_stats and queue:count() > stat_longest_queue then
stat_longest_queue = queue:count();
measure_longest_queue(stat_longest_queue);
end

Expand All @@ -50,27 +56,23 @@ end

-- process join_rate_presence_queue in the room and pops element passing them to handle_normal_presence
-- returns 1 if we want to reschedule it after 1 second
local function timer_process_queue_elements (room)
local presence_queue = room.join_rate_presence_queue;

if not presence_queue or presence_queue:count() == 0 then
local function timer_process_queue_elements (rate, queue, process, queue_empty_cb)
if not queue or queue:count() == 0 then
return;
end

for _ = 1, join_rate_per_conference do
local ev = presence_queue:pop();
for _ = 1, rate do
local ev = queue:pop();
if ev then
-- we mark what we pass here so we can skip it on the next muc-occupant-pre-join event
ev.stanza.delayed_join_skip = true;
room:handle_normal_presence(ev.origin, ev.stanza);
process(ev);
end
end

-- if there are elements left, schedule an execution in a second
if presence_queue:count() > 0 then
if queue:count() > 0 then
return 1;
else
room.join_rate_queue_timer = false;
queue_empty_cb();
end
end

Expand Down Expand Up @@ -99,17 +101,28 @@ module:hook("muc-occupant-pre-join", function (event)
room.join_rate_presence_queue = queue.new(PRESENCE_QUEUE_MAX_SIZE);
end

if not add_item_to_queue(room.join_rate_presence_queue, event, room, stanza.attr.from) then
if not add_item_to_queue(room.join_rate_presence_queue, event, room, stanza.attr.from, true) then
-- let's not stop processing the event
return nil;
end

if not room.join_rate_queue_timer then
timer.add_task(1, function ()
local status, result = pcall(timer_process_queue_elements, room);
local status, result = pcall(timer_process_queue_elements,
join_rate_per_conference,
room.join_rate_presence_queue,
function(ev)
-- we mark what we pass here so we can skip it on the next muc-occupant-pre-join event
ev.stanza.delayed_join_skip = true;
room:handle_normal_presence(ev.origin, ev.stanza);
end,
function() -- empty callback
room.join_rate_queue_timer = false;
end
);
if not status then
-- there was an error in the timer function
module:log('error', 'Error processing queue: %s', result);
module:log('error', 'Error processing join queue: %s', result);

measure_errors_processing_queue();

Expand All @@ -130,7 +143,7 @@ module:hook("muc-occupant-pre-join", function (event)

-- if add fails as queue is full we return false and the event will continue processing, we risk re-order
-- but not losing it
return add_item_to_queue(room.join_rate_presence_queue, event, room, stanza.attr.from);
return add_item_to_queue(room.join_rate_presence_queue, event, room, stanza.attr.from, true);
end

end, 9); -- as we will rate limit joins we need to be the first to execute
Expand All @@ -141,3 +154,73 @@ end, 9); -- as we will rate limit joins we need to be the first to execute
module:hook('muc-room-destroyed',function(event)
event.room.join_rate_presence_queue = nil;
end);

module:hook('muc-occupant-pre-leave', function (event)
local occupant, room, stanza = event.occupant, event.room, event.stanza;
local throttle = room.leave_rate_throttle;

if not throttle then
throttle = new_throttle(leave_rate_per_conference, 1); -- rate per one second
room.leave_rate_throttle = throttle;
end

if not throttle:poll(1) then
if not room.leave_rate_presence_queue then
room.leave_rate_presence_queue = queue.new(PRESENCE_QUEUE_MAX_SIZE);
end

-- we need it later when processing the event
event.orig_role = occupant.role;

if not add_item_to_queue(room.leave_rate_presence_queue, event, room, stanza.attr.from, false) then
-- let's not stop processing the event
return nil;
end

-- set role to nil so the occupant will be removed from room occupants when we save it
-- we remove occupant from the list early on batches so we can spare sending few presences
occupant.role = nil;
room:save_occupant(occupant);

if not room.leave_rate_queue_timer then
timer.add_task(1, function ()
local status, result = pcall(timer_process_queue_elements,
leave_rate_per_conference,
room.leave_rate_presence_queue,
function(ev)
local occupant, orig_role, origin, room, stanza
= ev.occupant, ev.orig_role, ev.origin, ev.room, ev.stanza;

room:publicise_occupant_status(
occupant,
st.stanza("x", {xmlns = "http://jabber.org/protocol/muc#user";}),
nil, nil, nil, orig_role);

module:fire_event("muc-occupant-left", {
room = room;
nick = occupant.nick;
occupant = occupant;
origin = origin;
stanza = stanza;
});
end,
function() -- empty callback
room.leave_rate_queue_timer = false;
end
);
if not status then
-- there was an error in the timer function
module:log('error', 'Error processing leave queue: %s', result);

-- let's re-schedule timer so we do not lose the queue
return 1;
end

return result;
end);
room.leave_rate_queue_timer = true;
end

return true; -- we stop execution, so we do not process this leave at the moment
end
end);

0 comments on commit 73b3309

Please sign in to comment.