Skip to content

Commit

Permalink
MINOR: Cleanup dead code in PQ, fix unsafe resource handling in PQ pages
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Apr 6, 2018
1 parent d035f18 commit 5553f58
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,6 @@ def publisher(items, writer)
output_strings.concat files
end

begin
queue.queue.open
rescue Exception => e
output_strings << e.message
end

queue.close

if output_strings.any?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.logstash.ext.JrubyEventExtLibrary;

public final class AckedBatch {
private static final long serialVersionUID = -3118949118637372130L;
private Batch batch;

public static AckedBatch create(Batch batch) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public Batch(List<byte[]> elements, LongVector seqNums, Queue q) {
}

// close acks the batch ackable events
@Override
public void close() throws IOException {
if (closed.getAndSet(true) == false) {
this.queue.ack(this.seqNums);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.javasupport.JavaObject;
import org.jruby.runtime.Arity;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.Event;
Expand All @@ -18,7 +17,6 @@
import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.SettingsImpl;
import org.logstash.ext.JrubyEventExtLibrary;

@JRubyClass(name = "AckedQueue")
public final class JRubyAckedQueueExt extends RubyObject {
Expand All @@ -41,19 +39,6 @@ public static JRubyAckedQueueExt create(String path, int capacity, int maxEvents
return queueExt;
}

@JRubyMethod(name = "initialize", optional = 7)
public IRubyObject ruby_initialize(ThreadContext context, IRubyObject[] args) {
args = Arity.scanArgs(context.runtime, args, 7, 0);
int capacity = RubyFixnum.num2int(args[1]);
int maxUnread = RubyFixnum.num2int(args[2]);
int checkpointMaxAcks = RubyFixnum.num2int(args[3]);
int checkpointMaxWrites = RubyFixnum.num2int(args[4]);
long queueMaxBytes = RubyFixnum.num2long(args[6]);
initializeQueue(args[0].asJavaString(), capacity, maxUnread, checkpointMaxWrites, checkpointMaxAcks, queueMaxBytes);

return context.nil;
}

private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites, int checkpointMaxAcks, long maxBytes) {
this.queue = new Queue(
SettingsImpl.fileSettingsBuilder(path)
Expand Down Expand Up @@ -107,33 +92,16 @@ public IRubyObject ruby_unread_count(ThreadContext context) {
return context.runtime.newFixnum(queue.getUnreadCount());
}

@JRubyMethod(name = "open")
public IRubyObject ruby_open(ThreadContext context) {
try {
open();
} catch (IOException e) {
throw RubyUtil.newRubyIOError(context.runtime, e);
}
return context.nil;
}

public void open() throws IOException {
queue.open();
}

@JRubyMethod(name = {"write", "<<"}, required = 1)
public IRubyObject ruby_write(ThreadContext context, IRubyObject event) {
if (!(event instanceof JrubyEventExtLibrary.RubyEvent)) {
throw context.runtime.newTypeError(
"wrong argument type " + event.getMetaClass() + " (expected LogStash::Event)");
}
long seqNum;
public void rubyWrite(ThreadContext context, Event event) {
try {
seqNum = this.queue.write(((JrubyEventExtLibrary.RubyEvent) event).getEvent());
this.queue.write(event);
} catch (IOException e) {
throw RubyUtil.newRubyIOError(context.runtime, e);
}
return context.runtime.newFixnum(seqNum);
}

@JRubyMethod(name = "read_batch", required = 2)
Expand All @@ -159,11 +127,6 @@ public IRubyObject ruby_is_fully_acked(ThreadContext context) {
return RubyBoolean.newBoolean(context.runtime, this.queue.isFullyAcked());
}

@JRubyMethod(name = "is_empty?")
public IRubyObject ruby_is_empty(ThreadContext context) {
return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
}

public boolean isEmpty() {
return queue.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyFixnum;
import org.jruby.RubyObject;
Expand All @@ -14,6 +15,7 @@
import org.logstash.RubyUtil;
import org.logstash.ext.JrubyAckedReadClientExt;
import org.logstash.ext.JrubyAckedWriteClientExt;
import org.logstash.ext.JrubyEventExtLibrary;

@JRubyClass(name = "WrappedAckedQueue")
public final class JRubyWrappedAckedQueueExt extends RubyObject {
Expand Down Expand Up @@ -62,9 +64,9 @@ public IRubyObject rubyClose(ThreadContext context) {
}

@JRubyMethod(name = {"push", "<<"})
public void rubyPush(ThreadContext context, IRubyObject object) {
public void rubyPush(ThreadContext context, IRubyObject event) {
checkIfClosed("write");
queue.ruby_write(context, object);
queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
}

@JRubyMethod(name = "read_batch")
Expand All @@ -86,7 +88,7 @@ public IRubyObject rubyReadClient(final ThreadContext context) {

@JRubyMethod(name = "is_empty?")
public IRubyObject rubyIsEmpty(ThreadContext context) {
return queue.ruby_is_empty(context);
return RubyBoolean.newBoolean(context.runtime, this.queue.isEmpty());
}

private void checkIfClosed(String action) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ public final class MmapPageIO implements PageIO {
public static final int SEQNUM_SIZE = Long.BYTES;
public static final int MIN_CAPACITY = VERSION_SIZE + SEQNUM_SIZE + LENGTH_SIZE + 1 + CHECKSUM_SIZE; // header overhead plus elements overhead to hold a single 1 byte element
public static final int HEADER_SIZE = 1; // version byte
// Size of: Header + Sequence Number + Length + Checksum
public static final int WRAPPER_SIZE = HEADER_SIZE + SEQNUM_SIZE + LENGTH_SIZE + CHECKSUM_SIZE;
public static final boolean VERIFY_CHECKSUM = true;

private static final Logger LOGGER = LogManager.getLogger(MmapPageIO.class);
Expand All @@ -42,8 +40,6 @@ public final class MmapPageIO implements PageIO {

private final IntVector offsetMap;

private FileChannel channel;

private int capacity; // page capacity is an int per the ByteBuffer class.
private long minSeqNum; // TODO: to make minSeqNum final we have to pass in the minSeqNum in the constructor and not set it on first write
private int elementCount;
Expand Down Expand Up @@ -169,10 +165,9 @@ public void recover() throws IOException {

@Override
public void create() throws IOException {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw");
this.channel = raf.getChannel();
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
raf.close();
try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) {
this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
}
buffer.position(0);
buffer.put(VERSION_ONE);
this.head = 1;
Expand All @@ -181,17 +176,16 @@ public void create() throws IOException {
}

@Override
public void deactivate() throws IOException {
public void deactivate() {
close(); // close can be called multiple times
}

@Override
public void activate() throws IOException {
if (this.channel == null) {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw");
this.channel = raf.getChannel();
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
raf.close();
if (this.buffer == null) {
try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) {
this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
}
this.buffer.load();
}
// TODO: do we need to check is the channel is still open? not sure how it could be closed
Expand All @@ -215,19 +209,12 @@ public void write(byte[] bytes, long seqNum) {
}

@Override
public void close() throws IOException {
public void close() {
if (this.buffer != null) {
this.buffer.force();
BUFFER_CLEANER.clean(buffer);

}
if (this.channel != null) {
if (this.channel.isOpen()) {
this.channel.force(false);
}
this.channel.close(); // close can be called multiple times
}
this.channel = null;
this.buffer = null;
}

Expand Down Expand Up @@ -274,23 +261,21 @@ private long maxSeqNum() {

// memory map data file to this.buffer and read initial version byte
private void mapFile() throws IOException {
RandomAccessFile raf = new RandomAccessFile(this.file, "rw");
try (RandomAccessFile raf = new RandomAccessFile(this.file, "rw")) {

if (raf.length() > Integer.MAX_VALUE) {
throw new IOException("Page file too large " + this.file);
}
int pageFileCapacity = (int) raf.length();
if (raf.length() > Integer.MAX_VALUE) {
throw new IOException("Page file too large " + this.file);
}
int pageFileCapacity = (int) raf.length();

// update capacity to actual raf length. this can happen if a page size was changed on a non empty queue directory for example.
this.capacity = pageFileCapacity;
// update capacity to actual raf length. this can happen if a page size was changed on a non empty queue directory for example.
this.capacity = pageFileCapacity;

if (this.capacity < MIN_CAPACITY) {
throw new IOException(String.format("Page file size is too small to hold elements"));
if (this.capacity < MIN_CAPACITY) {
throw new IOException(String.format("Page file size is too small to hold elements"));
}
this.buffer = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
}

this.channel = raf.getChannel();
this.buffer = this.channel.map(FileChannel.MapMode.READ_WRITE, 0, this.capacity);
raf.close();
this.buffer.load();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,4 @@ public void addOutputMetrics(int filteredSize) {
}

public abstract void close() throws IOException;
public abstract boolean isEmpty();

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,15 @@ private JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass,
@JRubyMethod(name = {"push", "<<"}, required = 1)
public IRubyObject rubyPush(final ThreadContext context, IRubyObject event) {
ensureOpen();
queue.ruby_write(context, event);
queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
return this;
}

@JRubyMethod(name = "push_batch", required = 1)
public IRubyObject rubyPushBatch(final ThreadContext context, IRubyObject batch) {
ensureOpen();
for (final IRubyObject event : (Collection<JrubyEventExtLibrary.RubyEvent>) batch) {
queue.ruby_write(context, event);
queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
}
return this;
}
Expand Down

0 comments on commit 5553f58

Please sign in to comment.