Skip to content

Commit

Permalink
Improve timeout specifications
Browse files Browse the repository at this point in the history
  • Loading branch information
gdb committed Jan 21, 2013
1 parent d141246 commit a1e25ea
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 22 deletions.
40 changes: 22 additions & 18 deletions lib/bunny/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ def initialize(connection = nil, id = nil, work_pool = ConsumerWorkPool.new(1))
@next_publish_seq_no = 0
end

def read_write_timeout
@connection.read_write_timeout
end

# Opens the channel and resets its internal state
# @return [Bunny::Channel] Self
# @api public
Expand Down Expand Up @@ -573,7 +577,7 @@ def basic_qos(prefetch_count, global = false)

@connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global))

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_qos_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -592,7 +596,7 @@ def basic_recover(requeue)
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_recover_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -791,7 +795,7 @@ def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, e
add_consumer(queue_name, consumer_tag, no_ack, exclusive, arguments, &block)
end

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_consume_ok = wait_on_continuations
end
# covers server-generated consumer tags
Expand Down Expand Up @@ -827,7 +831,7 @@ def basic_consume_with(consumer)
register_consumer(consumer.consumer_tag, consumer)
end

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_consume_ok = wait_on_continuations
end
# covers server-generated consumer tags
Expand All @@ -849,7 +853,7 @@ def basic_consume_with(consumer)
def basic_cancel(consumer_tag)
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_basic_cancel_ok = wait_on_continuations
end

Expand Down Expand Up @@ -915,7 +919,7 @@ def queue_delete(name, opts = {})
opts[:if_unused],
opts[:if_empty],
false))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_queue_delete_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -935,7 +939,7 @@ def queue_purge(name, opts = {})

@connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@id, name, false))

Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_queue_purge_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -971,7 +975,7 @@ def queue_bind(name, exchange, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_queue_bind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1006,7 +1010,7 @@ def queue_unbind(name, exchange, opts = {})
exchange_name,
opts[:routing_key],
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_queue_unbind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1045,7 +1049,7 @@ def exchange_declare(name, type, opts = {})
false,
false,
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_declare_ok = wait_on_continuations
end

Expand All @@ -1070,7 +1074,7 @@ def exchange_delete(name, opts = {})
name,
opts[:if_unused],
false))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_delete_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1114,7 +1118,7 @@ def exchange_bind(source, destination, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_bind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1158,7 +1162,7 @@ def exchange_unbind(source, destination, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_unbind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1186,7 +1190,7 @@ def channel_flow(active)
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_channel_flow_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1207,7 +1211,7 @@ def tx_select
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_tx_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1222,7 +1226,7 @@ def tx_commit
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_tx_commit_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1237,7 +1241,7 @@ def tx_rollback
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_tx_rollback_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -1277,7 +1281,7 @@ def confirm_select(callback=nil)
@confirms_callback = callback

@connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
Bunny::Timer.timeout(1, ClientTimeout) do
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
@last_confirm_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down
7 changes: 5 additions & 2 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ def start
@default_channel = self.create_channel
end

def read_write_timeout
@transport.read_write_timeout
end

def create_channel(n = nil)
if n && (ch = @channels[n])
Expand All @@ -142,7 +145,7 @@ def close
if @transport.open?
close_all_channels

Bunny::Timer.timeout(@disconnect_timeout, ClientTimeout) do
Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do
self.close_connection(false)
end
end
Expand Down Expand Up @@ -229,7 +232,7 @@ def close_channel(ch)

def close_all_channels
@channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
Bunny::Timer.timeout(@disconnect_timeout, ClientTimeout) { ch.close }
Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
end
end

Expand Down
5 changes: 3 additions & 2 deletions lib/bunny/transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Transport
DEFAULT_CONNECTION_TIMEOUT = 5.0


attr_reader :session, :host, :port, :socket, :connect_timeout
attr_reader :session, :host, :port, :socket, :connect_timeout, :read_write_timeout, :disconnect_timeout

def initialize(session, host, port, opts)
@session = session
Expand All @@ -31,8 +31,9 @@ def initialize(session, host, port, opts)

@read_write_timeout = opts[:socket_timeout] || 1
@read_write_timeout = nil if @read_write_timeout == 0
@disconnect_timeout = @read_write_timeout || @connect_timeout
@connect_timeout = self.timeout_from(opts)
@connect_timeout = nil if @connect_timeout == 0
@disconnect_timeout = @read_write_timeout || @connect_timeout

@frames = Hash.new { Array.new }

Expand Down

0 comments on commit a1e25ea

Please sign in to comment.