Skip to content
This repository has been archived by the owner on Nov 4, 2020. It is now read-only.

Commit

Permalink
namespace memory checkpoints for multiple queues support
Browse files Browse the repository at this point in the history
memory checkpoint IO need the purge() method
  • Loading branch information
colinsurprenant committed Feb 22, 2017
1 parent e2ffcf9 commit 5863ab3
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.logstash.common.io;

import org.logstash.ackedqueue.Checkpoint;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -71,6 +72,7 @@ public void purge(String fileName) throws IOException {
@Override
public void purge() throws IOException {
// TODO: dir traversal and delete all checkpoints?
throw new NotImplementedException();
}

// @return the head page checkpoint file name
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package org.logstash.common.io;

import org.logstash.ackedqueue.Checkpoint;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -12,7 +15,7 @@ public class MemoryCheckpointIO implements CheckpointIO {
private final String HEAD_CHECKPOINT = "checkpoint.head";
private final String TAIL_CHECKPOINT = "checkpoint.";

private static final Map<String, Checkpoint> sources = new HashMap<>();
private static final Map<String, Map<String, Checkpoint>> sources = new HashMap<>();

private final String dirPath;

Expand All @@ -26,8 +29,13 @@ public MemoryCheckpointIO(String dirPath) {

@Override
public Checkpoint read(String fileName) throws IOException {
Checkpoint cp = this.sources.get(fileName);
if (cp == null) { throw new NoSuchFileException("no memory checkpoint for " + fileName); }

Checkpoint cp = null;
Map<String, Checkpoint> ns = this.sources.get(dirPath);
if (ns != null) {
cp = ns.get(fileName);
}
if (cp == null) { throw new NoSuchFileException("no memory checkpoint for dirPath: " + this.dirPath + ", fileName: " + fileName); }
return cp;
}

Expand All @@ -40,17 +48,25 @@ public Checkpoint write(String fileName, int pageNum, int firstUnackedPageNum, l

@Override
public void write(String fileName, Checkpoint checkpoint) throws IOException {
this.sources.put(fileName, checkpoint);
Map<String, Checkpoint> ns = this.sources.get(dirPath);
if (ns == null) {
ns = new HashMap<>();
this.sources.put(this.dirPath, ns);
}
ns.put(fileName, checkpoint);
}

@Override
public void purge(String fileName) {
this.sources.remove(fileName);
Map<String, Checkpoint> ns = this.sources.get(dirPath);
if (ns != null) {
ns.remove(fileName);
}
}

@Override
public void purge() {
this.sources.clear();
this.sources.remove(this.dirPath);
}

// @return the head page checkpoint file name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,46 @@ public void writeNewReadExisting() throws IOException {
public void readInnexisting() throws IOException {
io.read("checkpoint.invalid");
}

@Test
public void readWriteDirPathNamespaced() throws IOException {
CheckpointIO io1 = new MemoryCheckpointIO("path1");
CheckpointIO io2 = new MemoryCheckpointIO("path2");
io1.write("checkpoint.head", 1, 0, 0, 0, 0);
io2.write("checkpoint.head", 2, 0, 0, 0, 0);

Checkpoint checkpoint;

checkpoint = io1.read("checkpoint.head");
assertThat(checkpoint.getPageNum(), is(equalTo(1)));

checkpoint = io2.read("checkpoint.head");
assertThat(checkpoint.getPageNum(), is(equalTo(2)));
}

@Test(expected = NoSuchFileException.class)
public void purgeDirPathNamespaced1() throws IOException {
CheckpointIO io1 = new MemoryCheckpointIO("path1");
CheckpointIO io2 = new MemoryCheckpointIO("path2");
io1.write("checkpoint.head", 1, 0, 0, 0, 0);
io2.write("checkpoint.head", 2, 0, 0, 0, 0);

io1.purge("checkpoint.head");

Checkpoint checkpoint = io1.read("checkpoint.head");
}

@Test
public void purgeDirPathNamespaced2() throws IOException {
CheckpointIO io1 = new MemoryCheckpointIO("path1");
CheckpointIO io2 = new MemoryCheckpointIO("path2");
io1.write("checkpoint.head", 1, 0, 0, 0, 0);
io2.write("checkpoint.head", 2, 0, 0, 0, 0);

io1.purge("checkpoint.head");

Checkpoint checkpoint;
checkpoint = io2.read("checkpoint.head");
assertThat(checkpoint.getPageNum(), is(equalTo(2)));
}
}

0 comments on commit 5863ab3

Please sign in to comment.