diff --git a/lib/em/iterator.rb b/lib/em/iterator.rb index 3bebf2c72..035aa9d01 100644 --- a/lib/em/iterator.rb +++ b/lib/em/iterator.rb @@ -50,6 +50,7 @@ class Iterator # def initialize(list, concurrency = 1) raise ArgumentError, 'argument must be an array' unless list.respond_to?(:to_a) + raise ArgumentError, 'concurrency must be bigger than zero' unless (concurrency > 0) @list = list.to_a.dup @concurrency = concurrency @@ -224,47 +225,7 @@ def spawn_workers end end -if __FILE__ == $0 - $:.unshift File.join(File.dirname(__FILE__), '..') - require 'eventmachine' - - # TODO: real tests - # TODO: pass in one object instead of two? .each{ |iter| puts iter.current; iter.next } - # TODO: support iter.pause/resume/stop/break/continue? - # TODO: create some exceptions instead of using RuntimeError - # TODO: support proc instead of enumerable? EM::Iterator.new(proc{ return queue.pop }) - - EM.run{ - EM::Iterator.new(1..50).each{ |num,iter| p num; iter.next } - EM::Iterator.new([1,2,3], 10).each{ |num,iter| p num; iter.next } - - i = EM::Iterator.new(1..100, 5) - i.each(proc{|num,iter| - p num.to_s - iter.next - }, proc{ - p :done - }) - EM.add_timer(0.03){ - i.concurrency = 1 - } - EM.add_timer(0.04){ - i.concurrency = 3 - } - - EM::Iterator.new(100..150).map(proc{ |num,iter| - EM.add_timer(0.01){ iter.return(num) } - }, proc{ |results| - p results - }) - - EM::Iterator.new(%w[ pwd uptime uname date ], 2).inject({}, proc{ |hash,cmd,iter| - EM.system(cmd){ |output,status| - hash[cmd] = status.exitstatus == 0 ? output.strip : nil - iter.return(hash) - } - }, proc{ |results| - p results - }) - } -end +# TODO: pass in one object instead of two? .each{ |iter| puts iter.current; iter.next } +# TODO: support iter.pause/resume/stop/break/continue? +# TODO: create some exceptions instead of using RuntimeError +# TODO: support proc instead of enumerable? EM::Iterator.new(proc{ return queue.pop }) diff --git a/tests/test_iterator.rb b/tests/test_iterator.rb new file mode 100644 index 000000000..f302a25e0 --- /dev/null +++ b/tests/test_iterator.rb @@ -0,0 +1,97 @@ +require 'em_test_helper' + +class TestIterator < Test::Unit::TestCase + + def get_time + EM.current_time.strftime('%H:%M:%S') + end + + def test_default_concurrency + items = {} + list = 1..10 + EM.run { + EM::Iterator.new(list).each( proc {|num,iter| + time = get_time + items[time] ||= [] + items[time] << num + EM::Timer.new(1) {iter.next} + }, proc {EM.stop}) + } + assert_equal(10, items.keys.size) + assert_equal((list).to_a, items.values.flatten) + end + + def test_concurrency_bigger_than_list_size + items = {} + list = [1,2,3] + EM.run { + EM::Iterator.new(list,10).each(proc {|num,iter| + time = get_time + items[time] ||= [] + items[time] << num + EM::Timer.new(1) {iter.next} + }, proc {EM.stop}) + } + assert_equal(1, items.keys.size) + assert_equal((list).to_a, items.values.flatten) + end + + + def test_changing_concurrency_affects_active_iteration + items = {} + list = 1..25 + EM.run { + i = EM::Iterator.new(list,5) + i.each(proc {|num,iter| + time = get_time + items[time] ||= [] + items[time] << num + EM::Timer.new(1) {iter.next} + }, proc {EM.stop}) + EM.add_timer(1){ + i.concurrency = 1 + } + EM.add_timer(3){ + i.concurrency = 3 + } + } + assert_equal(9, items.keys.size) + assert_equal((list).to_a, items.values.flatten) + end + + def test_map + list = 100..150 + EM.run { + EM::Iterator.new(list).map(proc{ |num,iter| + EM.add_timer(0.01){ iter.return(num) } + }, proc{ |results| + assert_equal((list).to_a.size, results.size) + EM.stop + }) + } + end + + def test_inject + list = %w[ pwd uptime uname date ] + EM.run { + EM::Iterator.new(list, 2).inject({}, proc{ |hash,cmd,iter| + EM.system(cmd){ |output,status| + hash[cmd] = status.exitstatus == 0 ? output.strip : nil + iter.return(hash) + } + }, proc{ |results| + assert_equal(results.keys, list) + EM.stop + }) + } + end + + def test_concurrency_is_0 + EM.run { + assert_raise ArgumentError do + EM::Iterator.new(1..5,0) + end + EM.stop + } + end +end \ No newline at end of file