Skip to content

Commit

Permalink
Merge pull request eventmachine#468 from morandanieli/iterator_fixes
Browse files Browse the repository at this point in the history
Added concurrency validation to EM::Iterator
  • Loading branch information
tmm1 committed Sep 30, 2013
2 parents 8338ceb + d32fbae commit 848638c
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 44 deletions.
49 changes: 5 additions & 44 deletions lib/em/iterator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class Iterator
#
def initialize(list, concurrency = 1)
raise ArgumentError, 'argument must be an array' unless list.respond_to?(:to_a)
raise ArgumentError, 'concurrency must be bigger than zero' unless (concurrency > 0)
@list = list.to_a.dup
@concurrency = concurrency

Expand Down Expand Up @@ -224,47 +225,7 @@ def spawn_workers
end
end

if __FILE__ == $0
$:.unshift File.join(File.dirname(__FILE__), '..')
require 'eventmachine'

# TODO: real tests
# TODO: pass in one object instead of two? .each{ |iter| puts iter.current; iter.next }
# TODO: support iter.pause/resume/stop/break/continue?
# TODO: create some exceptions instead of using RuntimeError
# TODO: support proc instead of enumerable? EM::Iterator.new(proc{ return queue.pop })

EM.run{
EM::Iterator.new(1..50).each{ |num,iter| p num; iter.next }
EM::Iterator.new([1,2,3], 10).each{ |num,iter| p num; iter.next }

i = EM::Iterator.new(1..100, 5)
i.each(proc{|num,iter|
p num.to_s
iter.next
}, proc{
p :done
})
EM.add_timer(0.03){
i.concurrency = 1
}
EM.add_timer(0.04){
i.concurrency = 3
}

EM::Iterator.new(100..150).map(proc{ |num,iter|
EM.add_timer(0.01){ iter.return(num) }
}, proc{ |results|
p results
})

EM::Iterator.new(%w[ pwd uptime uname date ], 2).inject({}, proc{ |hash,cmd,iter|
EM.system(cmd){ |output,status|
hash[cmd] = status.exitstatus == 0 ? output.strip : nil
iter.return(hash)
}
}, proc{ |results|
p results
})
}
end
# TODO: pass in one object instead of two? .each{ |iter| puts iter.current; iter.next }
# TODO: support iter.pause/resume/stop/break/continue?
# TODO: create some exceptions instead of using RuntimeError
# TODO: support proc instead of enumerable? EM::Iterator.new(proc{ return queue.pop })
97 changes: 97 additions & 0 deletions tests/test_iterator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
require 'em_test_helper'

class TestIterator < Test::Unit::TestCase

def get_time
EM.current_time.strftime('%H:%M:%S')
end

def test_default_concurrency
items = {}
list = 1..10
EM.run {
EM::Iterator.new(list).each( proc {|num,iter|
time = get_time
items[time] ||= []
items[time] << num
EM::Timer.new(1) {iter.next}
}, proc {EM.stop})
}
assert_equal(10, items.keys.size)
assert_equal((list).to_a, items.values.flatten)
end

def test_concurrency_bigger_than_list_size
items = {}
list = [1,2,3]
EM.run {
EM::Iterator.new(list,10).each(proc {|num,iter|
time = get_time
items[time] ||= []
items[time] << num
EM::Timer.new(1) {iter.next}
}, proc {EM.stop})
}
assert_equal(1, items.keys.size)
assert_equal((list).to_a, items.values.flatten)
end


def test_changing_concurrency_affects_active_iteration
items = {}
list = 1..25
EM.run {
i = EM::Iterator.new(list,5)
i.each(proc {|num,iter|
time = get_time
items[time] ||= []
items[time] << num
EM::Timer.new(1) {iter.next}
}, proc {EM.stop})
EM.add_timer(1){
i.concurrency = 1
}
EM.add_timer(3){
i.concurrency = 3
}
}
assert_equal(9, items.keys.size)
assert_equal((list).to_a, items.values.flatten)
end

def test_map
list = 100..150
EM.run {
EM::Iterator.new(list).map(proc{ |num,iter|
EM.add_timer(0.01){ iter.return(num) }
}, proc{ |results|
assert_equal((list).to_a.size, results.size)
EM.stop
})
}
end

def test_inject
list = %w[ pwd uptime uname date ]
EM.run {
EM::Iterator.new(list, 2).inject({}, proc{ |hash,cmd,iter|
EM.system(cmd){ |output,status|
hash[cmd] = status.exitstatus == 0 ? output.strip : nil
iter.return(hash)
}
}, proc{ |results|
assert_equal(results.keys, list)
EM.stop
})
}
end

def test_concurrency_is_0
EM.run {
assert_raise ArgumentError do
EM::Iterator.new(1..5,0)
end
EM.stop
}
end
end

0 comments on commit 848638c

Please sign in to comment.