support atomic writes for local deep storage (apache#3521)
* Use atomic writes for local deep storage

* fix pr issues

* use defaultObjMapper for test

* move tmp pushes to an intermediate dir

* minor refactor
zhxiaogg authored and fjy committed Dec 13, 2016
1 parent 4be3eb0 commit 48b22e2
Showing 4 changed files with 81 additions and 21 deletions.
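The commit message above summarizes the change: instead of zipping segment files straight into their final location, the pusher now writes everything into a per-push intermediate directory and then moves that directory into place, so readers never see a half-written segment and the first of several concurrent pushes wins. A minimal, self-contained sketch of that write-to-temp-then-move pattern follows; the class, method, and directory names are illustrative only and are not Druid APIs.

import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class AtomicPublishSketch
{
  // Write content into a unique intermediate directory, then move the whole
  // directory to its final name, so readers only ever see a completed segment.
  public static Path publish(Path baseDir, String segmentName, byte[] content) throws IOException
  {
    Path tmpDir = Files.createDirectories(
        baseDir.resolve("intermediate_pushes").resolve(segmentName + "." + UUID.randomUUID())
    );
    Files.write(tmpDir.resolve("index.zip"), content);

    Path finalDir = baseDir.resolve(segmentName);
    try {
      Files.move(tmpDir, finalDir); // fails if another push already published this segment
    }
    catch (FileAlreadyExistsException e) {
      // First push wins: discard our temporary files and reuse the existing directory.
      Files.delete(tmpDir.resolve("index.zip"));
      Files.delete(tmpDir);
    }
    return finalDir;
  }
}

On a local file system the directory move is typically a rename, which is what makes the publish step appear atomic to readers.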
NoneShardSpec.java
@@ -20,6 +20,7 @@
package io.druid.timeline.partition;

import com.fasterxml.jackson.annotation.JsonCreator;
+ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Range;
import io.druid.data.input.InputRow;
@@ -55,6 +56,7 @@ public boolean isInChunk(long timestamp, InputRow inputRow)
}

@Override
+ @JsonIgnore
public int getPartitionNum()
{
return 0;
NoneShardSpecTest.java
@@ -5,6 +5,8 @@
import org.junit.Assert;
import org.junit.Test;

+ import java.io.IOException;
+
public class NoneShardSpecTest
{
@Test
@@ -28,4 +30,13 @@ public void testSerde() throws Exception
Assert.assertTrue(serde1 == serde2);
Assert.assertTrue(one == serde1);
}

+ @Test
+ public void testPartitionFieldIgnored() throws IOException
+ {
+ final String jsonStr = "{\"type\": \"none\",\"partitionNum\": 2}";
+ ObjectMapper mapper = new TestObjectMapper();
+ final ShardSpec noneShardSpec = mapper.readValue(jsonStr, ShardSpec.class);
+ Assert.assertEquals(NoneShardSpec.instance(), noneShardSpec);
+ }
}
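The @JsonIgnore added to getPartitionNum() in the previous file is what this test exercises: Jackson then treats partitionNum as an ignorable property, so it is left out when a NoneShardSpec is written and dropped without error when JSON that still carries it is read back. A toy, self-contained illustration of that Jackson behavior; the Spec class below is made up for the example and is not part of Druid.

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonIgnoreSketch
{
  // Toy class for illustration only.
  public static class Spec
  {
    public String type = "none";

    @JsonIgnore
    public int partitionNum = 0; // never serialized, ignored on deserialization
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();

    // partitionNum is skipped when writing: {"type":"none"}
    System.out.println(mapper.writeValueAsString(new Spec()));

    // ...and dropped when reading, even though the JSON carries it.
    Spec spec = mapper.readValue("{\"type\": \"none\", \"partitionNum\": 2}", Spec.class);
    System.out.println(spec.partitionNum); // still 0
  }
}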
LocalDataSegmentPusher.java
@@ -29,9 +29,12 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.SegmentUtils;
import io.druid.timeline.DataSegment;
+ import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
+ import java.nio.file.FileAlreadyExistsException;
+ import java.util.UUID;

/**
*/
@@ -70,7 +73,9 @@ public String getPathForHadoop(String dataSource)
@Override
public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException
{
- File outDir = new File(config.getStorageDirectory(), DataSegmentPusherUtil.getStorageDir(segment));
+ final String storageDir = DataSegmentPusherUtil.getStorageDir(segment);
+ final File baseStorageDir = config.getStorageDirectory();
+ final File outDir = new File(baseStorageDir, storageDir);

log.info("Copying segment[%s] to local filesystem at location[%s]", segment.getIdentifier(), outDir.toString());

@@ -88,19 +93,49 @@ public DataSegment push(File dataSegmentFile, DataSegment segment) throws IOException
);
}

- if (!outDir.mkdirs() && !outDir.isDirectory()) {
- throw new IOException(String.format("Cannot create directory[%s]", outDir));
+ final File tmpOutDir = new File(baseStorageDir, intermediateDirFor(storageDir));
+ log.info("Creating intermediate directory[%s] for segment[%s]", tmpOutDir.toString(), segment.getIdentifier());
+ final long size = compressSegment(dataSegmentFile, tmpOutDir);
+
+ final DataSegment dataSegment = createDescriptorFile(
+ segment.withLoadSpec(makeLoadSpec(new File(outDir, "index.zip")))
+ .withSize(size)
+ .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
+ tmpOutDir
+ );
+
+ // Move the temporary directory to the final destination. Once this move succeeds, any concurrent push of the
+ // same segment will fail its own move and instead read the descriptor.json created by this push.
+ createDirectoryIfNotExists(outDir.getParentFile());
+ try {
+ java.nio.file.Files.move(tmpOutDir.toPath(), outDir.toPath());
+ }
+ catch (FileAlreadyExistsException e) {
+ log.warn("Push destination directory[%s] exists, ignore this message if replication is configured.", outDir);
+ FileUtils.deleteDirectory(tmpOutDir);
+ return jsonMapper.readValue(new File(outDir, "descriptor.json"), DataSegment.class);
+ }
+ return dataSegment;
+ }
+
+ private void createDirectoryIfNotExists(File directory) throws IOException
+ {
+ if (!directory.mkdirs() && !directory.isDirectory()) {
+ throw new IOException(String.format("Cannot create directory[%s]", directory.toString()));
+ }
+ }
+
+ private String intermediateDirFor(String storageDir)
+ {
+ return "intermediate_pushes/" + storageDir + "." + UUID.randomUUID().toString();
+ }
+
+ private long compressSegment(File dataSegmentFile, File outDir) throws IOException
+ {
+ createDirectoryIfNotExists(outDir);
File outFile = new File(outDir, "index.zip");
log.info("Compressing files from[%s] to [%s]", dataSegmentFile, outFile);
- long size = CompressionUtils.zip(dataSegmentFile, outFile);
-
- return createDescriptorFile(
- segment.withLoadSpec(makeLoadSpec(outFile))
- .withSize(size)
- .withBinaryVersion(SegmentUtils.getVersionFromDir(dataSegmentFile)),
- outDir
- );
+ return CompressionUtils.zip(dataSegmentFile, outFile);
}

private DataSegment createDescriptorFile(DataSegment segment, File outDir) throws IOException
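The try/catch in push() above relies on a specific java.nio.file.Files.move behavior: without REPLACE_EXISTING, moving onto an existing target throws FileAlreadyExistsException instead of overwriting it, and on a local file system moving a directory is typically a rename. A small standalone sketch of that behavior, using throwaway temp directories rather than anything Druid-specific:

import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

public class MoveDemo
{
  public static void main(String[] args) throws Exception
  {
    Path base = Files.createTempDirectory("move-demo");
    Path firstPush = Files.createDirectory(base.resolve("push-1"));
    Path secondPush = Files.createDirectory(base.resolve("push-2"));
    Path target = base.resolve("segment");

    Files.move(firstPush, target);      // first push wins and publishes the segment
    try {
      Files.move(secondPush, target);   // second push finds the target already present
    }
    catch (FileAlreadyExistsException e) {
      System.out.println("already published: " + target);
    }
  }
}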
LocalDataSegmentPusherTest.java
@@ -23,6 +23,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
+ import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.Interval;
@@ -48,23 +49,23 @@ public class LocalDataSegmentPusherTest
LocalDataSegmentPusherConfig config;
File dataSegmentFiles;
DataSegment dataSegment = new DataSegment(
"ds",
new Interval(0, 1),
"v1",
null,
null,
null,
NoneShardSpec.instance(),
null,
-1
"ds",
new Interval(0, 1),
"v1",
null,
null,
null,
NoneShardSpec.instance(),
null,
-1
);

@Before
public void setUp() throws IOException
{
config = new LocalDataSegmentPusherConfig();
config.storageDirectory = temporaryFolder.newFolder();
- localDataSegmentPusher = new LocalDataSegmentPusher(config, new ObjectMapper());
+ localDataSegmentPusher = new LocalDataSegmentPusher(config, new DefaultObjectMapper());
dataSegmentFiles = temporaryFolder.newFolder();
Files.asByteSink(new File(dataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x9));
}
@@ -103,6 +104,17 @@ public void testPush() throws IOException
}
}

+ @Test
+ public void testFirstPushWinsForConcurrentPushes() throws IOException
+ {
+ File replicatedDataSegmentFiles = temporaryFolder.newFolder();
+ Files.asByteSink(new File(replicatedDataSegmentFiles, "version.bin")).write(Ints.toByteArray(0x8));
+ DataSegment returnSegment1 = localDataSegmentPusher.push(dataSegmentFiles, dataSegment);
+ DataSegment returnSegment2 = localDataSegmentPusher.push(replicatedDataSegmentFiles, dataSegment);
+
+ Assert.assertEquals(returnSegment1, returnSegment2);
+ }

@Test
public void testPushCannotCreateDirectory() throws IOException
{
