Skip to content
This repository has been archived by the owner on Nov 4, 2020. It is now read-only.

Commit

Permalink
add acked_queue concurrent stress spec
Browse files Browse the repository at this point in the history
introduce rake test:core-fast and rake test:core-slow

remove custom rspec config
  • Loading branch information
colinsurprenant committed Apr 21, 2017
1 parent 6e180b0 commit e732255
Show file tree
Hide file tree
Showing 2 changed files with 302 additions and 3 deletions.
291 changes: 291 additions & 0 deletions logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
# encoding: utf-8
require "logstash/util/wrapped_acked_queue"
require "logstash/event"
require "logstash/instrument/namespaced_metric"

describe LogStash::Util::WrappedAckedQueue, :stress_test => true do
let(:path) { Stud::Temporary.directory }

context "with multiple writers" do
let(:items) { expected_count / writers }
let(:page_capacity) { 1 << page_capacity_multiplier }
let(:queue_capacity) { page_capacity * queue_capacity_multiplier }

let(:output_strings) { [] }
let(:reject_memo_keys) { [:reject_memo_keys, :path, :queue, :writer_threads, :collector, :metric, :reader_threads, :output_strings] }

let(:queue) do
described_class.create_file_based(path, page_capacity, 0, queue_checkpoint_acks, queue_checkpoint_writes, queue_checkpoint_interval, queue_capacity)
end

let(:writer_threads) do
writer = queue.write_client
writers.times.map do |i|
Thread.new(i, items, writer) do |_i, _items, _writer|
publisher(_items, _writer)
end
end
end

let(:writers_finished) { Concurrent::AtomicBoolean.new(false) }

let(:reader_threads) do
reader = queue.read_client
reader.set_batch_dimensions(batch_size, batch_wait)
reader.set_events_metric(metric.namespace([:stats, :events]))
reader.set_pipeline_metric(metric.namespace([:stats, :pipelines, :main, :events]))

readers.times.map do |i|
Thread.new(i, reader, counts) do |_i, _reader, _counts|
begin
tally = 0
while true
batch = _reader.read_batch
break if batch.size.zero? && writers_finished.value == true && queue.queue.is_fully_acked?
sleep(rand * 0.01) if simulate_work
tally += batch.size
batch.close
end
_counts[_i] = tally
# puts("reader #{_i}, tally=#{tally}, _counts=#{_counts.inspect}")
rescue => e
p :reader_error => e
end
end
end
end

def publisher(items, writer)
items.times.each do |i|
event = LogStash::Event.new("sequence" => "#{i}".ljust(string_size))
writer.push(event)
end
rescue => e
p :publisher_error => e
end

let(:collector) { LogStash::Instrument::Collector.new }
let(:metric) { LogStash::Instrument::Metric.new(collector) }

shared_examples "a well behaved queue" do
it "writes, reads, closes and reopens" do
Thread.abort_on_exception = true

# force lazy initialization to avoid concurency issues within threads
counts
queue

# Start the threads
writer_threads
reader_threads

writer_threads.each(&:join)
writers_finished.make_true

reader_threads.each(&:join)

enqueued = queue.queue.unread_count

if enqueued != 0
output_strings << "unread events in queue: #{enqueued}"
end

got = counts.reduce(&:+)

if got != expected_count
# puts("count=#{counts.inspect}")
output_strings << "events read: #{got}"
end

sleep 0.1
expect { queue.close }.not_to raise_error
sleep 0.1
files = Dir.glob(path + '/*').map{|f| f.sub("#{path}/", '')}
if files.count != 2
output_strings << "File count after close mismatch expected: 2 got: #{files.count}"
output_strings.concat files
end

begin
queue.queue.open
rescue Exception => e
output_strings << e.message
end

queue.queue.close

if output_strings.any?
output_strings << __memoized.reject{|k,v| reject_memo_keys.include?(k)}.inspect
end

expect(output_strings).to eq([])
end
end

let(:writers) { 3 }
let(:readers) { 3 }
let(:simulate_work) { true }
let(:counts) { Concurrent::Array.new([0, 0, 0, 0, 0, 0, 0, 0]) }
let(:page_capacity_multiplier) { 20 }
let(:queue_capacity_multiplier) { 128 }
let(:queue_checkpoint_acks) { 1024 }
let(:queue_checkpoint_writes) { 1024 }
let(:queue_checkpoint_interval) { 1000 }
let(:batch_size) { 500 }
let(:batch_wait) { 1000 }
let(:expected_count) { 60000 }
let(:string_size) { 256 }

describe "with simulate_work ON" do
let(:simulate_work) { true }

context "> more writers than readers <" do
let(:writers) { 4 }
let(:readers) { 2 }
it_behaves_like "a well behaved queue"
end

context "> less writers than readers <" do
let(:writers) { 2 }
let(:readers) { 4 }
it_behaves_like "a well behaved queue"
end

context "> larger checkpoint acks <" do
let(:queue_checkpoint_acks) { 3000 }
it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint acks <" do
let(:queue_checkpoint_acks) { 500 }
it_behaves_like "a well behaved queue"
end

context "> larger checkpoint writes <" do
let(:queue_checkpoint_writes) { 3000 }
it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint writes <" do
let(:queue_checkpoint_writes) { 500 }
it_behaves_like "a well behaved queue"
end

context "> larger checkpoint interval <" do
let(:queue_checkpoint_interval) { 3000 }
it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint interval <" do
let(:queue_checkpoint_interval) { 500 }
it_behaves_like "a well behaved queue"
end

context "> smaller batch wait <" do
let(:batch_wait) { 125 }
it_behaves_like "a well behaved queue"
end

context "> larger batch wait <" do
let(:batch_wait) { 5000 }
it_behaves_like "a well behaved queue"
end

context "> smaller event size <" do
let(:string_size) { 8 }
it_behaves_like "a well behaved queue"
end

context "> larger event size <" do
let(:string_size) { 8192 }
it_behaves_like "a well behaved queue"
end

context "> small queue size limit <" do
let(:queue_capacity_multiplier) { 10 }
it_behaves_like "a well behaved queue"
end

context "> very large queue size limit <" do
let(:queue_capacity_multiplier) { 512 }
it_behaves_like "a well behaved queue"
end
end

describe "with simulate_work OFF" do
let(:simulate_work) { false }

context "> more writers than readers <" do
let(:writers) { 4 }
let(:readers) { 2 }
it_behaves_like "a well behaved queue"
end

context "> less writers than readers <" do
let(:writers) { 2 }
let(:readers) { 4 }
it_behaves_like "a well behaved queue"
end

context "> larger checkpoint acks <" do
let(:queue_checkpoint_acks) { 3000 }
it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint acks <" do
let(:queue_checkpoint_acks) { 500 }
it_behaves_like "a well behaved queue"
end

context "> larger checkpoint writes <" do
let(:queue_checkpoint_writes) { 3000 }
it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint writes <" do
let(:queue_checkpoint_writes) { 500 }
it_behaves_like "a well behaved queue"
end

context "> larger checkpoint interval <" do
let(:queue_checkpoint_interval) { 3000 }
it_behaves_like "a well behaved queue"
end

context "> smaller checkpoint interval <" do
let(:queue_checkpoint_interval) { 500 }
it_behaves_like "a well behaved queue"
end

context "> smaller batch wait <" do
let(:batch_wait) { 125 }
it_behaves_like "a well behaved queue"
end

context "> larger batch wait <" do
let(:batch_wait) { 5000 }
it_behaves_like "a well behaved queue"
end

context "> smaller event size <" do
let(:string_size) { 8 }
it_behaves_like "a well behaved queue"
end

context "> larger event size <" do
let(:string_size) { 8192 }
it_behaves_like "a well behaved queue"
end

context "> small queue size limit <" do
let(:queue_capacity_multiplier) { 10 }
it_behaves_like "a well behaved queue"
end

context "> very large queue size limit <" do
let(:queue_capacity_multiplier) { 512 }
it_behaves_like "a well behaved queue"
end
end
end
end
14 changes: 11 additions & 3 deletions rakelib/test.rake
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,20 @@ namespace "test" do
Rake::FileList[*specs]
end

desc "run core specs"
task "core" => ["setup"] do
desc "run all core specs"
task "core" => ["core-slow"]

desc "run all core specs"
task "core-slow" => ["setup"] do
exit(RSpec::Core::Runner.run([core_specs]))
end

desc "run core specs in fail-fast mode"
desc "run core specs excluding slower tests like stress tests"
task "core-fast" => ["setup"] do
exit(RSpec::Core::Runner.run(["--tag", "~stress_test", core_specs]))
end

desc "run all core specs in fail-fast mode"
task "core-fail-fast" => ["setup"] do
exit(RSpec::Core::Runner.run(["--fail-fast", core_specs]))
end
Expand Down

0 comments on commit e732255

Please sign in to comment.