Skip to content

Commit

Permalink
IGNITE-19144 Check flag for snapshot restore implemented (apache#10612)
Browse files Browse the repository at this point in the history
  • Loading branch information
nizhikov authored Mar 30, 2023
1 parent c5725f6 commit dd777fb
Show file tree
Hide file tree
Showing 19 changed files with 251 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import static org.apache.ignite.internal.commandline.CommandList.SNAPSHOT;
import static org.apache.ignite.internal.commandline.CommandLogger.optional;
import static org.apache.ignite.internal.commandline.snapshot.SnapshotRestoreCommandOption.CHECK;
import static org.apache.ignite.internal.commandline.snapshot.SnapshotRestoreCommandOption.GROUPS;
import static org.apache.ignite.internal.commandline.snapshot.SnapshotRestoreCommandOption.INCREMENT;
import static org.apache.ignite.internal.commandline.snapshot.SnapshotRestoreCommandOption.SOURCE;
Expand Down Expand Up @@ -90,6 +91,7 @@ private void explainDeprecatedOptions(Object cmdArg, IgniteLogger log) {
Integer incIdx = null;
Set<String> grpNames = null;
boolean sync = false;
boolean check = false;

if (restoreAction != null)
argIter.nextArg(null);
Expand Down Expand Up @@ -147,9 +149,15 @@ else if (option == SYNC) {

sync = true;
}
else if (option == CHECK) {
if (check)
throw new IllegalArgumentException(CHECK.argName() + " arg specified twice.");

check = true;
}
}

cmdArg = new VisorSnapshotRestoreTaskArg(snpName, snpPath, incIdx, sync, restoreAction, grpNames);
cmdArg = new VisorSnapshotRestoreTaskArg(snpName, snpPath, incIdx, sync, restoreAction, grpNames, check);
}

/** {@inheritDoc} */
Expand All @@ -161,12 +169,14 @@ else if (option == SYNC) {
startParams.put(SOURCE.argName() + " " + SOURCE.arg(), SOURCE.description());
startParams.put(INCREMENT.argName() + " " + INCREMENT.arg(), INCREMENT.description());
startParams.put(SYNC.argName(), SYNC.description());
startParams.put(CHECK.argName(), CHECK.description());

usage(log, "Restore snapshot:", SNAPSHOT, startParams, RESTORE.toString(), SNAPSHOT_NAME_ARG,
optional(INCREMENT.argName(), INCREMENT.arg()),
optional(GROUPS.argName(), GROUPS.arg()),
optional(SOURCE.argName(), SOURCE.arg()),
optional(SYNC.argName()));
optional(SYNC.argName()),
optional(CHECK.argName()));
usage(log, "Snapshot restore operation status (Command deprecated. Use '" + SNAPSHOT + ' '
+ SnapshotSubcommands.STATUS + "' instead):", SNAPSHOT, params, RESTORE.toString(), SNAPSHOT_NAME_ARG, "--status");
usage(log, "Cancel snapshot restore operation (Command deprecated. Use '" + SNAPSHOT + ' '
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ public enum SnapshotRestoreCommandOption implements CommandArg {

/** Synchronous execution flag. */
SYNC(SnapshotCreateCommandOption.SYNC.argName(), SnapshotCreateCommandOption.SYNC.arg(),
SnapshotCreateCommandOption.SYNC.description());
SnapshotCreateCommandOption.SYNC.description()),

/** Check snapshot before restore. */
CHECK("--check", null, "Check snapshot data integrity before restore (slow!). Similar to the \"check\" command.");

/** Name. */
private final String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3301,7 +3301,11 @@ public void testSnapshotRestoreSynchronously() throws Exception {
assertContains(log, testOut.toString(), "Invalid argument: --sync.");

assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute(h, "--snapshot", "restore", snpName, "blah"));
assertContains(log, testOut.toString(), "Invalid argument: blah. Possible options: --groups, --src, --increment, --sync.");
assertContains(
log,
testOut.toString(),
"Invalid argument: blah. Possible options: --groups, --src, --increment, --sync, --check."
);

assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute(h, "--snapshot", "restore", snpName, "--status", "--sync"));
assertContains(log, testOut.toString(), "Invalid argument: --sync. Action \"--status\" does not support specified option.");
Expand All @@ -3310,7 +3314,11 @@ public void testSnapshotRestoreSynchronously() throws Exception {
assertContains(log, testOut.toString(), "Invalid argument: --start.");

assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute(h, "--snapshot", "restore", snpName, "--start", "blah"));
assertContains(log, testOut.toString(), "Invalid argument: blah. Possible options: --groups, --src, --increment, --sync.");
assertContains(
log,
testOut.toString(),
"Invalid argument: blah. Possible options: --groups, --src, --increment, --sync, --check."
);

autoConfirmation = true;

Expand Down Expand Up @@ -3351,7 +3359,7 @@ public void testSnapshotRestore() throws Exception {
ig.cluster().state(ACTIVE);

injectTestSystemOut();
injectTestSystemIn(CONFIRM_MSG);
injectTestSystemIn(CONFIRM_MSG, CONFIRM_MSG);

createCacheAndPreload(ig, cacheName1, keysCnt, 32, null);
createCacheAndPreload(ig, cacheName2, keysCnt, 32, null);
Expand All @@ -3377,7 +3385,7 @@ public void testSnapshotRestore() throws Exception {

assertEquals(EXIT_CODE_INVALID_ARGUMENTS, execute(h, "--snapshot", "restore", snpName, cacheName1));
assertContains(log, testOut.toString(),
"Invalid argument: " + cacheName1 + ". Possible options: --groups, --src, --increment, --sync.");
"Invalid argument: " + cacheName1 + ". Possible options: --groups, --src, --increment, --sync, --check.");

// Restore single cache group.
assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--groups", cacheName1));
Expand Down Expand Up @@ -3432,32 +3440,44 @@ public void testSnapshotRestore() throws Exception {

awaitPartitionMapExchange();

assertNull(ig.cache(cacheName1));
assertNull(ig.cache(cacheName2));
assertNull(ig.cache(cacheName3));
for (boolean check: new boolean[] {false, true}) {
assertNull(ig.cache(cacheName1));
assertNull(ig.cache(cacheName2));
assertNull(ig.cache(cacheName3));

// Restore all public cache groups.
assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName));
String out = testOut.toString();
assertContains(log, out, "Warning: command will restore ALL USER-CREATED CACHE GROUPS from the snapshot");
assertContains(log, out, "Snapshot cache group restore operation started [name=" + snpName);
// Restore all public cache groups.
if (check)
assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName, "--check"));
else
assertEquals(EXIT_CODE_OK, execute(h, "--snapshot", "restore", snpName));

waitForCondition(() -> ig.cache(cacheName1) != null, getTestTimeout());
waitForCondition(() -> ig.cache(cacheName2) != null, getTestTimeout());
waitForCondition(() -> ig.cache(cacheName3) != null, getTestTimeout());
String out = testOut.toString();
assertContains(log, out, "Warning: command will restore ALL USER-CREATED CACHE GROUPS from the snapshot");
assertContains(log, out, "Snapshot cache group restore operation started [name=" + snpName);

cache1 = ig.cache(cacheName1);
cache2 = ig.cache(cacheName2);
cache3 = ig.cache(cacheName3);
waitForCondition(() -> ig.cache(cacheName1) != null, getTestTimeout());
waitForCondition(() -> ig.cache(cacheName2) != null, getTestTimeout());
waitForCondition(() -> ig.cache(cacheName3) != null, getTestTimeout());

assertNotNull(cache1);
assertNotNull(cache2);
assertNotNull(cache3);
cache1 = ig.cache(cacheName1);
cache2 = ig.cache(cacheName2);
cache3 = ig.cache(cacheName3);

for (int i = 0; i < keysCnt; i++) {
assertEquals(cacheName1, Integer.valueOf(i), cache1.get(i));
assertEquals(cacheName2, Integer.valueOf(i), cache2.get(i));
assertEquals(cacheName3, Integer.valueOf(i), cache2.get(i));
assertNotNull(cache1);
assertNotNull(cache2);
assertNotNull(cache3);

// Every key i is expected to map to the value i in each of the three restored caches.
for (int i = 0; i < keysCnt; i++) {
    assertEquals(cacheName1, Integer.valueOf(i), cache1.get(i));
    assertEquals(cacheName2, Integer.valueOf(i), cache2.get(i));
    // Read from cache3: the assertion is labelled with cacheName3, and checking cache2 a second time
    // would leave the contents of the third restored cache unverified.
    assertEquals(cacheName3, Integer.valueOf(i), cache3.get(i));
}

cache1.destroy();
cache2.destroy();
cache3.destroy();

awaitPartitionMapExchange();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,16 @@ public abstract class AbstractSnapshotVerificationTask extends
if (meta == null)
continue;

jobs.put(createJob(meta.snapshotName(), arg.snapshotPath(), meta.consistentId(), arg.cacheGroupNames()),
e.getKey());
jobs.put(
createJob(
meta.snapshotName(),
arg.snapshotPath(),
meta.consistentId(),
arg.cacheGroupNames(),
arg.check()
),
e.getKey()
);

if (allMetas.isEmpty())
break;
Expand Down Expand Up @@ -126,7 +134,14 @@ public static void checkMissedMetadata(Collection<SnapshotMetadata> clusterMetas
* @param path Snapshot directory path.
* @param constId Snapshot metadata file name.
* @param groups Cache groups to be restored from the snapshot. May be empty if all cache groups are being restored.
* @param check If {@code true} check snapshot before restore.
* @return Compute job.
*/
protected abstract ComputeJob createJob(String name, @Nullable String path, String constId, Collection<String> groups);
protected abstract ComputeJob createJob(
String name,
@Nullable String path,
String constId,
Collection<String> groups,
boolean check
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
/** Default value of {@link IgniteSystemProperties#IGNITE_SNAPSHOT_SEQUENTIAL_WRITE}. */
public static final boolean DFLT_IGNITE_SNAPSHOT_SEQUENTIAL_WRITE = true;

/** Default value of the {@code check} flag for snapshot restore: the snapshot is not checked before restore. */
public static final boolean DFLT_CHECK_ON_RESTORE = false;

/** @deprecated Use #SNP_RUNNING_DIR_KEY instead. */
@Deprecated
private static final String SNP_RUNNING_KEY = "snapshot-running";
Expand Down Expand Up @@ -1169,7 +1172,7 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalFullSnapshot(
);

SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta, req.groups(), cctx.localNode(), snpDir,
req.streamerWarning());
req.streamerWarning(), true);

req.meta(meta);

Expand Down Expand Up @@ -1772,7 +1775,7 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(St

cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);

return checkSnapshot(name, snpPath, null, false, incIdx).chain(f -> {
return checkSnapshot(name, snpPath, null, false, incIdx, true).chain(f -> {
try {
return f.get();
}
Expand All @@ -1793,6 +1796,7 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(St
* @param includeCustomHandlers {@code True} to invoke all user-defined {@link SnapshotHandlerType#RESTORE}
* handlers, otherwise only system consistency check will be performed.
* @param incIdx Incremental snapshot index.
* @param check If {@code true} check snapshot integrity.
* @return Future with the result of execution snapshot partitions verify task, which besides calculating partition
* hashes of {@link IdleVerifyResultV2} also contains the snapshot metadata distribution across the cluster.
*/
Expand All @@ -1801,7 +1805,8 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(
@Nullable String snpPath,
@Nullable Collection<String> grps,
boolean includeCustomHandlers,
int incIdx
int incIdx,
boolean check
) {
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
Expand Down Expand Up @@ -1901,7 +1906,7 @@ public IgniteInternalFuture<SnapshotPartitionsVerifyTaskResult> checkSnapshot(

kctx0.task().execute(
cls,
new SnapshotPartitionsVerifyTaskArg(grps, metas, snpPath),
new SnapshotPartitionsVerifyTaskArg(grps, metas, snpPath, check),
options(new ArrayList<>(metas.keySet()))
).listen(f1 -> {
if (f1.error() == null)
Expand Down Expand Up @@ -2262,7 +2267,7 @@ void warnAtomicCachesInIncrementalSnapshot(String snpName, int incIdx, Collectio

/** {@inheritDoc} */
@Override public IgniteFuture<Void> restoreSnapshot(String name, @Nullable Collection<String> grpNames) {
return restoreSnapshot(name, null, grpNames, 0);
return restoreSnapshot(name, null, grpNames, 0, DFLT_CHECK_ON_RESTORE);
}

/** {@inheritDoc} */
Expand All @@ -2273,7 +2278,7 @@ void warnAtomicCachesInIncrementalSnapshot(String snpName, int incIdx, Collectio
) {
A.ensure(incIdx > 0, "Incremental snapshot index must be greater than 0.");

return restoreSnapshot(name, null, grpNames, incIdx);
return restoreSnapshot(name, null, grpNames, incIdx, DFLT_CHECK_ON_RESTORE);
}

/**
Expand All @@ -2285,7 +2290,7 @@ void warnAtomicCachesInIncrementalSnapshot(String snpName, int incIdx, Collectio
* @return Future which will be completed when restore operation finished.
*/
public IgniteFutureImpl<Void> restoreSnapshot(String name, @Nullable String snpPath, @Nullable Collection<String> grpNames) {
return restoreSnapshot(name, snpPath, grpNames, 0);
return restoreSnapshot(name, snpPath, grpNames, 0, DFLT_CHECK_ON_RESTORE);
}

/**
Expand All @@ -2295,21 +2300,23 @@ public IgniteFutureImpl<Void> restoreSnapshot(String name, @Nullable String snpP
* @param snpPath Snapshot directory path.
* @param grpNames Cache groups to be restored or {@code null} to restore all cache groups from the snapshot.
* @param incIdx Index of incremental snapshot.
* @param check If {@code true} check snapshot before restore.
* @return Future which will be completed when restore operation finished.
*/
public IgniteFutureImpl<Void> restoreSnapshot(
String name,
@Nullable String snpPath,
@Nullable Collection<String> grpNames,
int incIdx
int incIdx,
boolean check
) {
A.notNullOrEmpty(name, "Snapshot name cannot be null or empty.");
A.ensure(U.alphanumericUnderscore(name), "Snapshot name must satisfy the following name pattern: a-zA-Z0-9_");
A.ensure(grpNames == null || !grpNames.isEmpty(), "List of cache group names cannot be empty.");

cctx.kernalContext().security().authorize(ADMIN_SNAPSHOT);

return restoreCacheGrpProc.start(name, snpPath, grpNames, incIdx);
return restoreCacheGrpProc.start(name, snpPath, grpNames, incIdx, check);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,32 @@ public class SnapshotHandlerContext {
/** Warning flag of concurrent inconsistent-by-nature streamer updates. */
private final boolean streamerWrn;

/** If {@code true} check snapshot integrity. */
private final boolean check;

/**
* @param metadata Snapshot metadata.
* @param grps The names of the cache groups on which the operation is performed.
* {@code False} otherwise. Always {@code false} for snapshot restoration.
* @param locNode Local node.
* @param snpDir The full path to the snapshot files.
* @param streamerWrn {@code True} if concurrent streaming updates occurred during snapshot operation.
* @param check If {@code true} check snapshot integrity.
*/
public SnapshotHandlerContext(SnapshotMetadata metadata, @Nullable Collection<String> grps, ClusterNode locNode,
File snpDir, boolean streamerWrn) {
public SnapshotHandlerContext(
SnapshotMetadata metadata,
@Nullable Collection<String> grps,
ClusterNode locNode,
File snpDir,
boolean streamerWrn,
boolean check
) {
this.metadata = metadata;
this.grps = grps;
this.locNode = locNode;
this.snpDir = snpDir;
this.streamerWrn = streamerWrn;
this.check = check;
}

/**
Expand Down Expand Up @@ -93,4 +104,9 @@ public ClusterNode localNode() {
public boolean streamerWarning() {
return streamerWrn;
}

/** @return {@code True} if snapshot integrity should be checked. */
public boolean check() {
return check;
}
}
Loading

0 comments on commit dd777fb

Please sign in to comment.