Skip to content

Commit

Permalink
JAVAFICATION: Move more of the pipelines to Java
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Jun 19, 2018
1 parent fbb1677 commit cccd044
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 52 deletions.
20 changes: 3 additions & 17 deletions logstash-core/lib/logstash/java_pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ module LogStash; class JavaPipeline < JavaBasePipeline
:events_consumed,
:events_filtered,
:started_at,
:thread,
:filter_queue_client
:thread

MAX_INFLIGHT_WARN_THRESHOLD = 10_000

Expand All @@ -24,13 +23,6 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)
super pipeline_config, namespaced_metric, @logger, agent
@worker_threads = []

@filter_queue_client = queue.read_client
# Note that @inflight_batches as a central mechanism for tracking inflight
# batches will fail if we have multiple read clients here.
@filter_queue_client.set_events_metric(metric.namespace([:stats, :events]))
@filter_queue_client.set_pipeline_metric(
metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])
)
@drain_queue = settings.get_value("queue.drain") || settings.get("queue.type") == "memory"

@events_filtered = java.util.concurrent.atomic.LongAdder.new
Expand Down Expand Up @@ -157,12 +149,6 @@ def run
return 0
end # def run

def close
@filter_queue_client.close
queue.close
close_dlq_writer
end

def transition_to_running
@running.make_true
end
Expand Down Expand Up @@ -223,12 +209,12 @@ def start_workers
@logger.warn("CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})", default_logging_keys)
end

@filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
filter_queue_client.set_batch_dimensions(batch_size, batch_delay)

pipeline_workers.times do |t|
thread = Thread.new do
org.logstash.execution.WorkerLoop.new(
lir_execution, @filter_queue_client, @events_filtered, @events_consumed,
lir_execution, filter_queue_client, @events_filtered, @events_consumed,
@flushRequested, @flushing, @shutdownRequested, @drain_queue).run
end
thread.name="[#{pipeline_id}]>worker#{t}"
Expand Down
35 changes: 11 additions & 24 deletions logstash-core/lib/logstash/pipeline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ module LogStash; class Pipeline < BasePipeline
:events_consumed,
:events_filtered,
:started_at,
:thread,
:filter_queue_client
:thread

MAX_INFLIGHT_WARN_THRESHOLD = 10_000

Expand All @@ -92,14 +91,8 @@ def initialize(pipeline_config, namespaced_metric = nil, agent = nil)

@worker_threads = []

@filter_queue_client = queue.read_client
@signal_queue = java.util.concurrent.LinkedBlockingQueue.new
# Note that @inflight_batches as a central mechanism for tracking inflight
# batches will fail if we have multiple read clients here.
@filter_queue_client.set_events_metric(metric.namespace([:stats, :events]))
@filter_queue_client.set_pipeline_metric(
metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])
)

@drain_queue = settings.get_value("queue.drain") || settings.get("queue.type") == "memory"


Expand Down Expand Up @@ -225,12 +218,6 @@ def run
return 0
end # def run

def close
@filter_queue_client.close
queue.close
close_dlq_writer
end

def transition_to_running
@running.make_true
end
Expand Down Expand Up @@ -320,12 +307,12 @@ def start_workers
# Main body of what a worker thread does
# Repeatedly takes batches off the queue, filters, then outputs them
def worker_loop(batch_size, batch_delay)
@filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
filter_queue_client.set_batch_dimensions(batch_size, batch_delay)
output_events_map = Hash.new { |h, k| h[k] = [] }
while true
signal = @signal_queue.poll || NO_SIGNAL

batch = @filter_queue_client.read_batch.to_java # metrics are started in read_batch
batch = filter_queue_client.read_batch.to_java # metrics are started in read_batch
batch_size = batch.filteredSize
if batch_size > 0
@events_consumed.add(batch_size)
Expand All @@ -334,27 +321,27 @@ def worker_loop(batch_size, batch_delay)
flush_filters_to_batch(batch, :final => false) if signal.flush?
if batch.filteredSize > 0
output_batch(batch, output_events_map)
@filter_queue_client.close_batch(batch)
filter_queue_client.close_batch(batch)
end
# keep break at end of loop, after the read_batch operation, some pipeline specs rely on this "final read_batch" before shutdown.
break if (@worker_shutdown.get && !draining_queue?)
end

# we are shutting down, queue is drained if it was required, now perform a final flush.
# for this we need to create a new empty batch to contain the final flushed events
batch = @filter_queue_client.to_java.newBatch
@filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here
batch = filter_queue_client.to_java.newBatch
filter_queue_client.start_metrics(batch) # explicitly call start_metrics since we dont do a read_batch here
flush_filters_to_batch(batch, :final => true)
output_batch(batch, output_events_map)
@filter_queue_client.close_batch(batch)
filter_queue_client.close_batch(batch)
end

def filter_batch(batch)
filter_func(batch.to_a).each do |e|
#these are both original and generated events
batch.merge(e) unless e.cancelled?
end
@filter_queue_client.add_filtered_metrics(batch.filtered_size)
filter_queue_client.add_filtered_metrics(batch.filtered_size)
@events_filtered.add(batch.filteredSize)
rescue Exception => e
# Plugins authors should manage their own exceptions in the plugin code
Expand Down Expand Up @@ -386,7 +373,7 @@ def output_batch(batch, output_events_map)
events.clear
end

@filter_queue_client.add_output_metrics(batch.filtered_size)
filter_queue_client.add_output_metrics(batch.filtered_size)
end

def wait_inputs
Expand Down Expand Up @@ -615,6 +602,6 @@ def default_logging_keys(other_keys = {})
end

def draining_queue?
@drain_queue ? !@filter_queue_client.empty? : false
@drain_queue ? !filter_queue_client.empty? : false
end
end; end
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.QueueReadClientBase;
import org.logstash.ext.JRubyAbstractQueueWriteClientExt;
import org.logstash.ext.JrubyAckedReadClientExt;
import org.logstash.ext.JrubyAckedWriteClientExt;
Expand Down Expand Up @@ -77,7 +78,7 @@ protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext co
}

@Override
protected IRubyObject getReadClient() {
protected QueueReadClientBase getReadClient() {
return JrubyAckedReadClientExt.create(queue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,16 @@ public class AbstractPipelineExt extends RubyBasicObject {

private static final RubySymbol PATH = RubyUtil.RUBY.newSymbol("path");

private static final RubySymbol STATS_KEY = RubyUtil.RUBY.newSymbol("stats");

private static final RubySymbol TYPE_KEY = RubyUtil.RUBY.newSymbol("type");

private static final RubySymbol QUEUE_KEY = RubyUtil.RUBY.newSymbol("queue");

private static final RubySymbol DLQ_KEY = RubyUtil.RUBY.newSymbol("dlq");

private static final RubyArray EVENTS_METRIC_NAMESPACE = RubyArray.newArray(
RubyUtil.RUBY, new IRubyObject[]{MetricKeys.STATS_KEY, MetricKeys.EVENTS_KEY}
);

protected PipelineIR lir;

private final RubyString ephemeralId = RubyUtil.RUBY.newString(UUID.randomUUID().toString());
Expand All @@ -102,6 +104,8 @@ public class AbstractPipelineExt extends RubyBasicObject {

private JRubyAbstractQueueWriteClientExt inputQueueClient;

private QueueReadClientBase filterQueueClient;

public AbstractPipelineExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
}
Expand Down Expand Up @@ -129,6 +133,7 @@ public final AbstractPipelineExt initialize(final ThreadContext context,
throw new IllegalStateException(ex);
}
inputQueueClient = queue.writeClient(context);
filterQueueClient = queue.readClient();
final IRubyObject id = getSetting(context, "pipeline.id");
if (id.isNil()) {
pipelineId = id();
Expand All @@ -153,9 +158,27 @@ public final AbstractPipelineExt initialize(final ThreadContext context,
configString.asJavaString(),
getSetting(context, "config.support_escapes").isTrue()
);
filterQueueClient.setEventsMetric(metric.namespace(context, EVENTS_METRIC_NAMESPACE));
filterQueueClient.setPipelineMetric(
metric.namespace(
context,
RubyArray.newArray(
context.runtime,
new IRubyObject[]{
MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY,
pipelineId.convertToString().intern19(), MetricKeys.EVENTS_KEY
}
)
)
);
return this;
}

@JRubyMethod(name = "filter_queue_client")
public final QueueReadClientBase filterQueueClient() {
return filterQueueClient;
}

@JRubyMethod(name = "config_str")
public final RubyString configStr() {
return configString;
Expand Down Expand Up @@ -262,7 +285,7 @@ public final IRubyObject collectStats(final ThreadContext context) throws IOExce
context,
RubyArray.newArray(
context.runtime,
Arrays.asList(STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId.asString().intern(), QUEUE_KEY)
Arrays.asList(MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId.asString().intern(), QUEUE_KEY)
)
);
pipelineMetric.gauge(context, TYPE_KEY, getSetting(context, "queue.type"));
Expand Down Expand Up @@ -307,6 +330,14 @@ public final AbstractWrappedQueueExt queue() {
return queue;
}

@JRubyMethod
public final IRubyObject close(final ThreadContext context) throws IOException {
filterQueueClient.close();
queue.close(context);
closeDlqWriter(context);
return context.nil;
}

@JRubyMethod(name = "wrapped_write_client", visibility = Visibility.PROTECTED)
public final JRubyWrappedWriteClientExt wrappedWriteClient(final ThreadContext context,
final IRubyObject pluginId) {
Expand All @@ -324,7 +355,8 @@ private AbstractNamespacedMetricExt getDlqMetric(final ThreadContext context) {
context, RubyArray.newArray(
context.runtime,
Arrays.asList(
STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId.asString().intern(), DLQ_KEY
MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY,
pipelineId.asString().intern(), DLQ_KEY
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public final JRubyAbstractQueueWriteClientExt writeClient(final ThreadContext co
}

@JRubyMethod(name = "read_client")
public final IRubyObject readClient() {
public final QueueReadClientBase readClient() {
return getReadClient();
}

Expand All @@ -35,5 +35,5 @@ public final IRubyObject close(final ThreadContext context) {

protected abstract JRubyAbstractQueueWriteClientExt getWriteClient(ThreadContext context);

protected abstract IRubyObject getReadClient();
protected abstract QueueReadClientBase getReadClient();
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.execution.AbstractWrappedQueueExt;
import org.logstash.execution.QueueReadClientBase;

@JRubyClass(name = "WrappedSynchronousQueue")
public final class JrubyWrappedSynchronousQueueExt extends AbstractWrappedQueueExt {
Expand All @@ -35,7 +36,7 @@ protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext co
}

@Override
protected IRubyObject getReadClient() {
protected QueueReadClientBase getReadClient() {
// batch size and timeout are currently hard-coded to 125 and 50ms as values observed
// to be reasonable tradeoffs between latency and throughput per PR #8707
return JrubyMemoryReadClientExt.create(queue, 125, 50);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,6 @@ private MetricKeys() {
RubyUtil.RUBY.newSymbol("duration_in_millis");

public static final RubySymbol FILTERED_KEY = RubyUtil.RUBY.newSymbol("filtered");

public static final RubySymbol STATS_KEY = RubyUtil.RUBY.newSymbol("stats");
}
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,6 @@ public ExecutionContextExt create(final ThreadContext context, final IRubyObject
@JRubyClass(name = "PluginMetricFactory")
public static final class Metrics extends RubyBasicObject {

private static final RubySymbol STATS = RubyUtil.RUBY.newSymbol("stats");

private static final RubySymbol PLUGINS = RubyUtil.RUBY.newSymbol("plugins");

private RubySymbol pipelineId;
Expand Down Expand Up @@ -308,7 +306,10 @@ public AbstractNamespacedMetricExt create(final ThreadContext context, final IRu
return metric.namespace(
context,
RubyArray.newArray(
context.runtime, Arrays.asList(STATS, MetricKeys.PIPELINES_KEY, pipelineId, PLUGINS)
context.runtime,
Arrays.asList(
MetricKeys.STATS_KEY, MetricKeys.PIPELINES_KEY, pipelineId, PLUGINS
)
)
).namespace(
context, RubyUtil.RUBY.newSymbol(String.format("%ss", pluginType.asJavaString()))
Expand Down

0 comments on commit cccd044

Please sign in to comment.