Skip to content

Commit

Permalink
use method-level ensure blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed Jun 29, 2019
1 parent 0304da9 commit 4b2857d
Showing 1 changed file with 47 additions and 53 deletions.
100 changes: 47 additions & 53 deletions logstash-core/lib/logstash/pipelines_registry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,27 @@ def initialize
# @return [Boolean] new pipeline creation success
def create_pipeline(pipeline_id, pipeline, &create_block)
lock = get_lock(pipeline_id)
begin
lock.lock

success = false

state = @states.get(pipeline_id)
if state
if state.terminated?
success = yield
state.set_pipeline(pipeline)
else
logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
end
@states.put(pipeline_id, state)
else
lock.lock

success = false

state = @states.get(pipeline_id)
if state
if state.terminated?
success = yield
@states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success
state.set_pipeline(pipeline)
else
logger.error("Attempted to create a pipeline that already exists", :pipeline_id => pipeline_id)
end

success
ensure
lock.unlock
@states.put(pipeline_id, state)
else
success = yield
@states.put(pipeline_id, PipelineState.new(pipeline_id, pipeline)) if success
end

success
ensure
lock.unlock
end

# Execute the passed termination logic block
Expand All @@ -80,20 +78,18 @@ def create_pipeline(pipeline_id, pipeline, &create_block)
# @yieldparam [Pipeline] the pipeline to terminate
def terminate_pipeline(pipeline_id, &stop_block)
lock = get_lock(pipeline_id)
begin
lock.lock
lock.lock

state = @states.get(pipeline_id)
if state.nil?
logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
@states.remove(pipeline_id)
else
yield(state.pipeline)
@states.put(pipeline_id, state)
end
ensure
lock.unlock
state = @states.get(pipeline_id)
if state.nil?
logger.error("Attempted to terminate a pipeline that does not exists", :pipeline_id => pipeline_id)
@states.remove(pipeline_id)
else
yield(state.pipeline)
@states.put(pipeline_id, state)
end
ensure
lock.unlock
end

# Execute the passed reloading logic block in the context of the reloading state and set new pipeline in state
Expand All @@ -105,29 +101,27 @@ def terminate_pipeline(pipeline_id, &stop_block)
# @return [Boolean] new pipeline creation success
def reload_pipeline(pipeline_id, &reload_block)
lock = get_lock(pipeline_id)
begin
lock.lock
success = false

state = @states.get(pipeline_id)
if state.nil?
logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
@states.remove(pipeline_id)
else
state.set_reloading(true)
begin
success, new_pipeline = yield
state.set_pipeline(new_pipeline)
ensure
state.set_reloading(false)
end
@states.put(pipeline_id, state)
end
lock.lock
success = false

success
ensure
lock.unlock
state = @states.get(pipeline_id)
if state.nil?
logger.error("Attempted to reload a pipeline that does not exists", :pipeline_id => pipeline_id)
@states.remove(pipeline_id)
else
state.set_reloading(true)
begin
success, new_pipeline = yield
state.set_pipeline(new_pipeline)
ensure
state.set_reloading(false)
end
@states.put(pipeline_id, state)
end

success
ensure
lock.unlock
end

# @param pipeline_id [String, Symbol] the pipeline id
Expand Down

0 comments on commit 4b2857d

Please sign in to comment.