Skip to content

Commit

Permalink
Add support for Dead Letter Queue RecordIO library
Browse files Browse the repository at this point in the history
  • Loading branch information
talevy committed Mar 29, 2017
1 parent 910a60d commit 5e08535
Show file tree
Hide file tree
Showing 12 changed files with 1,216 additions and 1 deletion.
127 changes: 127 additions & 0 deletions logstash-core/src/main/java/org/logstash/DLQEntry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.logstash;

import org.logstash.ackedqueue.Queueable;

import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;


public class DLQEntry implements Cloneable, Serializable, Queueable {

private final Event event;
private final String pluginType;
private final String pluginId;
private final String reason;
private final Timestamp entryTime;

public DLQEntry(Event event, String pluginType, String pluginId, String reason) {
this(event, pluginType, pluginId, reason, Timestamp.now());
}

public DLQEntry(Event event, String pluginType, String pluginId, String reason, Timestamp entryTime) {
this.event = event;
this.pluginType = pluginType;
this.pluginId = pluginId;
this.reason = reason;
this.entryTime = entryTime;
}

@Override
public byte[] serialize() throws IOException {
byte[] entryTimeInBytes = entryTime.serialize();
byte[] eventInBytes = this.event.serialize();
byte[] pluginTypeBytes = pluginType.getBytes();
byte[] pluginIdBytes = pluginId.getBytes();
byte[] reasonBytes = reason.getBytes();
ByteBuffer buffer = ByteBuffer.allocate(entryTimeInBytes.length
+ eventInBytes.length
+ pluginTypeBytes.length
+ pluginIdBytes.length
+ reasonBytes.length
+ (Integer.BYTES * 5));
buffer.putInt(entryTimeInBytes.length);
buffer.put(entryTimeInBytes);
buffer.putInt(eventInBytes.length);
buffer.put(eventInBytes);
buffer.putInt(pluginTypeBytes.length);
buffer.put(pluginTypeBytes);
buffer.putInt(pluginIdBytes.length);
buffer.put(pluginIdBytes);
buffer.putInt(reasonBytes.length);
buffer.put(reasonBytes);
return buffer.array();
}

public static DLQEntry deserialize(byte[] bytes) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
buffer.put(bytes);
buffer.position(0);

int entryTimeLength = buffer.getInt();
byte[] entryTimeBytes = new byte[entryTimeLength];
buffer.get(entryTimeBytes);
Timestamp entryTime = new Timestamp(new String(entryTimeBytes));

int eventLength = buffer.getInt();
byte[] eventBytes = new byte[eventLength];
buffer.get(eventBytes);
Event event = Event.deserialize(eventBytes);

int pluginTypeLength = buffer.getInt();
byte[] pluginTypeBytes = new byte[pluginTypeLength];
buffer.get(pluginTypeBytes);
String pluginType = new String(pluginTypeBytes);

int pluginIdLength = buffer.getInt();
byte[] pluginIdBytes = new byte[pluginIdLength];
buffer.get(pluginIdBytes);
String pluginId = new String(pluginIdBytes);

int reasonLength = buffer.getInt();
byte[] reasonBytes = new byte[reasonLength];
buffer.get(reasonBytes);
String reason = new String(reasonBytes);

return new DLQEntry(event, pluginType, pluginId, reason, entryTime);
}

public Event getEvent() {
return event;
}

public String getPluginType() {
return pluginType;
}

public String getPluginId() {
return pluginId;
}

public String getReason() {
return reason;
}

public Timestamp getEntryTime() {
return entryTime;
}
}
9 changes: 8 additions & 1 deletion logstash-core/src/main/java/org/logstash/Timestamp.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import org.joda.time.LocalDateTime;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.logstash.ackedqueue.Queueable;

import java.io.IOException;
import java.util.Date;

@JsonSerialize(using = org.logstash.json.TimestampSerializer.class)
public class Timestamp implements Cloneable {
public class Timestamp implements Cloneable, Queueable {

// all methods setting the time object must set it in the UTC timezone
private DateTime time;
Expand Down Expand Up @@ -81,4 +83,9 @@ public Timestamp clone() throws CloneNotSupportedException {
clone.setTime(this.getTime());
return clone;
}

@Override
public byte[] serialize() throws IOException {
return toString().getBytes();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.common.io;

import org.logstash.DLQEntry;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static org.logstash.common.io.DeadLetterQueueWriteManager.getSegmentPaths;

public class DeadLetterQueueReadManager {

private RecordIOReader currentReader;
private final Path queuePath;
private final ConcurrentSkipListSet<Path> segments;
private final WatchService watchService;

public DeadLetterQueueReadManager(Path queuePath) throws Exception {
this.queuePath = queuePath;
this.watchService = FileSystems.getDefault().newWatchService();
this.queuePath.register(watchService, ENTRY_CREATE, ENTRY_DELETE);
this.segments = new ConcurrentSkipListSet<>((p1, p2) -> {
Function<Path, Integer> id = (p) -> Integer.parseInt(p.getFileName().toString().split("\\.")[0]);
return id.apply(p1).compareTo(id.apply(p2));
});

segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList()));
}

private long pollNewSegments(long timeout) throws IOException, InterruptedException {
long startTime = System.currentTimeMillis();
WatchKey key = watchService.poll(timeout, TimeUnit.MILLISECONDS);
if (key != null) {
for (WatchEvent<?> watchEvent : key.pollEvents()) {
if (watchEvent.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
segments.addAll(getSegmentPaths(queuePath).collect(Collectors.toList()));
}
key.reset();
}
}
return System.currentTimeMillis() - startTime;
}

public DLQEntry pollEntry(long timeout) throws IOException, InterruptedException {
return DLQEntry.deserialize(pollEntryBytes(timeout));
}

byte[] pollEntryBytes() throws IOException, InterruptedException {
return pollEntryBytes(100);
}

byte[] pollEntryBytes(long timeout) throws IOException, InterruptedException {
long timeoutRemaining = timeout;
if (currentReader == null) {
timeoutRemaining -= pollNewSegments(timeout);
currentReader = new RecordIOReader(segments.first());
}

byte[] event = currentReader.readEvent();
if (event == null && currentReader.isEndOfStream()) {
if (currentReader.getPath().equals(segments.last())) {
pollNewSegments(timeoutRemaining);
} else {
currentReader.close();
currentReader = new RecordIOReader(segments.higher(currentReader.getPath()));
return pollEntryBytes(timeoutRemaining);
}
}

return event;
}

public void close() throws IOException {
if (currentReader != null) {
currentReader.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.common.io;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.DLQEntry;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.stream.Stream;

import static org.logstash.common.io.RecordIOWriter.RECORD_HEADER_SIZE;

public class DeadLetterQueueWriteManager {

private static final Logger logger = LogManager.getLogger(DeadLetterQueueWriteManager.class);

static final String SEGMENT_FILE_PATTERN = "%020d.log";
static final String LOCK_FILE = ".lock";
private final long maxSegmentSize;
private final long maxQueueSize;
private final Path queuePath;
private final FileLock lock;
private RecordIOWriter currentWriter;
private long currentQueueSize;
private int currentSegmentIndex;

/**
*
* @param queuePath
* @param maxSegmentSize
* @throws IOException
*/
public DeadLetterQueueWriteManager(Path queuePath, long maxSegmentSize, long maxQueueSize) throws IOException {
// check that only one instance of the writer is open in this configured path
Path lockFilePath = queuePath.resolve(LOCK_FILE);
boolean isNewlyCreated = lockFilePath.toFile().createNewFile();
FileChannel channel = FileChannel.open(lockFilePath, StandardOpenOption.WRITE);
try {
this.lock = channel.lock();
} catch (OverlappingFileLockException e) {
if (isNewlyCreated) {
logger.warn("Previous Dead Letter Queue Writer was not closed safely.");
}
throw new RuntimeException("uh oh, someone else is writing to this dead-letter queue");
}

this.queuePath = queuePath;
this.maxSegmentSize = maxSegmentSize;
this.maxQueueSize = maxQueueSize;
this.currentQueueSize = getStartupQueueSize();

currentSegmentIndex = getSegmentPaths(queuePath)
.map(s -> s.getFileName().toString().split("\\.")[0])
.mapToInt(Integer::parseInt)
.max().orElse(0);
this.currentWriter = nextWriter();
}

private long getStartupQueueSize() throws IOException {
return getSegmentPaths(queuePath)
.mapToLong((p) -> {
try {
return Files.size(p);
} catch (IOException e) {
return 0L;
}
} )
.sum();
}

private RecordIOWriter nextWriter() throws IOException {
return new RecordIOWriter(queuePath.resolve(String.format(SEGMENT_FILE_PATTERN, ++currentSegmentIndex)));
}

static Stream<Path> getSegmentPaths(Path path) throws IOException {
return Files.list(path).filter((p) -> p.toString().endsWith(".log"));
}

public void writeEntry(DLQEntry event) throws IOException {
byte[] record = event.serialize();
int eventPayloadSize = RECORD_HEADER_SIZE + record.length;
if (currentQueueSize + eventPayloadSize > maxQueueSize) {
logger.error("cannot write event to DLQ, no space available");
return;
} else if (currentWriter.getPosition() + eventPayloadSize > maxSegmentSize) {
currentWriter.close();
currentWriter = nextWriter();
}
currentQueueSize += currentWriter.writeEvent(record);
}

public void close() throws IOException {
this.lock.release();
if (currentWriter != null) {
currentWriter.close();
}
Files.deleteIfExists(queuePath.resolve(LOCK_FILE));
}
}
Loading

0 comments on commit 5e08535

Please sign in to comment.