Skip to content

Commit

Permalink
elastic#8218: Ensure atomic creation of PQ checkpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Mar 29, 2018
1 parent 5e49b5d commit 908ac09
Showing 1 changed file with 8 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
package org.logstash.ackedqueue.io;

import java.nio.channels.FileChannel;
import org.logstash.ackedqueue.Checkpoint;
import org.logstash.common.io.BufferedChecksumStreamInput;
import org.logstash.common.io.InputStreamStreamInput;

import java.io.ByteArrayInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.zip.CRC32;
import org.logstash.ackedqueue.Checkpoint;
import org.logstash.common.io.BufferedChecksumStreamInput;
import org.logstash.common.io.InputStreamStreamInput;

public class FileCheckpointIO implements CheckpointIO {
// Checkpoint file structure
Expand Down Expand Up @@ -70,10 +70,12 @@ public Checkpoint write(String fileName, int pageNum, int firstUnackedPageNum, l
public void write(String fileName, Checkpoint checkpoint) throws IOException {
write(checkpoint, buffer);
buffer.flip();
try (FileOutputStream out = new FileOutputStream(Paths.get(dirPath, fileName).toFile())) {
final Path tmpPath = Paths.get(dirPath, fileName + ".tmp");
try (FileOutputStream out = new FileOutputStream(tmpPath.toFile())) {
out.getChannel().write(buffer);
out.getFD().sync();
}
Files.move(tmpPath, Paths.get(dirPath, fileName), StandardCopyOption.ATOMIC_MOVE);
}

@Override
Expand Down

0 comments on commit 908ac09

Please sign in to comment.