Skip to content

Commit

Permalink
Merge pull request ManageIQ#21443 from jrafanie/on_worker_destroy_err…
Browse files Browse the repository at this point in the history
…or_out_tasks_with_active_queue_message

Error out tasks with active messages when deleting a worker
  • Loading branch information
Fryguy authored Sep 21, 2021
2 parents 571ee76 + 553a4cd commit 4b06072
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
11 changes: 10 additions & 1 deletion app/models/miq_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ class MiqWorker < ApplicationRecord
include_concern 'SystemdCommon'
include UuidMixin

before_destroy :log_destroy_of_worker_messages
before_destroy :error_out_tasks_with_active_queue_message, :log_destroy_of_worker_messages

belongs_to :miq_server
has_many :messages, :as => :handler, :class_name => 'MiqQueue'
Expand Down Expand Up @@ -455,6 +455,15 @@ def clean_active_messages
end
end

private def error_out_tasks_with_active_queue_message
message = "Task Handler: [#{friendly_name}] ID [#{id}] has been deleted!"
processed_messages.includes(:miq_task).where.not(:miq_task_id => nil).each do |m|
# Note, this is done synchronously from destroy because workers have 1 message they're currently "handling"
# and each message can only have 1 task, so this should be very fast even for many workers.
m.miq_task.update_status(MiqTask::STATE_FINISHED, MiqTask::STATUS_ERROR, message)
end
end

def log_destroy_of_worker_messages
ready_messages.each do |m|
_log.warn("Nullifying: #{MiqQueue.format_full_log_msg(m)}") rescue nil
Expand Down
33 changes: 33 additions & 0 deletions spec/models/miq_worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,39 @@ def check_has_required_role(worker_role_names, expected_result)
expect(@worker.worker_options).to eq(:guid => @worker.guid)
end

context "#destroy" do
context "where it has messages it's handling" do
before { EvmSpecHelper.local_guid_miq_server_zone }
let(:miq_task) do
queue_opts = {:class_name => "MiqServer", :method_name => "my_server", :args => []}
task_opts = {:name => "Thing1", :userid => "admin"}
MiqTask.generic_action_with_callback(task_opts, queue_opts, true)
end

let(:message_linked_to_task) { miq_task.miq_queue }

it "synchronously errors out tasks linked to these soon to be deleted active messages" do
miq_task.state_active
message_linked_to_task.update!(:handler => @worker, :state => MiqQueue::STATE_DEQUEUE)
@worker.destroy

expect(miq_task.reload.active?).to be_falsey
expect(miq_task.status_error?).to be_truthy
expect { message_linked_to_task.reload }.to raise_error(ActiveRecord::RecordNotFound)
end

it "let's other handlers pick up tasks and messages not yet started" do
miq_task.state_queued
message_linked_to_task.update!(:handler => @worker, :state => MiqQueue::STATE_READY)
@worker.destroy

expect(miq_task.reload.active?).to be_falsey
expect(miq_task.status_ok?).to be_truthy
expect(message_linked_to_task.reload.handler).to be_nil
end
end
end

context "#command_line" do
it "without guid in worker_options" do
allow(@worker).to receive(:worker_options).and_return({})
Expand Down

0 comments on commit 4b06072

Please sign in to comment.