Skip to content

Commit

Permalink
PERFORMANCE: Avoid redundant (and high overhead) size calls to Java H…
Browse files Browse the repository at this point in the history
…ashMap

Fixes elastic#8448
  • Loading branch information
original-brownbear committed Oct 13, 2017
1 parent 54611c3 commit b062da4
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 24 deletions.
5 changes: 3 additions & 2 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -702,9 +702,10 @@ def inspect

def execute_batch(batched_execution, batch, flush)
batched_execution.compute(batch, flush, false)
@filter_queue_client.add_output_metrics(batch)
@filter_queue_client.add_filtered_metrics(batch)
@events_filtered.increment(batch.size)
filtered_size = batch.filtered_size
@filter_queue_client.add_output_metrics(filtered_size)
@filter_queue_client.add_filtered_metrics(filtered_size)
rescue Exception => e
# Plugins authors should manage their own exceptions in the plugin code
# but if an exception is raised up to the worker thread they are considered
Expand Down
4 changes: 2 additions & 2 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ def filter_batch(batch)
#these are both original and generated events
batch.merge(e) unless e.cancelled?
end
@filter_queue_client.add_filtered_metrics(batch)
@filter_queue_client.add_filtered_metrics(batch.filtered_size)
@events_filtered.increment(batch.size)
rescue Exception => e
# Plugins authors should manage their own exceptions in the plugin code
Expand Down Expand Up @@ -532,7 +532,7 @@ def output_batch(batch, output_events_map)
events.clear
end

@filter_queue_client.add_output_metrics(batch)
@filter_queue_client.add_output_metrics(batch.filtered_size)
end

def wait_inputs
Expand Down
18 changes: 6 additions & 12 deletions logstash-core/lib/logstash/util/wrapped_acked_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,14 @@ def stop_clock(batch)
end
end

def add_starting_metrics(batch)
return if @event_metric.nil? || @pipeline_metric.nil?
@event_metric.increment(:in, batch.starting_size)
@pipeline_metric.increment(:in, batch.starting_size)
def add_filtered_metrics(filtered_size)
@event_metric.increment(:filtered, filtered_size)
@pipeline_metric.increment(:filtered, filtered_size)
end

def add_filtered_metrics(batch)
@event_metric.increment(:filtered, batch.filtered_size)
@pipeline_metric.increment(:filtered, batch.filtered_size)
end

def add_output_metrics(batch)
@event_metric.increment(:out, batch.filtered_size)
@pipeline_metric.increment(:out, batch.filtered_size)
def add_output_metrics(filtered_size)
@event_metric.increment(:out, filtered_size)
@pipeline_metric.increment(:out, filtered_size)
end
end

Expand Down
12 changes: 6 additions & 6 deletions logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,14 @@ def stop_clock(batch)
end
end

def add_filtered_metrics(batch)
@event_metric_filtered.increment(batch.filtered_size)
@pipeline_metric_filtered.increment(batch.filtered_size)
def add_filtered_metrics(filtered_size)
@event_metric_filtered.increment(filtered_size)
@pipeline_metric_filtered.increment(filtered_size)
end

def add_output_metrics(batch)
@event_metric_out.increment(batch.filtered_size)
@pipeline_metric_out.increment(batch.filtered_size)
def add_output_metrics(filtered_size)
@event_metric_out.increment(filtered_size)
@pipeline_metric_out.increment(filtered_size)
end
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
sleep(0.1) # simulate some work for the `duration_in_millis`
# TODO: this interaction should be cleaned in an upcoming PR,
# This is what the current pipeline does.
read_client.add_filtered_metrics(read_batch)
read_client.add_output_metrics(read_batch)
read_client.add_filtered_metrics(read_batch.filtered_size)
read_client.add_output_metrics(read_batch.filtered_size)
read_client.close_batch(read_batch)
store = collector.snapshot_metric.metric_store

Expand Down

0 comments on commit b062da4

Please sign in to comment.