Skip to content

Commit

Permalink
Add multi-pipeline test
Browse files Browse the repository at this point in the history
Needs DRY-ing up before commit, but tests should be valid

Fixes elastic#8026
  • Loading branch information
robbavey committed Sep 1, 2017
1 parent 6b1ffbc commit b1b3a11
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 9 deletions.
4 changes: 2 additions & 2 deletions config/jvm.options
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@
#-Djna.nosys=true

# Turn on JRuby invokedynamic
#-Djruby.compile.invokedynamic=true
-Djruby.compile.invokedynamic=true
# Force Compilation
#-Djruby.jit.threshold=0
-Djruby.jit.threshold=0

## heap dumps

Expand Down
57 changes: 50 additions & 7 deletions qa/integration/specs/dlq_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

describe "Test Dead Letter Queue" do

before(:all) {
before(:each) {
@fixture = Fixture.new(__FILE__)
IO.write(config_yaml_file, config_yaml)
}

after(:all) {
after(:each) {
begin
es_client = @fixture.get_service("elasticsearch").get_client
es_client.indices.delete(index: 'logstash-*') unless es_client.nil?
Expand All @@ -30,16 +31,15 @@
}

let!(:settings_dir) { Stud::Temporary.directory }
let!(:mp_settings_dir) { Stud::Temporary.directory }
let!(:config_yaml) { dlq_config.to_yaml }
let!(:config_yaml_file) { ::File.join(settings_dir, "logstash.yml") }
let(:generator_config_file) { config_to_temp_file(@fixture.config("root",{ :dlq_dir => dlq_dir })) }

let!(:pipelines_yaml) { pipelines.to_yaml }
let!(:pipelines_yaml_file) { ::File.join(settings_dir, "pipelines.yml") }

before(:each) do
IO.write(config_yaml_file, config_yaml)
end

it 'can index 1000 generated documents' do
it 'can index 1000 documents via dlq - single pipeline' do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_background_with_config_settings(generator_config_file, settings_dir)
es_service = @fixture.get_service("elasticsearch")
Expand All @@ -60,4 +60,47 @@
s = result["hits"]["hits"][0]["_source"]
expect(s["mutated"]).to eq("true")
end

let(:pipelines) {[
{
"pipeline.id" => "test",
"pipeline.workers" => 1,
"dead_letter_queue.enable" => true,
"pipeline.batch.size" => 1,
"config.string" => "input { generator { message => '{\"test\":\"one\"}' codec => \"json\" count => 1000 } } filter { mutate { add_field => { \"geoip\" => \"somewhere\" } } } output { elasticsearch {} }"
},
{
"pipeline.id" => "test2",
"pipeline.workers" => 1,
"dead_letter_queue.enable" => false,
"pipeline.batch.size" => 1,
"config.string" => "input { dead_letter_queue { pipeline_id => 'test' path => \"#{dlq_dir}\" commit_offsets => true } } filter { mutate { remove_field => [\"geoip\"] add_field => {\"mutated\" => \"true\" } } } output { elasticsearch {} }"
}
]}

let!(:pipelines_yaml) { pipelines.to_yaml }
let!(:pipelines_yaml_file) { ::File.join(settings_dir, "pipelines.yml") }


it 'can index 1000 documents via dlq - multi pipeline' do
IO.write(pipelines_yaml_file, pipelines_yaml)
logstash_service = @fixture.get_service("logstash")
logstash_service.spawn_logstash("--path.settings", settings_dir, "--log.level=debug")
es_service = @fixture.get_service("elasticsearch")
es_client = es_service.get_client
# Wait for es client to come up
sleep(10)
# test if all data was indexed by ES, but first refresh manually
es_client.indices.refresh

logstash_service.wait_for_logstash
try(50) do
result = es_client.search(index: 'logstash-*', size: 0, q: '*')
expect(result["hits"]["total"]).to eq(1000)
end

result = es_client.search(index: 'logstash-*', size: 1, q: '*')
s = result["hits"]["hits"][0]["_source"]
expect(s["mutated"]).to eq("true")
end
end

0 comments on commit b1b3a11

Please sign in to comment.