Recursively Delete Unreferenced Index Directories (elastic#42189)
* Use the blob store's ability to list child "folders" to implement a recursive delete of all stale index folders during cleanup, instead of relying on the diff between two `RepositoryData` instances; this also covers aborted deletes
* Runs after every delete operation
* Relates elastic#13159 (fixes most of the issues caused by unreferenced indices; only some meta files are left to be cleaned up)
original-brownbear authored Jun 21, 2019
1 parent 4fcb951 commit 0545a00
Showing 12 changed files with 346 additions and 149 deletions.
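The core of the change, as a minimal sketch (condensed from the cleanupStaleIndices method in the BlobStoreRepository diff below; indicesContainer and survivingIndexIds are illustrative stand-ins for the repository internals):

    // List every child "folder" under indices/ and recursively delete any folder
    // whose id is not referenced by the RepositoryData that survives the delete.
    // BlobContainer#children() and BlobContainer#delete() are the blob store
    // capabilities this change relies on.
    for (Map.Entry<String, BlobContainer> entry : indicesContainer.children().entrySet()) {
        if (survivingIndexIds.contains(entry.getKey()) == false) {
            entry.getValue().delete(); // removes the stale index folder and every blob under it
        }
    }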
===== changed file =====
@@ -27,9 +27,13 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Collection;

@@ -145,6 +149,9 @@ public void testSimpleWorkflow() {
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
final BlobStoreRepository repo =
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
}

public void testMissingUri() {
===== changed file =====
@@ -26,10 +26,12 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.test.StreamsUtils;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@@ -76,6 +78,20 @@ protected void createRepository(String repoName) {
}

@Override
protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor genericExec) throws Exception {
// S3 is only eventually consistent for the list operations used by these assertions, so we retry
// for up to 10 minutes, assuming that listing operations become consistent within that window.
assertBusy(() -> assertTrue(super.assertCorruptionVisible(repo, genericExec)), 10L, TimeUnit.MINUTES);
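// assertBusy only returns once the wrapped assertion has passed, so by this point the
// corruption is guaranteed to be visible and we can report success unconditionally.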
return true;
}

@Override
protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
// S3 is only eventually consistent for the list operations used by these assertions, so we retry
// for up to 10 minutes, assuming that listing operations become consistent within that window.
assertBusy(() -> super.assertConsistentRepository(repo, executor), 10L, TimeUnit.MINUTES);
}

protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
// AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
// to become consistent.
===== changed file =====
@@ -58,7 +58,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -393,46 +392,68 @@ public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Action
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
}
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
final RepositoryData repositoryData;
final RepositoryData updatedRepositoryData;
final Map<String, BlobContainer> foundIndices;
try {
repositoryData = getRepositoryData();
final RepositoryData repositoryData = getRepositoryData();
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
foundIndices = blobStore().blobContainer(basePath().add("indices")).children();
writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (Exception ex) {
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
return;
}
final SnapshotInfo finalSnapshotInfo = snapshot;
final Collection<IndexId> unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values());
unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values());
try {
blobContainer().deleteBlobsIgnoringIfNotExists(
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID())));
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e);
}
final var survivingIndices = updatedRepositoryData.getIndices();
deleteIndices(
Optional.ofNullable(finalSnapshotInfo)
.map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList()))
.map(info -> info.indices().stream().filter(survivingIndices::containsKey)
.map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList()))
.orElse(Collections.emptyList()),
snapshotId,
ActionListener.map(listener, v -> {
try {
blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists(
unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList()));
} catch (IOException e) {
logger.warn(() ->
new ParameterizedMessage(
"[{}] indices {} are no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e);
}
cleanupStaleIndices(foundIndices, survivingIndices);
return null;
})
);
}
}

private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Map<String, IndexId> survivingIndices) {
try {
final Set<String> survivingIndexIds = survivingIndices.values().stream()
.map(IndexId::getId).collect(Collectors.toSet());
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
final String indexSnId = indexEntry.getKey();
try {
if (survivingIndexIds.contains(indexSnId) == false) {
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
indexEntry.getValue().delete();
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index [{}] is no longer part of any snapshots in the repository, " +
"but failed to clean up its index folder", metadata.name(), indexSnId), e);
}
}
} catch (Exception e) {
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
// Currently this catch exists as a stop-gap solution to tackle unexpected runtime exceptions from implementations
// bubbling up and breaking the snapshot functionality.
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
}
}

private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) {
if (indices.isEmpty()) {
listener.onResponse(null);
@@ -494,9 +515,9 @@ public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata);
try {
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
final RepositoryData repositoryData = getRepositoryData();
writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId);
writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (FileAlreadyExistsException ex) {
// if another master was elected and took over finalizing the snapshot, it is possible
// that both nodes try to finalize the snapshot and write to the same blobs, so we just
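The ordering in the deleteSnapshot hunk above carries the safety argument of this change. A condensed sketch of that ordering (every call shown appears in the diff; this is a restatement, not additional code in the commit):

    // 1. Cache the children of indices/ BEFORE writing the new index-N blob.
    final Map<String, BlobContainer> foundIndices =
        blobStore().blobContainer(basePath().add("indices")).children();
    // 2. Publish the updated RepositoryData.
    writeIndexGen(updatedRepositoryData, repositoryStateId);
    // 3. Delete only folders that were seen in step 1 and are absent from the
    //    updated state. An index created by a newer master after step 2 can never
    //    appear in foundIndices, so a stuck old master cannot delete it.
    cleanupStaleIndices(foundIndices, updatedRepositoryData.getIndices());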
===== changed file =====
@@ -20,11 +20,13 @@

import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
@@ -65,6 +67,32 @@ public void assertConsistentHistoryInLuceneIndex() throws Exception {
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
}

private String skipRepoConsistencyCheckReason;

@After
public void assertRepoConsistency() {
if (skipRepoConsistencyCheckReason == null) {
client().admin().cluster().prepareGetRepositories().get().repositories()
.stream()
.map(RepositoryMetaData::name)
.forEach(name -> {
final List<SnapshotInfo> snapshots = client().admin().cluster().prepareGetSnapshots(name).get().getSnapshots(name);
// Delete one random snapshot to trigger repository cleanup.
if (snapshots.isEmpty() == false) {
client().admin().cluster().prepareDeleteSnapshot(name, randomFrom(snapshots).snapshotId().getName()).get();
}
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
});
} else {
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);
}
}

protected void disableRepoConsistencyCheck(String reason) {
assertNotNull(reason);
skipRepoConsistencyCheckReason = reason;
}

public static long getFailureCount(String repository) {
long failureCount = 0;
for (RepositoriesService repositoriesService :
===== changed file =====
@@ -722,6 +722,7 @@ public boolean clearData(String nodeName) {
}

public void testRegistrationFailure() {
disableRepoConsistencyCheck("This test does not create any data in the repository");
logger.info("--> start first node");
internalCluster().startNode();
logger.info("--> start second node");
@@ -741,6 +742,7 @@ public void testRegistrationFailure() {
}

public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception {
disableRepoConsistencyCheck("This test does not create any data in the repository");
Settings nodeSettings = Settings.EMPTY;
logger.info("--> start two nodes");
internalCluster().startNodes(2, nodeSettings);
===== changed file =====
@@ -141,8 +141,8 @@ public void testWhenMetadataAreLoaded() throws Exception {
// Deleting a snapshot does not load the global metadata state but loads each index metadata
assertAcked(client().admin().cluster().prepareDeleteSnapshot("repository", "snap").get());
assertGlobalMetadataLoads("snap", 1);
assertIndexMetadataLoads("snap", "docs", 5);
assertIndexMetadataLoads("snap", "others", 4);
assertIndexMetadataLoads("snap", "docs", 4);
assertIndexMetadataLoads("snap", "others", 3);
}

private void assertGlobalMetadataLoads(final String snapshot, final int times) {
===== changed file =====
@@ -184,6 +184,8 @@ public void testRepositoryAckTimeout() throws Exception {
}

public void testRepositoryVerification() throws Exception {
disableRepoConsistencyCheck("This test does not create any data in the repository.");

Client client = client();

Settings settings = Settings.builder()
===== changed file =====
@@ -362,7 +362,7 @@ public void testSingleGetAfterRestore() throws Exception {
assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
}

public void testFreshIndexUUID() {
public void testFreshIndexUUID() throws InterruptedException {
Client client = client();

logger.info("--> creating repository");
@@ -540,7 +540,6 @@ public void testRestoreAliases() throws Exception {
logger.info("--> check that aliases are not restored and existing aliases still exist");
assertAliasesMissing(client.admin().indices().prepareAliasesExist("alias-123", "alias-1").get());
assertAliasesExist(client.admin().indices().prepareAliasesExist("alias-3").get());

}

public void testRestoreTemplates() throws Exception {
@@ -593,7 +592,6 @@ public void testRestoreTemplates() throws Exception {
logger.info("--> check that template is restored");
getIndexTemplatesResponse = client().admin().indices().prepareGetTemplates().get();
assertIndexTemplateExists(getIndexTemplatesResponse, "test-template");

}

public void testIncludeGlobalState() throws Exception {
@@ -780,10 +778,10 @@ public void testIncludeGlobalState() throws Exception {
assertFalse(client().admin().cluster().prepareGetPipeline("barbaz").get().isFound());
assertNull(client().admin().cluster().prepareGetStoredScript("foobar").get().getSource());
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));

}

public void testSnapshotFileFailureDuringSnapshot() {
public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException {
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
Client client = client();

logger.info("--> creating repository");
@@ -910,6 +908,8 @@ public void testDataFileFailureDuringSnapshot() throws Exception {
}

public void testDataFileFailureDuringRestore() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

Path repositoryLocation = randomRepoPath();
Client client = client();
logger.info("--> creating repository");
@@ -973,6 +973,8 @@
}

public void testDataFileCorruptionDuringRestore() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

Path repositoryLocation = randomRepoPath();
Client client = client();
logger.info("--> creating repository");
@@ -1237,7 +1239,6 @@ public void testDeletionOfFailingToRecoverIndexShouldStopRestore() throws Except
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
SearchResponse countResponse = client.prepareSearch("test-idx").setSize(0).get();
assertThat(countResponse.getHits().getTotalHits().value, equalTo(100L));

}

public void testUnallocatedShards() throws Exception {
@@ -1786,8 +1787,6 @@ public void testRenameOnRestore() throws Exception {
.setIndices("test-idx-1").setRenamePattern("test-idx").setRenameReplacement("alias")
.setWaitForCompletion(true).setIncludeAliases(false).execute().actionGet();
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));


}

public void testMoveShardWhileSnapshotting() throws Exception {
@@ -1854,6 +1853,7 @@ public void testMoveShardWhileSnapshotting() throws Exception {
}

public void testDeleteRepositoryWhileSnapshotting() throws Exception {
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
Client client = client();
Path repositoryLocation = randomRepoPath();
logger.info("--> creating repository");
@@ -2412,7 +2412,6 @@ public void testChangeSettingsOnRestore() throws Exception {

assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")).get(), numdocs);
assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")).get(), numdocs);

}

public void testRecreateBlocksOnRestore() throws Exception {
Expand Down Expand Up @@ -2506,6 +2505,8 @@ public void testRecreateBlocksOnRestore() throws Exception {
}

public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

Client client = client();

boolean allowPartial = randomBoolean();
@@ -2827,6 +2828,8 @@ private boolean waitForRelocationsToStart(final String index, TimeValue timeout)
}

public void testSnapshotName() throws Exception {
disableRepoConsistencyCheck("This test does not create any data in the repository");

final Client client = client();

logger.info("--> creating repository");
@@ -2848,6 +2851,8 @@ public void testSnapshotName() throws Exception {
}

public void testListCorruptedSnapshot() throws Exception {
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");

Client client = client();
Path repo = randomRepoPath();
logger.info("--> creating repository at {}", repo.toAbsolutePath());
@@ -3418,6 +3423,9 @@ public void testSnapshotCanceledOnRemovedShard() throws Exception {
}

public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
// TODO: Fix the repo cleanup logic to handle these leaked snap-{uuid}.dat files and only exclude test-repo (the mock repo) here.
disableRepoConsistencyCheck(
"This test uses a purposely broken repository implementation that results in leaking snap-{uuid}.dat files");
logger.info("--> creating repository");
final Path repoPath = randomRepoPath();
final Client client = client();
