Skip to content

Commit

Permalink
Provide DLQ writer interface to Java plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann committed May 22, 2019
1 parent b093c58 commit 08d2443
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package co.elastic.logstash.api;

import org.apache.logging.log4j.Logger;
import org.logstash.common.io.DeadLetterQueueWriter;

/**
* Provides Logstash context to plugins.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package co.elastic.logstash.api;

import java.io.IOException;

public interface DeadLetterQueueWriter {

void writeEntry(Event event, Plugin plugin, String reason) throws IOException;

boolean isOpen();

long getCurrentQueueSize();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.logstash.common;

import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Plugin;

import java.io.IOException;
import java.util.Objects;

public class DLQWriterAdapter implements DeadLetterQueueWriter {

private final org.logstash.common.io.DeadLetterQueueWriter dlqWriter;

public DLQWriterAdapter(org.logstash.common.io.DeadLetterQueueWriter dlqWriter) {
this.dlqWriter = Objects.requireNonNull(dlqWriter);
}

@Override
public void writeEntry(Event event, Plugin plugin, String reason) throws IOException {
dlqWriter.writeEntry((org.logstash.Event) event, plugin.getName(), plugin.getId(), reason);
}

@Override
public boolean isOpen() {
return dlqWriter != null && dlqWriter.isOpen();
}

@Override
public long getCurrentQueueSize() {
return dlqWriter != null ? dlqWriter.getCurrentQueueSize() : 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.logstash.common;

import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Plugin;

import java.io.IOException;

public class NullDeadLetterQueueWriter implements DeadLetterQueueWriter {
private static final NullDeadLetterQueueWriter INSTANCE = new NullDeadLetterQueueWriter();

private NullDeadLetterQueueWriter() {
}

public static NullDeadLetterQueueWriter getInstance() {
return INSTANCE;
}

@Override
public void writeEntry(Event event, Plugin plugin, String reason) throws IOException {
// no-op
}

@Override
public boolean isOpen() {
return false;
}

@Override
public long getCurrentQueueSize() {
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.logstash.plugins;

import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.EventFactory;
import co.elastic.logstash.api.Metric;
Expand All @@ -9,7 +10,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.ConvertedMap;
import org.logstash.common.io.DeadLetterQueueWriter;

import java.io.Serializable;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import co.elastic.logstash.api.Codec;
import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.Input;
import co.elastic.logstash.api.Output;
Expand All @@ -21,7 +22,8 @@
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
import org.logstash.common.io.DeadLetterQueueWriter;
import org.logstash.common.DLQWriterAdapter;
import org.logstash.common.NullDeadLetterQueueWriter;
import org.logstash.config.ir.PipelineIR;
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
Expand Down Expand Up @@ -395,16 +397,16 @@ public ExecutionContextExt create(final ThreadContext context, final IRubyObject
}

public Context toContext(PluginLookup.PluginType pluginType, AbstractNamespacedMetricExt metric) {
DeadLetterQueueWriter dlq = null;
DeadLetterQueueWriter dlq = NullDeadLetterQueueWriter.getInstance();
if (pluginType == PluginLookup.PluginType.OUTPUT) {
if (dlqWriter instanceof AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) {
IRubyObject innerWriter =
((AbstractDeadLetterQueueWriterExt.PluginDeadLetterQueueWriterExt) dlqWriter)
.innerWriter(RubyUtil.RUBY.getCurrentContext());

if (innerWriter != null) {
if (innerWriter.getJavaClass().equals(DeadLetterQueueWriter.class)) {
dlq = innerWriter.toJava(DeadLetterQueueWriter.class);
if (org.logstash.common.io.DeadLetterQueueWriter.class.isAssignableFrom(innerWriter.getJavaClass())) {
dlq = new DLQWriterAdapter(innerWriter.toJava(org.logstash.common.io.DeadLetterQueueWriter.class));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package org.logstash.plugins;

import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.DeadLetterQueueWriter;
import co.elastic.logstash.api.EventFactory;
import co.elastic.logstash.api.NamespacedMetric;
import co.elastic.logstash.api.Plugin;
import org.apache.logging.log4j.Logger;
import org.logstash.common.io.DeadLetterQueueWriter;

public class TestContext implements Context {

Expand Down

0 comments on commit 08d2443

Please sign in to comment.