Skip to content

Commit

Permalink
Initial introduction of a deferrable pool for resource pooling where …
Browse files Browse the repository at this point in the history
…the work can be encapsulated using deferrables
  • Loading branch information
raggi committed May 24, 2011
1 parent 61239dd commit bc999d2
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 0 deletions.
1 change: 1 addition & 0 deletions lib/em/deferrable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

module EventMachine
module Deferrable
autoload :Pool, 'em/deferrable/pool'

# Specify a block to be executed if and when the Deferrable object receives
# a status of :succeeded. See #set_deferred_status for more information.
Expand Down
97 changes: 97 additions & 0 deletions lib/em/deferrable/pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# = EM::Deferrable::Pool
#
# A simple async resource pool based on a resource and work queue. Resources
# are enqueued and work waits for resources to become available.
#
# Example:
#
# EM.run do
# pool = EM::Deferrable::Pool.new
# spawn = lambda { pool.add EM::HttpRequest.new('http://example.org') }
# 10.times { spawn[] }
# done, scheduled = 0, 0
#
# check = lambda do
# done += 1
# if done >= scheduled
# EM.stop
# end
# end
#
# pool.on_error { |conn| spawn[] }
#
# 100.times do
# pool.perform do |conn|
# req = conn.get :path => '/', :keepalive => true
#
# req.callback do
# p [:success, conn.object_id, i, req.response.size]
# check[]
# end
#
# req.errback { check[] }
#
# req
# end
# end
# end
#
class EM::Deferrable::Pool
def initialize
@resources = EM::Queue.new
@removed = []
end

def add resource
@resources.push resource
end
alias requeue add

def remove resource
@removed << resource
end

def on_error *a, &b
@on_error = EM::Callback(*a, &b)
end

def perform(*a, &b)
work = EM::Callback(*a, &b)

@resources.pop do |resource|
if removed? resource
@removed.delete resource
reschedule work
else
process work, resource
end
end
end
alias reschedule perform

def process work, resource
deferrable = work.call resource
if deferrable.kind_of?(EM::Deferrable)
completion deferrable, resource
else
raise ArgumentError, "deferrable expected from work"
end
end

def completion deferrable, resource
deferrable.callback { requeue resource }
deferrable.errback { failure resource }
end

def failure resource
if @on_error
@on_error.call resource
else
requeue resource
end
end

def removed? resource
@removed.include? resource
end
end
107 changes: 107 additions & 0 deletions tests/test_deferrable_pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
class TestDeferrablePool < Test::Unit::TestCase
def pool
@pool ||= EM::Deferrable::Pool.new
end

def go
EM.run { yield }
end

def stop
EM.stop
end

def deferrable
@deferrable ||= EM::DefaultDeferrable.new
end

def test_supports_more_work_than_resources
ran = false
go do
pool.perform do
ran = true
deferrable
end
stop
end
assert_equal false, ran
go do
pool.add :resource
stop
end
assert_equal true, ran
end

def test_reques_resources_on_error
pooled_res, pooled_res2 = nil
pool.add :res
go do
pool.perform do |res|
pooled_res = res
deferrable
end
stop
end
deferrable.fail
go do
pool.perform do |res|
pooled_res2 = res
deferrable
end
stop
end
assert_equal :res, pooled_res
assert_equal pooled_res, pooled_res2
end

def test_supports_custom_error_handler
eres = nil
pool.on_error do |res|
eres = res
end
performs = []
pool.add :res
go do
pool.perform do |res|
performs << res
deferrable
end
pool.perform do |res|
performs << res
deferrable
end
deferrable.fail
stop
end
assert_equal :res, eres
# manual requeues required when error handler is installed:
assert_equal 1, performs.size
assert_equal :res, performs.first
end

def test_catches_successful_deferrables
performs = []
pool.add :res
go do
pool.perform { |res| performs << res; deferrable }
pool.perform { |res| performs << res; deferrable }
stop
end
assert_equal [:res], performs
deferrable.succeed
go { stop }
assert_equal [:res, :res], performs
end

def test_prunes_locked_and_removed_resources
performs = []
pool.add :res
deferrable.succeed
go do
pool.perform { |res| performs << res; pool.remove res; deferrable }
pool.perform { |res| performs << res; pool.remove res; deferrable }
stop
end
assert_equal [:res], performs
end
end

0 comments on commit bc999d2

Please sign in to comment.