Skip to content

Commit

Permalink
restart the NotificationFailureProcessor job every 5 minutes
Browse files Browse the repository at this point in the history
Change-Id: Ic07a35f13001584a5790f38688539b10c2e6e10f
Reviewed-on: https://gerrit.instructure.com/169709
Tested-by: Jenkins
Reviewed-by: Rob Orton <[email protected]>
QA-Review: Cody Cutrer <[email protected]>
Product-Review: Cody Cutrer <[email protected]>
  • Loading branch information
ccutrer committed Oct 29, 2018
1 parent 380847a commit e6b1a5a
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
11 changes: 10 additions & 1 deletion app/models/notification_failure_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ class NotificationFailureProcessor
POLL_PARAMS = %i(idle_timeout wait_time_seconds visibility_timeout).freeze
DEFAULT_CONFIG = {
notification_failure_queue_name: 'notification-service-failures',
idle_timeout: 10
# stop the loop if no message received for 10s
idle_timeout: 10,
# stop the loop (and wait for it to process the job again) if we've been running
# for this long
iteration_high_water: 300
}.freeze

def self.config
Expand All @@ -47,6 +51,11 @@ def initialize

def process
return nil unless self.class.enabled?
start_time = Time.now
notification_failure_queue.before_request do |_stats|
throw :stop_polling if Time.now - start_time > config[:iteration_high_water]
end

notification_failure_queue.poll(config.slice(*POLL_PARAMS)) do |failure_summary_json|
summary = parse_failure_summary(failure_summary_json)
process_failure_summary(summary) if summary
Expand Down
28 changes: 28 additions & 0 deletions spec/models/notification_failure_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def mock_failure_summary(obj)

def mock_queue(bare_failure_summaries)
queue = double
expect(queue).to receive(:before_request)
expectation = expect(queue).to receive(:poll)
bare_failure_summaries.each do |s|
expectation = expectation.and_yield(mock_failure_summary(s))
Expand Down Expand Up @@ -138,6 +139,33 @@ def mock_queue(bare_failure_summaries)
expect{ nfp.process }.not_to raise_error
end

it "breaks out early when exceeding its timeline" do
nfp = NotificationFailureProcessor.new
allow(NotificationFailureProcessor).to receive(:config).and_return({
access_key: 'key',
secret_access_key: 'secret'
})
queue = double
before_request = nil
expect(queue).to receive(:before_request) do |&block|
before_request = block
end
reached = false
expect(queue).to receive(:poll) do
before_request.call
reached = true
Timecop.travel(10.minutes.from_now)
before_request.call
raise "not reached"
end
allow(nfp).to receive(:notification_failure_queue).and_return(queue)

Timecop.freeze do
expect { nfp.process }.to throw_symbol(:stop_polling)
end
expect(reached).to eq true
end

context 'shards' do
specs_require_sharding

Expand Down

0 comments on commit e6b1a5a

Please sign in to comment.