Skip to content

Commit

Permalink
Avoid to increment event.out conter for dropped events (elastic#13593)
Browse files Browse the repository at this point in the history
Fixes the issue elastic#8752 in event.out counter. When a pipeline contains a drop filter the total out events counter should count only the events that reached the out stage.

This PR changes CompiledExecution.compute() interface to return the number of events that effectively reached the end of the pipeline. This change is used in WorkerLoop to update correctly the event.out metric, instead of relying on the batch's size.
  • Loading branch information
andsel authored Jan 14, 2022
1 parent 2892964 commit b6da829
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 12 deletions.
1 change: 0 additions & 1 deletion logstash-core/src/main/java/org/logstash/ConvertedMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.jruby.RubyString;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.execution.WorkerLoop;

/**
* <p>This class is an internal API and behaves very different from a standard {@link Map}.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,12 +287,12 @@ public final class CompiledOrderedExecution extends CompiledExecution {
@SuppressWarnings({"unchecked"}) private final RubyArray<RubyEvent> EMPTY_ARRAY = RubyUtil.RUBY.newEmptyArray();

@Override
public void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
compute(batch.events(), flush, shutdown);
public int compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
return compute(batch.events(), flush, shutdown);
}

@Override
public void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
public int compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
if (!batch.isEmpty()) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> filterBatch = RubyUtil.RUBY.newArray(1);
Expand All @@ -302,11 +302,14 @@ public void compute(final Collection<RubyEvent> batch, final boolean flush, fina
_compute(filterBatch, outputBatch, flush, shutdown);
}
compiledOutputs.compute(outputBatch, flush, shutdown);
return outputBatch.size();
} else if (flush || shutdown) {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray();
_compute(EMPTY_ARRAY, outputBatch, flush, shutdown);
compiledOutputs.compute(outputBatch, flush, shutdown);
return outputBatch.size();
}
return 0;
}

private void _compute(final RubyArray<RubyEvent> batch, final RubyArray<RubyEvent> outputBatch, final boolean flush, final boolean shutdown) {
Expand All @@ -319,18 +322,19 @@ private void _compute(final RubyArray<RubyEvent> batch, final RubyArray<RubyEven
public final class CompiledUnorderedExecution extends CompiledExecution {

@Override
public void compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
compute(batch.events(), flush, shutdown);
public int compute(final QueueBatch batch, final boolean flush, final boolean shutdown) {
return compute(batch.events(), flush, shutdown);
}

@Override
public void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
public int compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown) {
// we know for now this comes from batch.collection() which returns a LinkedHashSet
final Collection<RubyEvent> result = compiledFilters.compute(RubyArray.newArray(RubyUtil.RUBY, batch), flush, shutdown);
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> outputBatch = RubyUtil.RUBY.newArray(result.size());
copyNonCancelledEvents(result, outputBatch);
compiledFilters.clear();
compiledOutputs.compute(outputBatch, flush, shutdown);
return outputBatch.size();
}
}

Expand Down Expand Up @@ -360,9 +364,13 @@ public abstract class CompiledExecution {
compiledOutputs = compileOutputs();
}

public abstract void compute(final QueueBatch batch, final boolean flush, final boolean shutdown);
/**
* @return the number of events that was processed, could be less o greater than batch.size(), depending if
* the pipeline drops or clones events during the filter stage.
* */
public abstract int compute(final QueueBatch batch, final boolean flush, final boolean shutdown);

public abstract void compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown);
public abstract int compute(final Collection<RubyEvent> batch, final boolean flush, final boolean shutdown);

/**
* Instantiates the graph of compiled filter section {@link Dataset}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@ public void run() {
if (batch.filteredSize() > 0 || isFlush) {
consumedCounter.add(batch.filteredSize());
readClient.startMetrics(batch);
execution.compute(batch, isFlush, false);
final int outputCount = execution.compute(batch, isFlush, false);
int filteredCount = batch.filteredSize();
filteredCounter.add(filteredCount);
readClient.addOutputMetrics(filteredCount);
readClient.addOutputMetrics(outputCount);
readClient.addFilteredMetrics(filteredCount);
readClient.closeBatch(batch);
if (isFlush) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.jruby.runtime.ThreadContext;
import org.junit.Test;
import org.logstash.execution.QueueBatch;
import org.logstash.execution.WorkerLoop;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down
33 changes: 33 additions & 0 deletions qa/integration/fixtures/monitoring_api_spec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,36 @@
name: Metrics test
services:
- logstash
config:
dropping_events: |-
input {
tcp {
port => '<%=options[:port]%>'
type => 'type1'
}
}
filter {
drop { }
}
output {
stdout { }
}
cloning_events: |-
input {
tcp {
port => '<%=options[:port]%>'
type => 'type1'
}
}
filter {
clone {
clones => ["sun", "moon"]
}
}
output {
stdout { }
}
47 changes: 47 additions & 0 deletions qa/integration/specs/monitoring_api_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

require_relative '../framework/fixture'
require_relative '../framework/settings'
require_relative '../framework/helpers'
require_relative '../services/logstash_service'
require "logstash/devutils/rspec/spec_helper"
require "stud/try"
Expand Down Expand Up @@ -51,6 +52,52 @@
end
end

context "verify global event counters" do
let(:tcp_port) { random_port }
let(:sample_data) { 'Hello World!' }
let(:logstash_service) { @fixture.get_service("logstash") }

before(:each) do
logstash_service.spawn_logstash("-w", "1" , "-e", config)
logstash_service.wait_for_logstash
wait_for_port(tcp_port, 60)

send_data(tcp_port, sample_data)
end

context "when a drop filter is in the pipeline" do
let(:config) { @fixture.config("dropping_events", { :port => tcp_port } ) }

it 'expose the correct output counter' do
try(max_retry) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
expect(result).not_to be_nil
expect(result["events"]).not_to be_nil
expect(result["events"]["in"]).to eq(1)
expect(result["events"]["filtered"]).to eq(1)
expect(result["events"]["out"]).to eq(0)
end
end
end

context "when a clone filter is in the pipeline" do
let(:config) { @fixture.config("cloning_events", { :port => tcp_port } ) }

it 'expose the correct output counter' do
try(max_retry) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
expect(result).not_to be_nil
expect(result["events"]).not_to be_nil
expect(result["events"]["in"]).to eq(1)
expect(result["events"]["filtered"]).to eq(1)
expect(result["events"]["out"]).to eq(3)
end
end
end
end

it "can retrieve JVM stats" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin
Expand Down

0 comments on commit b6da829

Please sign in to comment.