Commit 27ab23e

Don't update segment metadata if archive doesn't move anything (apache#3476)

* Don't update segment metadata if archive doesn't move anything

* Fix restore task to handle potential null values

* Don't try to update empty metadata

* Address review comments

* Move to druid-io java-util
drcrallen authored and gianm committed Dec 1, 2016
1 parent 6873582 commit 27ab23e
Showing 6 changed files with 284 additions and 45 deletions.
DataSegmentArchiver.java
@@ -21,8 +21,31 @@
 
 import io.druid.timeline.DataSegment;
 
+import javax.annotation.Nullable;
+
 public interface DataSegmentArchiver
 {
-  public DataSegment archive(DataSegment segment) throws SegmentLoadingException;
-  public DataSegment restore(DataSegment segment) throws SegmentLoadingException;
+  /**
+   * Perform an archive task on the segment and return the resulting segment, or null if no action was needed.
+   *
+   * @param segment The source segment
+   *
+   * @return The segment after archiving, or `null` if no archiving was performed.
+   *
+   * @throws SegmentLoadingException on error
+   */
+  @Nullable
+  DataSegment archive(DataSegment segment) throws SegmentLoadingException;
+
+  /**
+   * Perform the restore from an archived segment and return the resulting segment, or null if no action was needed.
+   *
+   * @param segment The source (archived) segment
+   *
+   * @return The segment after it has been unarchived, or `null` if no restore was performed.
+   *
+   * @throws SegmentLoadingException on error
+   */
+  @Nullable
+  DataSegment restore(DataSegment segment) throws SegmentLoadingException;
 }
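With this change, a null return is part of the interface contract: callers must treat it as "nothing was moved, so there is nothing to persist." A minimal caller-side sketch of that contract (the `archiver` handle and `updateMetadata` helper are hypothetical stand-ins, not part of this commit):

// Hypothetical caller; only DataSegmentArchiver and its null-return
// contract come from this commit.
final DataSegment archived = archiver.archive(segment);
if (archived == null) {
  // The archiver reported a no-op move: skip the metadata update.
  return;
}
updateMetadata(archived);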
S3DataSegmentArchiver.java
@@ -19,9 +19,13 @@
 
 package io.druid.storage.s3;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableMap;
 import com.google.inject.Inject;
+import io.druid.guice.annotations.Json;
 import io.druid.segment.loading.DataSegmentArchiver;
+import io.druid.segment.loading.LoadSpec;
 import io.druid.segment.loading.SegmentLoadingException;
 import io.druid.timeline.DataSegment;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
@@ -31,15 +35,18 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSegmentArchiver
 {
   private final S3DataSegmentArchiverConfig archiveConfig;
   private final S3DataSegmentPusherConfig restoreConfig;
+  private final ObjectMapper mapper;
 
   @Inject
   public S3DataSegmentArchiver(
-      RestS3Service s3Client,
-      S3DataSegmentArchiverConfig archiveConfig,
-      S3DataSegmentPusherConfig restoreConfig
+      @Json ObjectMapper mapper,
+      RestS3Service s3Client,
+      S3DataSegmentArchiverConfig archiveConfig,
+      S3DataSegmentPusherConfig restoreConfig
   )
   {
     super(s3Client, restoreConfig);
+    this.mapper = mapper;
     this.archiveConfig = archiveConfig;
     this.restoreConfig = restoreConfig;
   }
@@ -50,13 +57,17 @@ public DataSegment archive(DataSegment segment) throws SegmentLoadingException
     String targetS3Bucket = archiveConfig.getArchiveBucket();
     String targetS3BaseKey = archiveConfig.getArchiveBaseKey();
 
-    return move(
+    final DataSegment archived = move(
         segment,
         ImmutableMap.<String, Object>of(
            "bucket", targetS3Bucket,
            "baseKey", targetS3BaseKey
        )
    );
+    if (sameLoadSpec(segment, archived)) {
+      return null;
+    }
+    return archived;
   }
 
   @Override
@@ -65,12 +76,27 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException
     String targetS3Bucket = restoreConfig.getBucket();
     String targetS3BaseKey = restoreConfig.getBaseKey();
 
-    return move(
+    final DataSegment restored = move(
         segment,
         ImmutableMap.<String, Object>of(
            "bucket", targetS3Bucket,
            "baseKey", targetS3BaseKey
        )
    );
+
+    if (sameLoadSpec(segment, restored)) {
+      return null;
+    }
+    return restored;
+  }
+
+  boolean sameLoadSpec(DataSegment s1, DataSegment s2)
+  {
+    final S3LoadSpec s1LoadSpec = (S3LoadSpec) mapper.convertValue(s1.getLoadSpec(), LoadSpec.class);
+    final S3LoadSpec s2LoadSpec = (S3LoadSpec) mapper.convertValue(s2.getLoadSpec(), LoadSpec.class);
+    return Objects.equal(s1LoadSpec.getBucket(), s2LoadSpec.getBucket()) && Objects.equal(
+        s1LoadSpec.getKey(),
+        s2LoadSpec.getKey()
+    );
+  }
 }
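`sameLoadSpec` leans on Jackson's polymorphic handling: `convertValue` reads the loadSpec map's `type` entry, resolves it to the registered `S3LoadSpec` subtype, and the bucket/key comparison then goes through the new getters. A rough sketch of the round trip it performs per segment, assuming a loadSpec shaped like the one in the test file below:

// Roughly what sameLoadSpec does for each segment: a loadSpec map such
// as {"type": <S3StorageDruidModule.SCHEME>, "bucket": "b", "key": "k"}
// is resolved to the registered LoadSpec subtype by Jackson.
final Map<String, Object> loadSpec = segment.getLoadSpec();
final S3LoadSpec spec = (S3LoadSpec) mapper.convertValue(loadSpec, LoadSpec.class);
final String bucket = spec.getBucket(); // readable only because the getters were added
final String key = spec.getKey();

Comparing only bucket and key means archive() and restore() return null exactly when the move turned out to be a no-op, i.e. the segment already lives at the target coordinates.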
S3LoadSpec.java
@@ -35,9 +35,7 @@
 @JsonTypeName(S3StorageDruidModule.SCHEME)
 public class S3LoadSpec implements LoadSpec
 {
-  @JsonProperty(S3DataSegmentPuller.BUCKET)
   private final String bucket;
-  @JsonProperty(S3DataSegmentPuller.KEY)
   private final String key;
 
   private final S3DataSegmentPuller puller;
@@ -61,4 +59,16 @@ public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException
   {
     return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size());
   }
+
+  @JsonProperty(S3DataSegmentPuller.BUCKET)
+  public String getBucket()
+  {
+    return bucket;
+  }
+
+  @JsonProperty(S3DataSegmentPuller.KEY)
+  public String getKey()
+  {
+    return key;
+  }
 }
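Moving `@JsonProperty` from the private fields to public getters keeps the serialized JSON identical while making the coordinates readable from `S3DataSegmentArchiver.sameLoadSpec`. A hypothetical sanity check of that invariant (not part of the commit):

// Hypothetical check: serialization still emits the same keys after
// the annotations moved from the fields to the getters.
final Map<String, Object> asMap =
    mapper.convertValue(spec, new TypeReference<Map<String, Object>>() {});
Assert.assertTrue(asMap.containsKey(S3DataSegmentPuller.BUCKET));
Assert.assertTrue(asMap.containsKey(S3DataSegmentPuller.KEY));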
S3DataSegmentArchiverTest.java (new file)
@@ -0,0 +1,168 @@
/*
 * Licensed to Metamarkets Group Inc. (Metamarkets) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Metamarkets licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package io.druid.storage.s3;

import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Map;

public class S3DataSegmentArchiverTest
{
  private static final ObjectMapper MAPPER = new DefaultObjectMapper()
      .setInjectableValues(new InjectableValues()
      {
        @Override
        public Object findInjectableValue(
            Object valueId, DeserializationContext ctxt, BeanProperty forProperty, Object beanInstance
        )
        {
          return PULLER;
        }
      }).registerModule(new SimpleModule("s3-archive-test-module").registerSubtypes(S3LoadSpec.class));
  private static final S3DataSegmentArchiverConfig ARCHIVER_CONFIG = new S3DataSegmentArchiverConfig()
  {
    @Override
    public String getArchiveBucket()
    {
      return "archive_bucket";
    }

    @Override
    public String getArchiveBaseKey()
    {
      return "archive_base_key";
    }
  };
  private static final S3DataSegmentPusherConfig PUSHER_CONFIG = new S3DataSegmentPusherConfig();
  private static final RestS3Service S3_SERVICE = EasyMock.createStrictMock(RestS3Service.class);
  private static final S3DataSegmentPuller PULLER = new S3DataSegmentPuller(S3_SERVICE);
  private static final DataSegment SOURCE_SEGMENT = DataSegment
      .builder()
      .binaryVersion(1)
      .dataSource("dataSource")
      .dimensions(ImmutableList.<String>of())
      .interval(Interval.parse("2015/2016"))
      .version("version")
      .loadSpec(ImmutableMap.<String, Object>of(
          "type",
          S3StorageDruidModule.SCHEME,
          S3DataSegmentPuller.BUCKET,
          "source_bucket",
          S3DataSegmentPuller.KEY,
          "source_key"
      ))
      .build();

  @BeforeClass
  public static void setUpStatic()
  {
    PUSHER_CONFIG.setBaseKey("push_base");
    PUSHER_CONFIG.setBucket("push_bucket");
  }

  @Test
  public void testSimpleArchive() throws Exception
  {
    final DataSegment archivedSegment = SOURCE_SEGMENT
        .withLoadSpec(ImmutableMap.<String, Object>of(
            "type",
            S3StorageDruidModule.SCHEME,
            S3DataSegmentPuller.BUCKET,
            ARCHIVER_CONFIG.getArchiveBucket(),
            S3DataSegmentPuller.KEY,
            ARCHIVER_CONFIG.getArchiveBaseKey() + "archived"
        ));
    final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG)
    {
      @Override
      public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
      {
        return archivedSegment;
      }
    };
    Assert.assertEquals(archivedSegment, archiver.archive(SOURCE_SEGMENT));
  }

  @Test
  public void testSimpleArchiveDoesntMove() throws Exception
  {
    final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG)
    {
      @Override
      public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
      {
        return SOURCE_SEGMENT;
      }
    };
    Assert.assertNull(archiver.archive(SOURCE_SEGMENT));
  }

  @Test
  public void testSimpleRestore() throws Exception
  {
    final DataSegment archivedSegment = SOURCE_SEGMENT
        .withLoadSpec(ImmutableMap.<String, Object>of(
            "type",
            S3StorageDruidModule.SCHEME,
            S3DataSegmentPuller.BUCKET,
            ARCHIVER_CONFIG.getArchiveBucket(),
            S3DataSegmentPuller.KEY,
            ARCHIVER_CONFIG.getArchiveBaseKey() + "archived"
        ));
    final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG)
    {
      @Override
      public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
      {
        return archivedSegment;
      }
    };
    Assert.assertEquals(archivedSegment, archiver.restore(SOURCE_SEGMENT));
  }

  @Test
  public void testSimpleRestoreDoesntMove() throws Exception
  {
    final S3DataSegmentArchiver archiver = new S3DataSegmentArchiver(MAPPER, S3_SERVICE, ARCHIVER_CONFIG, PUSHER_CONFIG)
    {
      @Override
      public DataSegment move(DataSegment segment, Map<String, Object> targetLoadSpec) throws SegmentLoadingException
      {
        return SOURCE_SEGMENT;
      }
    };
    Assert.assertNull(archiver.restore(SOURCE_SEGMENT));
  }
}
ArchiveTask.java
@@ -97,7 +97,11 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
     // Move segments
     for (DataSegment segment : unusedSegments) {
       final DataSegment archivedSegment = toolbox.getDataSegmentArchiver().archive(segment);
-      toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(archivedSegment)));
+      if (archivedSegment != null) {
+        toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(archivedSegment)));
+      } else {
+        log.info("No action was taken for [%s]", segment);
+      }
     }
 
     return TaskStatus.success(getId());
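The sixth changed file is collapsed in this view; given the commit message's "Fix restore task to handle potential null values", it presumably applies the same guard on the restore path. A hypothetical sketch of that counterpart:

// Hypothetical restore-side counterpart of the null check above; the
// actual sixth file is not shown in this view.
final DataSegment restoredSegment = toolbox.getDataSegmentArchiver().restore(segment);
if (restoredSegment != null) {
  toolbox.getTaskActionClient().submit(new SegmentMetadataUpdateAction(ImmutableSet.of(restoredSegment)));
} else {
  log.info("No restore action was taken for [%s]", segment);
}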