diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 621d0ac50..2498f2115 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -90,6 +90,7 @@ jobs:
inputs:
testRunner: VSTest
testResultsFiles: '**/*.trx'
+ searchFolder: '$(Agent.TempDirectory)'
- job: 'cppWindows'
pool:
@@ -152,11 +153,6 @@ jobs:
workingDirectory: 'cc/build/Debug'
displayName: 'Run Tests (Debug)'
- #- script: |
- # CTEST_OUTPUT_ON_FAILURE=1 make test
- # workingDirectory: 'cc/build/Release'
- # displayName: 'Run Tests (Release)'
-
- job: 'csharpLinux'
pool:
vmImage: ubuntu-16.04
@@ -187,8 +183,15 @@ jobs:
inputs:
command: test
projects: '**/test/*.csproj'
- arguments: '--configuration $(buildConfiguration) --framework netcoreapp3.1'
-
+ arguments: '--configuration $(buildConfiguration) --framework netcoreapp3.1 -l "console;verbosity=detailed"'
+
+ - task: PublishTestResults@2
+ displayName: 'Publish Test Results'
+ inputs:
+ testResultsFormat: 'VSTest'
+ testResultsFiles: '*.trx'
+ searchFolder: '$(Agent.TempDirectory)'
+
# - job: 'cppBlobsWindows'
# pool:
# vmImage: vs2017-win2016
diff --git a/cs/FASTER.sln b/cs/FASTER.sln
index 5d201b180..28aecb198 100644
--- a/cs/FASTER.sln
+++ b/cs/FASTER.sln
@@ -83,6 +83,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{C60F148B-2
..\docs\_docs\96-slides-videos.md = ..\docs\_docs\96-slides-videos.md
EndProjectSection
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TstRunner", "playground\TstRunner\TstRunner.csproj", "{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -275,6 +277,13 @@ Global
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693}.Release|Any CPU.Build.0 = Release|x64
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693}.Release|x64.ActiveCfg = Release|x64
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693}.Release|x64.Build.0 = Release|x64
+ {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Debug|Any CPU.ActiveCfg = Debug|x64
+ {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Debug|Any CPU.Build.0 = Debug|x64
+ {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Debug|x64.ActiveCfg = Debug|x64
+ {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Debug|x64.Build.0 = Debug|x64
+ {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|Any CPU.ActiveCfg = Release|x64
+ {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|x64.ActiveCfg = Release|x64
+ {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -305,6 +314,7 @@ Global
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{9EFCF8C5-320B-473C-83DE-3815981D465B} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
+ {A265D9D2-3FEA-48BB-B1CC-273ECFEA0611} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
diff --git a/cs/playground/AsyncStress/FasterWrapper.cs b/cs/playground/AsyncStress/FasterWrapper.cs
index 834d55d8a..12a01db8c 100644
--- a/cs/playground/AsyncStress/FasterWrapper.cs
+++ b/cs/playground/AsyncStress/FasterWrapper.cs
@@ -1,4 +1,7 @@
-using FASTER.core;
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using FASTER.core;
using Xunit;
using System;
using System.IO;
diff --git a/cs/playground/AsyncStress/Program.cs b/cs/playground/AsyncStress/Program.cs
index 021af0dcf..d3b3066fc 100644
--- a/cs/playground/AsyncStress/Program.cs
+++ b/cs/playground/AsyncStress/Program.cs
@@ -1,4 +1,7 @@
-using System;
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Xunit;
diff --git a/cs/playground/CacheStoreConcurrent/Types.cs b/cs/playground/CacheStoreConcurrent/Types.cs
index e9bd1e66f..4d5ae58ce 100644
--- a/cs/playground/CacheStoreConcurrent/Types.cs
+++ b/cs/playground/CacheStoreConcurrent/Types.cs
@@ -1,14 +1,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
-using FASTER.core;
using System;
-using System.Collections.Generic;
using System.Diagnostics;
-using System.IO;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
+using FASTER.core;
namespace CacheStoreConcurrent
{
diff --git a/cs/playground/FasterLogMLSDTest/Program.cs b/cs/playground/FasterLogMLSDTest/Program.cs
index 32143a9ad..474c728a1 100644
--- a/cs/playground/FasterLogMLSDTest/Program.cs
+++ b/cs/playground/FasterLogMLSDTest/Program.cs
@@ -1,4 +1,7 @@
-using System;
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
diff --git a/cs/playground/TstRunner/Program.cs b/cs/playground/TstRunner/Program.cs
new file mode 100644
index 000000000..6a85bd932
--- /dev/null
+++ b/cs/playground/TstRunner/Program.cs
@@ -0,0 +1,19 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using FASTER.core;
+using FASTER.test.recovery.objects;
+
+namespace TstRunner
+{
+ public class Program
+ {
+ public static void Main()
+ {
+ var test = new ObjectRecoveryTests2();
+ test.Setup();
+ test.ObjectRecoveryTest2(CheckpointType.Snapshot, 400, false).GetAwaiter().GetResult();
+ test.TearDown();
+ }
+ }
+}
diff --git a/cs/playground/TstRunner/TstRunner.csproj b/cs/playground/TstRunner/TstRunner.csproj
new file mode 100644
index 000000000..3b7b751f4
--- /dev/null
+++ b/cs/playground/TstRunner/TstRunner.csproj
@@ -0,0 +1,19 @@
+
+
+
+ Exe
+ netcoreapp3.1
+ x64
+ true
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/cs/samples/SecondaryReaderStore/Program.cs b/cs/samples/SecondaryReaderStore/Program.cs
index 12adf06b5..08cc41a5e 100644
--- a/cs/samples/SecondaryReaderStore/Program.cs
+++ b/cs/samples/SecondaryReaderStore/Program.cs
@@ -79,7 +79,7 @@ static void SecondaryReader()
{
try
{
- secondaryStore.Recover(undoFutureVersions: false); // read-only recovery, no writing back undos
+ secondaryStore.Recover(undoNextVersion: false); // read-only recovery, no writing back undos
}
catch
{
diff --git a/cs/samples/StoreCheckpointRecover/Program.cs b/cs/samples/StoreCheckpointRecover/Program.cs
index 619f8e0b8..ff3c94268 100644
--- a/cs/samples/StoreCheckpointRecover/Program.cs
+++ b/cs/samples/StoreCheckpointRecover/Program.cs
@@ -57,7 +57,7 @@ static void Main()
Console.WriteLine("Error!");
}
- // Take fold-over checkpoint of FASTER, wait to complete
+ // Take index + fold-over checkpoint of FASTER, wait to complete
store.TakeFullCheckpointAsync(CheckpointType.FoldOver)
.GetAwaiter().GetResult();
diff --git a/cs/src/core/Allocator/AllocatorBase.cs b/cs/src/core/Allocator/AllocatorBase.cs
index c0a4c2ddc..34e460525 100644
--- a/cs/src/core/Allocator/AllocatorBase.cs
+++ b/cs/src/core/Allocator/AllocatorBase.cs
@@ -21,6 +21,8 @@ internal struct FullPageStatus
public long LastFlushedUntilAddress;
[FieldOffset(8)]
public long LastClosedUntilAddress;
+ [FieldOffset(16)]
+ public int Dirty;
}
[StructLayout(LayoutKind.Explicit)]
@@ -379,7 +381,7 @@ public abstract (int, int) GetInitialRecordSize(ref Key ke
///
///
///
- protected abstract bool IsAllocated(int pageIndex);
+ internal abstract bool IsAllocated(int pageIndex);
///
/// Populate page
///
@@ -401,6 +403,141 @@ public abstract (int, int) GetInitialRecordSize(ref Key ke
///
protected abstract void WriteAsyncToDevice(long startPage, long flushPage, int pageSize, DeviceIOCompletionCallback callback, PageAsyncFlushResult result, IDevice device, IDevice objectLogDevice, long[] localSegmentOffsets);
+ ///
+ /// Delta flush
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, int version, DeltaLog deltaLog)
+ {
+ long startPage = GetPage(startAddress);
+ long endPage = GetPage(endAddress);
+ if (endAddress > GetStartLogicalAddress(endPage))
+ endPage++;
+
+ long prevEndPage = GetPage(prevEndAddress);
+
+ deltaLog.Allocate(out int entryLength, out long destPhysicalAddress);
+ int destOffset = 0;
+
+ for (long p = startPage; p < endPage; p++)
+ {
+ // All RCU pages need to be added to delta
+ // For IPU-only pages, prune based on dirty bit
+ if ((p < prevEndPage || endAddress == prevEndAddress) && PageStatusIndicator[p % BufferSize].Dirty < version)
+ continue;
+
+ var logicalAddress = p << LogPageSizeBits;
+ var physicalAddress = GetPhysicalAddress(logicalAddress);
+ var endPhysicalAddress = physicalAddress + PageSize;
+
+ if (p == startPage)
+ {
+ physicalAddress += (int)(startAddress & PageSizeMask);
+ logicalAddress += (int)(startAddress & PageSizeMask);
+ }
+
+ while (physicalAddress < endPhysicalAddress)
+ {
+ ref var info = ref GetInfo(physicalAddress);
+ var (recordSize, alignedRecordSize) = GetRecordSize(physicalAddress);
+ if (info.Version == RecordInfo.GetShortVersion(version))
+ {
+ int size = sizeof(long) + sizeof(int) + alignedRecordSize;
+ if (destOffset + size > entryLength)
+ {
+ deltaLog.Seal(destOffset);
+ deltaLog.Allocate(out entryLength, out destPhysicalAddress);
+ destOffset = 0;
+ if (destOffset + size > entryLength)
+ throw new FasterException("Insufficient page size to write delta");
+ }
+ *((long*)(destPhysicalAddress + destOffset)) = logicalAddress;
+ destOffset += sizeof(long);
+ *((int*)(destPhysicalAddress + destOffset)) = alignedRecordSize;
+ destOffset += sizeof(int);
+ Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize);
+ destOffset += alignedRecordSize;
+ }
+ physicalAddress += alignedRecordSize;
+ logicalAddress += alignedRecordSize;
+ }
+ }
+
+ if (destOffset > 0)
+ deltaLog.Seal(destOffset);
+ }
+
+ internal unsafe void ApplyDelta(DeltaLog log, long startPage, long endPage)
+ {
+ if (log == null) return;
+
+ long startLogicalAddress = GetStartLogicalAddress(startPage);
+ long endLogicalAddress = GetStartLogicalAddress(endPage);
+
+ log.Reset();
+ while (log.GetNext(out long physicalAddress, out int entryLength, out int type))
+ {
+ if (type != 0) continue; // consider only delta records
+ long endAddress = physicalAddress + entryLength;
+ while (physicalAddress < endAddress)
+ {
+ long address = *(long*)physicalAddress;
+ physicalAddress += sizeof(long);
+ int size = *(int*)physicalAddress;
+ physicalAddress += sizeof(int);
+ if (address >= startLogicalAddress && address < endLogicalAddress)
+ {
+ var destination = GetPhysicalAddress(address);
+ Buffer.MemoryCopy((void*)physicalAddress, (void*)destination, size, size);
+ }
+ physicalAddress += size;
+ }
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal void MarkPage(long logicalAddress, int version)
+ {
+ var offset = (logicalAddress >> LogPageSizeBits) % BufferSize;
+ if (PageStatusIndicator[offset].Dirty < version)
+ PageStatusIndicator[offset].Dirty = version;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ internal void MarkPageAtomic(long logicalAddress, int version)
+ {
+ var offset = (logicalAddress >> LogPageSizeBits) % BufferSize;
+ Utility.MonotonicUpdate(ref PageStatusIndicator[offset].Dirty, version, out _);
+ }
+
+ internal void WriteAsync(IntPtr alignedSourceAddress, ulong alignedDestinationAddress, uint numBytesToWrite,
+ DeviceIOCompletionCallback callback, PageAsyncFlushResult asyncResult,
+ IDevice device)
+ {
+ if (asyncResult.partial)
+ {
+ // Write only required bytes within the page
+ int aligned_start = (int)((asyncResult.fromAddress - (asyncResult.page << LogPageSizeBits)));
+ aligned_start = (aligned_start / sectorSize) * sectorSize;
+
+ int aligned_end = (int)((asyncResult.untilAddress - (asyncResult.page << LogPageSizeBits)));
+ aligned_end = ((aligned_end + (sectorSize - 1)) & ~(sectorSize - 1));
+
+ numBytesToWrite = (uint)(aligned_end - aligned_start);
+ device.WriteAsync(alignedSourceAddress + aligned_start, alignedDestinationAddress + (ulong)aligned_start, numBytesToWrite, callback, asyncResult);
+ }
+ else
+ {
+ device.WriteAsync(alignedSourceAddress, alignedDestinationAddress,
+ numBytesToWrite, callback, asyncResult);
+ }
+ }
+
+
///
/// Read objects to memory (async)
///
@@ -427,7 +564,7 @@ public abstract (int, int) GetInitialRecordSize(ref Key ke
///
/// Page number to be cleared
/// Offset to clear from (if partial clear)
- protected abstract void ClearPage(long page, int offset = 0);
+ internal abstract void ClearPage(long page, int offset = 0);
///
/// Write page (async)
///
@@ -1796,7 +1933,7 @@ private void AsyncFlushPageCallback(uint errorCode, uint numBytes, object contex
///
///
///
- private void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, object context)
+ protected void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, object context)
{
try
{
diff --git a/cs/src/core/Allocator/BlittableAllocator.cs b/cs/src/core/Allocator/BlittableAllocator.cs
index a6f288473..02ae2a5c4 100644
--- a/cs/src/core/Allocator/BlittableAllocator.cs
+++ b/cs/src/core/Allocator/BlittableAllocator.cs
@@ -21,6 +21,7 @@ public unsafe sealed class BlittableAllocator : AllocatorBase));
+ private static readonly int recordInfoSize = Utility.GetSize(default(RecordInfo));
private static readonly int keySize = Utility.GetSize(default(Key));
private static readonly int valueSize = Utility.GetSize(default(Value));
@@ -145,7 +146,7 @@ public override long GetPhysicalAddress(long logicalAddress)
return *(nativePointers + pageIndex) + offset;
}
- protected override bool IsAllocated(int pageIndex)
+ internal override bool IsAllocated(int pageIndex)
{
return values[pageIndex] != null;
}
@@ -195,7 +196,7 @@ public override long GetFirstValidLogicalAddress(long page)
return page << LogPageSizeBits;
}
- protected override void ClearPage(long page, int offset)
+ internal override void ClearPage(long page, int offset)
{
if (offset == 0)
Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset);
@@ -223,30 +224,6 @@ internal override void DeleteFromMemory()
values = null;
}
-
- private void WriteAsync(IntPtr alignedSourceAddress, ulong alignedDestinationAddress, uint numBytesToWrite,
- DeviceIOCompletionCallback callback, PageAsyncFlushResult asyncResult,
- IDevice device)
- {
- if (asyncResult.partial)
- {
- // Write only required bytes within the page
- int aligned_start = (int)((asyncResult.fromAddress - (asyncResult.page << LogPageSizeBits)));
- aligned_start = (aligned_start / sectorSize) * sectorSize;
-
- int aligned_end = (int)((asyncResult.untilAddress - (asyncResult.page << LogPageSizeBits)));
- aligned_end = ((aligned_end + (sectorSize - 1)) & ~(sectorSize - 1));
-
- numBytesToWrite = (uint)(aligned_end - aligned_start);
- device.WriteAsync(alignedSourceAddress + aligned_start, alignedDestinationAddress + (ulong)aligned_start, numBytesToWrite, callback, asyncResult);
- }
- else
- {
- device.WriteAsync(alignedSourceAddress, alignedDestinationAddress,
- numBytesToWrite, callback, asyncResult);
- }
- }
-
protected override void ReadAsync(
ulong alignedSourceAddress, int destinationPageIndex, uint aligned_read_length,
DeviceIOCompletionCallback callback, PageAsyncReadResult asyncResult, IDevice device, IDevice objlogDevice)
diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index 10f2f6675..75af9368e 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -238,7 +238,7 @@ public override long GetPhysicalAddress(long logicalAddress)
return logicalAddress;
}
- protected override bool IsAllocated(int pageIndex)
+ internal override bool IsAllocated(int pageIndex)
{
return values[pageIndex] != null;
}
@@ -259,19 +259,41 @@ protected override void WriteAsync(long flushPage, DeviceIOCompletionC
}
protected override void WriteAsyncToDevice
- (long startPage, long flushPage, int pageSize, DeviceIOCompletionCallback callback,
+ (long startPage, long flushPage, int pageSize, DeviceIOCompletionCallback callback,
PageAsyncFlushResult asyncResult, IDevice device, IDevice objectLogDevice, long[] localSegmentOffsets)
{
- // We are writing to separate device, so use fresh segment offsets
- WriteAsync(flushPage,
- (ulong)(AlignedPageSizeBytes * (flushPage - startPage)),
- (uint)pageSize, callback, asyncResult,
- device, objectLogDevice, flushPage, localSegmentOffsets);
+ bool epochTaken = false;
+ if (!epoch.ThisInstanceProtected())
+ {
+ epochTaken = true;
+ epoch.Resume();
+ }
+ try
+ {
+ if (FlushedUntilAddress < (flushPage << LogPageSizeBits) + pageSize)
+ {
+ // We are writing to separate device, so use fresh segment offsets
+ WriteAsync(flushPage,
+ (ulong)(AlignedPageSizeBytes * (flushPage - startPage)),
+ (uint)pageSize, callback, asyncResult,
+ device, objectLogDevice, flushPage, localSegmentOffsets);
+ }
+ else
+ {
+ // Requested page is already flushed to main log, ignore
+ callback(0, 0, asyncResult);
+ }
+ }
+ finally
+ {
+ if (epochTaken)
+ epoch.Suspend();
+ }
}
- protected override void ClearPage(long page, int offset)
+ internal override void ClearPage(long page, int offset)
{
Array.Clear(values[page % BufferSize], offset / recordSize, values[page % BufferSize].Length - offset / recordSize);
@@ -997,5 +1019,10 @@ internal override void MemoryPageScan(long beginAddress, long endAddress)
epoch.Resume();
}
}
+
+ internal override void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, int version, DeltaLog deltaLog)
+ {
+ throw new FasterException("Incremental snapshots not supported with generic allocator");
+ }
}
}
diff --git a/cs/src/core/Allocator/ScanIteratorBase.cs b/cs/src/core/Allocator/ScanIteratorBase.cs
index ab9e17818..37a110ec6 100644
--- a/cs/src/core/Allocator/ScanIteratorBase.cs
+++ b/cs/src/core/Allocator/ScanIteratorBase.cs
@@ -36,10 +36,10 @@ public abstract class ScanIteratorBase
///
protected long currentAddress, nextAddress;
- private readonly CountdownEvent[] loaded;
- private readonly CancellationTokenSource[] loadedCancel;
- private readonly long[] loadedPage;
- private readonly long[] nextLoadedPage;
+ private CountdownEvent[] loaded;
+ private CancellationTokenSource[] loadedCancel;
+ private long[] loadedPage;
+ private long[] nextLoadedPage;
private readonly int logPageSizeBits;
///
@@ -60,10 +60,11 @@ public abstract class ScanIteratorBase
///
///
///
- public unsafe ScanIteratorBase(long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch, int logPageSizeBits)
+ ///
+ public unsafe ScanIteratorBase(long beginAddress, long endAddress, ScanBufferingMode scanBufferingMode, LightEpoch epoch, int logPageSizeBits, bool initForReads = true)
{
// If we are protected when creating the iterator, we do not need per-GetNext protection
- if (!epoch.ThisInstanceProtected())
+ if (epoch != null && !epoch.ThisInstanceProtected())
this.epoch = epoch;
this.beginAddress = beginAddress;
@@ -82,7 +83,14 @@ public unsafe ScanIteratorBase(long beginAddress, long endAddress, ScanBuffering
frameSize = 0;
return;
}
+ if (initForReads) InitializeForReads();
+ }
+ ///
+ /// Initialize for reads
+ ///
+ public virtual void InitializeForReads()
+ {
loaded = new CountdownEvent[frameSize];
loadedCancel = new CancellationTokenSource[frameSize];
loadedPage = new long[frameSize];
@@ -93,6 +101,8 @@ public unsafe ScanIteratorBase(long beginAddress, long endAddress, ScanBuffering
nextLoadedPage[i] = -1;
loadedCancel[i] = new CancellationTokenSource();
}
+ currentAddress = -1;
+ nextAddress = beginAddress;
}
///
@@ -129,11 +139,19 @@ protected unsafe bool BufferAndLoad(long currentAddress, long currentPage, long
if (val < pageEndAddress && Interlocked.CompareExchange(ref nextLoadedPage[nextFrame], pageEndAddress, val) == val)
{
var tmp_i = i;
- epoch.BumpCurrentEpoch(() =>
+ if (epoch != null)
+ {
+ epoch.BumpCurrentEpoch(() =>
+ {
+ AsyncReadPagesFromDeviceToFrame(tmp_i + (currentAddress >> logPageSizeBits), 1, endAddress, Empty.Default, out loaded[nextFrame], 0, null, null, loadedCancel[nextFrame]);
+ loadedPage[nextFrame] = pageEndAddress;
+ });
+ }
+ else
{
AsyncReadPagesFromDeviceToFrame(tmp_i + (currentAddress >> logPageSizeBits), 1, endAddress, Empty.Default, out loaded[nextFrame], 0, null, null, loadedCancel[nextFrame]);
loadedPage[nextFrame] = pageEndAddress;
- });
+ }
}
else
epoch?.ProtectAndDrain();
@@ -172,18 +190,40 @@ private bool WaitForFrameLoad(long currentAddress, long currentFrame)
///
public virtual void Dispose()
{
- // Wait for ongoing reads to complete/fail
- for (int i = 0; i < frameSize; i++)
+ if (loaded != null)
{
- if (loadedPage[i] != -1)
+ // Wait for ongoing reads to complete/fail
+ for (int i = 0; i < frameSize; i++)
{
- try
+ if (loadedPage[i] != -1)
{
- loaded[i].Wait(loadedCancel[i].Token);
+ try
+ {
+ loaded[i].Wait(loadedCancel[i].Token);
+ }
+ catch { }
}
- catch { }
}
}
}
+
+ ///
+ /// Reset iterator
+ ///
+ public void Reset()
+ {
+ loaded = new CountdownEvent[frameSize];
+ loadedCancel = new CancellationTokenSource[frameSize];
+ loadedPage = new long[frameSize];
+ nextLoadedPage = new long[frameSize];
+ for (int i = 0; i < frameSize; i++)
+ {
+ loadedPage[i] = -1;
+ nextLoadedPage[i] = -1;
+ loadedCancel[i] = new CancellationTokenSource();
+ }
+ currentAddress = -1;
+ nextAddress = beginAddress;
+ }
}
}
diff --git a/cs/src/core/Allocator/VarLenBlittableAllocator.cs b/cs/src/core/Allocator/VarLenBlittableAllocator.cs
index 81df3806b..d6c4af38d 100644
--- a/cs/src/core/Allocator/VarLenBlittableAllocator.cs
+++ b/cs/src/core/Allocator/VarLenBlittableAllocator.cs
@@ -250,7 +250,7 @@ public override long GetPhysicalAddress(long logicalAddress)
return *(nativePointers + pageIndex) + offset;
}
- protected override bool IsAllocated(int pageIndex)
+ internal override bool IsAllocated(int pageIndex)
{
return values[pageIndex] != null;
}
@@ -300,7 +300,7 @@ public override long GetFirstValidLogicalAddress(long page)
return page << LogPageSizeBits;
}
- protected override void ClearPage(long page, int offset)
+ internal override void ClearPage(long page, int offset)
{
if (offset == 0)
Array.Clear(values[page % BufferSize], offset, values[page % BufferSize].Length - offset);
@@ -328,29 +328,6 @@ internal override void DeleteFromMemory()
values = null;
}
-
- private void WriteAsync(IntPtr alignedSourceAddress, ulong alignedDestinationAddress, uint numBytesToWrite,
- DeviceIOCompletionCallback callback, PageAsyncFlushResult asyncResult,
- IDevice device)
- {
- if (asyncResult.partial)
- {
- // Write only required bytes within the page
- int aligned_start = (int)((asyncResult.fromAddress - (asyncResult.page << LogPageSizeBits)));
- aligned_start = (aligned_start / sectorSize) * sectorSize;
-
- int aligned_end = (int)((asyncResult.untilAddress - (asyncResult.page << LogPageSizeBits)));
- aligned_end = ((aligned_end + (sectorSize - 1)) & ~(sectorSize - 1));
-
- numBytesToWrite = (uint)(aligned_end - aligned_start);
- device.WriteAsync(alignedSourceAddress + aligned_start, alignedDestinationAddress + (ulong)aligned_start, numBytesToWrite, callback, asyncResult);
- }
- else
- {
- device.WriteAsync(alignedSourceAddress, alignedDestinationAddress, numBytesToWrite, callback, asyncResult);
- }
- }
-
protected override void ReadAsync(
ulong alignedSourceAddress, int destinationPageIndex, uint aligned_read_length,
DeviceIOCompletionCallback callback, PageAsyncReadResult asyncResult, IDevice device, IDevice objlogDevice)
diff --git a/cs/src/core/ClientSession/AdvancedClientSession.cs b/cs/src/core/ClientSession/AdvancedClientSession.cs
index 99033eeb2..4bd6c686d 100644
--- a/cs/src/core/ClientSession/AdvancedClientSession.cs
+++ b/cs/src/core/ClientSession/AdvancedClientSession.cs
@@ -880,7 +880,10 @@ public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref Reco
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool ConcurrentWriterNoLock(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address)
- => _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst, ref recordInfo, address);
+ {
+ recordInfo.Version = _clientSession.ctx.version;
+ return _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst, ref recordInfo, address);
+ }
private bool ConcurrentWriterLock(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address)
{
@@ -909,6 +912,7 @@ public void ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recor
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address)
{
+ recordInfo.Version = _clientSession.ctx.version;
recordInfo.Tombstone = true;
_clientSession.functions.ConcurrentDeleter(ref key, ref value, ref recordInfo, address);
}
@@ -963,7 +967,10 @@ public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref Re
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address)
- => _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref recordInfo, address);
+ {
+ recordInfo.Version = _clientSession.ctx.version;
+ return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value, ref recordInfo, address);
+ }
private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address)
{
diff --git a/cs/src/core/ClientSession/ClientSession.cs b/cs/src/core/ClientSession/ClientSession.cs
index 2b6cd3643..dbc696abe 100644
--- a/cs/src/core/ClientSession/ClientSession.cs
+++ b/cs/src/core/ClientSession/ClientSession.cs
@@ -883,12 +883,15 @@ public void ConcurrentReaderLock(ref Key key, ref Input input, ref Value value,
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool ConcurrentWriter(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address)
=> !this.SupportsLocking
- ? ConcurrentWriterNoLock(ref key, ref src, ref dst, address)
+ ? ConcurrentWriterNoLock(ref key, ref src, ref dst, ref recordInfo, address)
: ConcurrentWriterLock(ref key, ref src, ref dst, ref recordInfo, address);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private bool ConcurrentWriterNoLock(ref Key key, ref Value src, ref Value dst, long address)
- => _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst);
+ private bool ConcurrentWriterNoLock(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address)
+ {
+ recordInfo.Version = _clientSession.ctx.version;
+ return _clientSession.functions.ConcurrentWriter(ref key, ref src, ref dst);
+ }
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool ConcurrentWriterLock(ref Key key, ref Value src, ref Value dst, ref RecordInfo recordInfo, long address)
@@ -897,7 +900,7 @@ private bool ConcurrentWriterLock(ref Key key, ref Value src, ref Value dst, ref
this.Lock(ref recordInfo, ref key, ref dst, LockType.Exclusive, ref context);
try
{
- return !recordInfo.Tombstone && ConcurrentWriterNoLock(ref key, ref src, ref dst, address);
+ return !recordInfo.Tombstone && ConcurrentWriterNoLock(ref key, ref src, ref dst, ref recordInfo, address);
}
finally
{
@@ -918,6 +921,7 @@ public void ConcurrentDeleter(ref Key key, ref Value value, ref RecordInfo recor
private void ConcurrentDeleterNoLock(ref Key key, ref Value value, ref RecordInfo recordInfo, long address)
{
// Non-Advanced IFunctions has no ConcurrentDeleter
+ recordInfo.Version = _clientSession.ctx.version;
recordInfo.Tombstone = true;
}
@@ -966,12 +970,15 @@ public void InitialUpdater(ref Key key, ref Input input, ref Value value)
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public bool InPlaceUpdater(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address)
=> !this.SupportsLocking
- ? InPlaceUpdaterNoLock(ref key, ref input, ref value, address)
+ ? InPlaceUpdaterNoLock(ref key, ref input, ref value, ref recordInfo, address)
: InPlaceUpdaterLock(ref key, ref input, ref value, ref recordInfo, address);
[MethodImpl(MethodImplOptions.AggressiveInlining)]
- private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Value value, long address)
- => _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value);
+ private bool InPlaceUpdaterNoLock(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address)
+ {
+ recordInfo.Version = _clientSession.ctx.version;
+ return _clientSession.functions.InPlaceUpdater(ref key, ref input, ref value);
+ }
private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Value value, ref RecordInfo recordInfo, long address)
{
@@ -979,7 +986,7 @@ private bool InPlaceUpdaterLock(ref Key key, ref Input input, ref Value value, r
this.Lock(ref recordInfo, ref key, ref value, LockType.Exclusive, ref context);
try
{
- return !recordInfo.Tombstone && InPlaceUpdaterNoLock(ref key, ref input, ref value, address);
+ return !recordInfo.Tombstone && InPlaceUpdaterNoLock(ref key, ref input, ref value, ref recordInfo, address);
}
finally
{
diff --git a/cs/src/core/Device/IDevice.cs b/cs/src/core/Device/IDevice.cs
index 2ea4df826..f96f64aca 100644
--- a/cs/src/core/Device/IDevice.cs
+++ b/cs/src/core/Device/IDevice.cs
@@ -174,6 +174,13 @@ public interface IDevice : IDisposable
///
/// index of the segment to remov
void RemoveSegment(int segment);
+
+ ///
+ /// Get file size for given segment
+ ///
+ ///
+ ///
+ long GetFileSize(int segment);
}
///
diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs
index 8d1fde608..300792bea 100644
--- a/cs/src/core/Device/LocalStorageDevice.cs
+++ b/cs/src/core/Device/LocalStorageDevice.cs
@@ -364,6 +364,14 @@ public override bool TryComplete()
}
}
+ ///
+ public override long GetFileSize(int segment)
+ {
+ if (segmentSize > 0) return segmentSize;
+ Native32.GetFileSizeEx(GetOrAddHandle(segment), out long size);
+ return size;
+ }
+
///
/// Creates a SafeFileHandle for the specified segment. This can be used by derived classes to prepopulate logHandles in the constructor.
///
diff --git a/cs/src/core/Device/ManagedLocalStorageDevice.cs b/cs/src/core/Device/ManagedLocalStorageDevice.cs
index 3979aaf5b..edfc0a38e 100644
--- a/cs/src/core/Device/ManagedLocalStorageDevice.cs
+++ b/cs/src/core/Device/ManagedLocalStorageDevice.cs
@@ -433,6 +433,21 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs
callback(result);
}
+ ///
+ public override long GetFileSize(int segment)
+ {
+ if (segmentSize > 0) return segmentSize;
+ var pool = GetOrAddHandle(segment);
+ long size;
+ if (!pool.Item1.TryGet(out var stream))
+ stream = pool.Item1.GetAsync().GetAwaiter().GetResult();
+
+ size = stream.Length;
+ pool.Item1.Return(stream);
+ return size;
+ }
+
+
///
/// Close device
///
diff --git a/cs/src/core/Device/StorageDeviceBase.cs b/cs/src/core/Device/StorageDeviceBase.cs
index 37605da47..2c7fda6bb 100644
--- a/cs/src/core/Device/StorageDeviceBase.cs
+++ b/cs/src/core/Device/StorageDeviceBase.cs
@@ -294,5 +294,12 @@ public virtual bool TryComplete()
{
return true;
}
+
+ ///
+ public virtual long GetFileSize(int segment)
+ {
+ if (segmentSize > 0) return segmentSize;
+ return long.MaxValue;
+ }
}
}
\ No newline at end of file
diff --git a/cs/src/core/Index/CheckpointManagement/DefaultCheckpointNamingScheme.cs b/cs/src/core/Index/CheckpointManagement/DefaultCheckpointNamingScheme.cs
index bcf128191..536701914 100644
--- a/cs/src/core/Index/CheckpointManagement/DefaultCheckpointNamingScheme.cs
+++ b/cs/src/core/Index/CheckpointManagement/DefaultCheckpointNamingScheme.cs
@@ -26,6 +26,9 @@ public DefaultCheckpointNamingScheme(string baseName = "")
///
public string BaseName() => baseName;
+ ///
+ public FileDescriptor LogCheckpointBase(Guid token) => new FileDescriptor($"{LogCheckpointBasePath()}/{token}", null);
+
///
public FileDescriptor LogCheckpointMetadata(Guid token) => new FileDescriptor($"{LogCheckpointBasePath()}/{token}", "info.dat");
@@ -33,8 +36,13 @@ public DefaultCheckpointNamingScheme(string baseName = "")
public FileDescriptor LogSnapshot(Guid token) => new FileDescriptor($"{LogCheckpointBasePath()}/{token}", "snapshot.dat");
///
public FileDescriptor ObjectLogSnapshot(Guid token) => new FileDescriptor($"{LogCheckpointBasePath()}/{token}", "snapshot.obj.dat");
+ ///
+ public FileDescriptor DeltaLog(Guid token) => new FileDescriptor($"{LogCheckpointBasePath()}/{token}", "delta.dat");
+ ///
+ public FileDescriptor IndexCheckpointBase(Guid token) => new FileDescriptor($"{IndexCheckpointBasePath()}/{token}", null);
+
///
public FileDescriptor IndexCheckpointMetadata(Guid token) => new FileDescriptor($"{IndexCheckpointBasePath()}/{token}", "info.dat");
///
diff --git a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
index 53baf2b78..fd77e59dc 100644
--- a/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
+++ b/cs/src/core/Index/CheckpointManagement/DeviceLogCommitCheckpointManager.cs
@@ -31,6 +31,11 @@ public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointMa
///
private long commitNum;
+ ///
+ /// Track historical commits for automatic purging
+ ///
+ private readonly Guid[] indexTokenHistory, logTokenHistory;
+ private int indexTokenHistoryOffset, logTokenHistoryOffset;
///
/// Create new instance of log commit manager
@@ -49,6 +54,14 @@ public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, IChec
this.overwriteLogCommits = overwriteLogCommits;
this.removeOutdated = removeOutdated;
+ if (removeOutdated)
+ {
+ // We keep two index checkpoints as the latest index might not have a
+ // later log checkpoint to work with
+ indexTokenHistory = new Guid[2];
+ // We only keep the latest log checkpoint
+ logTokenHistory = new Guid[1];
+ }
this._disposed = false;
deviceFactory.Initialize(checkpointNamingScheme.BaseName());
@@ -89,9 +102,11 @@ public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMet
WriteInto(device, 0, ms.ToArray(), (int)ms.Position);
if (!overwriteLogCommits)
+ {
device.Dispose();
-
- RemoveOutdated();
+ if (removeOutdated && commitNum > 1)
+ deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(commitNum - 2));
+ }
}
///
@@ -169,12 +184,6 @@ private IDevice NextCommitDevice()
return deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum++));
}
-
- private void RemoveOutdated()
- {
- if (removeOutdated && commitNum > 1)
- deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(commitNum - 2));
- }
#endregion
@@ -192,6 +201,15 @@ public unsafe void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
WriteInto(device, 0, ms.ToArray(), (int)ms.Position);
device.Dispose();
+
+ if (removeOutdated)
+ {
+ var prior = indexTokenHistory[indexTokenHistoryOffset];
+ indexTokenHistory[indexTokenHistoryOffset] = indexToken;
+ indexTokenHistoryOffset = (indexTokenHistoryOffset + 1) % indexTokenHistory.Length;
+ if (prior != default)
+ deviceFactory.Delete(checkpointNamingScheme.IndexCheckpointBase(prior));
+ }
}
///
@@ -230,6 +248,37 @@ public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
WriteInto(device, 0, ms.ToArray(), (int)ms.Position);
device.Dispose();
+
+ if (removeOutdated)
+ {
+ var prior = logTokenHistory[logTokenHistoryOffset];
+ logTokenHistory[logTokenHistoryOffset] = logToken;
+ logTokenHistoryOffset = (logTokenHistoryOffset + 1) % logTokenHistory.Length;
+ if (prior != default)
+ deviceFactory.Delete(checkpointNamingScheme.LogCheckpointBase(prior));
+ }
+ }
+
+ ///
+ public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog)
+ {
+ deltaLog.Allocate(out int length, out long physicalAddress);
+ if (length < commitMetadata.Length)
+ {
+ deltaLog.Seal(0, type: 1);
+ deltaLog.Allocate(out length, out physicalAddress);
+ if (length < commitMetadata.Length)
+ {
+ deltaLog.Seal(0);
+ throw new Exception($"Metadata of size {commitMetadata.Length} does not fit in delta log space of size {length}");
+ }
+ }
+ fixed (byte* ptr = commitMetadata)
+ {
+ Buffer.MemoryCopy(ptr, (void*)physicalAddress, commitMetadata.Length, commitMetadata.Length);
+ }
+ deltaLog.Seal(commitMetadata.Length, type: 1);
+ deltaLog.FlushAsync().Wait();
}
///
@@ -239,8 +288,27 @@ public IEnumerable GetLogCheckpointTokens()
}
///
- public byte[] GetLogCheckpointMetadata(Guid logToken)
+ public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog)
{
+ byte[] metadata = null;
+ if (deltaLog != null)
+ {
+ // Try to get latest valid metadata from delta-log
+ deltaLog.Reset();
+ while (deltaLog.GetNext(out long physicalAddress, out int entryLength, out int type))
+ {
+ if (type != 1) continue; // consider only metadata records
+ long endAddress = physicalAddress + entryLength;
+ metadata = new byte[entryLength];
+ unsafe
+ {
+ fixed (byte* m = metadata)
+ Buffer.MemoryCopy((void*)physicalAddress, m, entryLength, entryLength);
+ }
+ }
+ if (metadata != null) return metadata;
+ }
+
var device = deviceFactory.Get(checkpointNamingScheme.LogCheckpointMetadata(logToken));
ReadInto(device, 0, out byte[] writePad, sizeof(int));
@@ -273,6 +341,12 @@ public IDevice GetSnapshotObjectLogDevice(Guid token)
return deviceFactory.Get(checkpointNamingScheme.ObjectLogSnapshot(token));
}
+ ///
+ public IDevice GetDeltaLogDevice(Guid token)
+ {
+ return deviceFactory.Get(checkpointNamingScheme.DeltaLog(token));
+ }
+
///
public void InitializeIndexCheckpoint(Guid indexToken)
{
@@ -283,23 +357,43 @@ public void InitializeLogCheckpoint(Guid logToken)
{
}
- private IDevice NextIndexCheckpointDevice(Guid token)
+ ///
+ public void OnRecovery(Guid indexToken, Guid logToken)
{
- if (!removeOutdated)
+ if (!removeOutdated) return;
+
+ // Add recovered tokens to history, for eventual purging
+ if (indexToken != default)
{
- return deviceFactory.Get(checkpointNamingScheme.IndexCheckpointMetadata(token));
+ indexTokenHistory[indexTokenHistoryOffset] = indexToken;
+ indexTokenHistoryOffset = (indexTokenHistoryOffset + 1) % indexTokenHistory.Length;
+ }
+ if (logToken != default)
+ {
+ logTokenHistory[logTokenHistoryOffset] = logToken;
+ logTokenHistoryOffset = (logTokenHistoryOffset + 1) % logTokenHistory.Length;
}
- throw new NotImplementedException();
- }
- private IDevice NextLogCheckpointDevice(Guid token)
- {
- if (!removeOutdated)
+ // Purge all log checkpoints that were not used for recovery
+ foreach (var recoveredLogToken in GetLogCheckpointTokens())
{
- return deviceFactory.Get(checkpointNamingScheme.LogCheckpointMetadata(token));
+ if (recoveredLogToken != logToken)
+ deviceFactory.Delete(checkpointNamingScheme.LogCheckpointBase(recoveredLogToken));
+ }
+
+ // Purge all index checkpoints that were not used for recovery
+ foreach (var recoveredIndexToken in GetIndexCheckpointTokens())
+ {
+ if (recoveredIndexToken != indexToken)
+ deviceFactory.Delete(checkpointNamingScheme.IndexCheckpointBase(recoveredIndexToken));
}
- throw new NotImplementedException();
}
+
+ private IDevice NextIndexCheckpointDevice(Guid token)
+ => deviceFactory.Get(checkpointNamingScheme.IndexCheckpointMetadata(token));
+
+ private IDevice NextLogCheckpointDevice(Guid token)
+ => deviceFactory.Get(checkpointNamingScheme.LogCheckpointMetadata(token));
#endregion
private unsafe void IOCallback(uint errorCode, uint numBytes, object context)
diff --git a/cs/src/core/Index/CheckpointManagement/ICheckpointNamingScheme.cs b/cs/src/core/Index/CheckpointManagement/ICheckpointNamingScheme.cs
index f2d4f6415..e11e660d1 100644
--- a/cs/src/core/Index/CheckpointManagement/ICheckpointNamingScheme.cs
+++ b/cs/src/core/Index/CheckpointManagement/ICheckpointNamingScheme.cs
@@ -28,6 +28,14 @@ public interface ICheckpointNamingScheme
///
FileDescriptor HashTable(Guid token);
+
+ ///
+ /// Index checkpoint base location (folder)
+ ///
+ ///
+ ///
+ FileDescriptor IndexCheckpointBase(Guid token);
+
///
/// Index checkpoint metadata
///
@@ -36,6 +44,13 @@ public interface ICheckpointNamingScheme
FileDescriptor IndexCheckpointMetadata(Guid token);
+ ///
+ /// Hybrid log checkpoint base location (folder)
+ ///
+ ///
+ ///
+ FileDescriptor LogCheckpointBase(Guid token);
+
///
/// Hybrid log checkpoint metadata
///
@@ -57,6 +72,13 @@ public interface ICheckpointNamingScheme
///
FileDescriptor ObjectLogSnapshot(Guid token);
+ ///
+ /// Delta log
+ ///
+ ///
+ ///
+ FileDescriptor DeltaLog(Guid token);
+
///
/// FasterLog commit metadata
///
diff --git a/cs/src/core/Index/Common/CheckpointSettings.cs b/cs/src/core/Index/Common/CheckpointSettings.cs
index a460c516b..d504f91a2 100644
--- a/cs/src/core/Index/Common/CheckpointSettings.cs
+++ b/cs/src/core/Index/Common/CheckpointSettings.cs
@@ -40,5 +40,10 @@ public class CheckpointSettings
/// using local storage device.
///
public string CheckpointDir = null;
+
+ ///
+ /// Whether FASTER should remove outdated checkpoints automatically
+ ///
+ public bool RemoveOutdated = false;
}
}
diff --git a/cs/src/core/Index/Common/Contexts.cs b/cs/src/core/Index/Common/Contexts.cs
index 5869573c4..c2959a98f 100644
--- a/cs/src/core/Index/Common/Contexts.cs
+++ b/cs/src/core/Index/Common/Contexts.cs
@@ -224,7 +224,7 @@ public struct CommitPoint
///
public struct HybridLogRecoveryInfo
{
- const int CheckpointVersion = 1;
+ const int CheckpointVersion = 2;
///
/// Guid
@@ -239,6 +239,10 @@ public struct HybridLogRecoveryInfo
///
public int version;
///
+ /// Next Version
+ ///
+ public int nextVersion;
+ ///
/// Flushed logical address
///
public long flushedLogicalAddress;
@@ -251,6 +255,11 @@ public struct HybridLogRecoveryInfo
///
public long finalLogicalAddress;
///
+ /// Snapshot end logical address: snapshot is [startLogicalAddress, snapshotFinalLogicalAddress)
+ /// Note that finalLogicalAddress may be higher due to delta records
+ ///
+ public long snapshotFinalLogicalAddress;
+ ///
/// Head address
///
public long headAddress;
@@ -274,6 +283,12 @@ public struct HybridLogRecoveryInfo
///
public long[] objectLogSegmentOffsets;
+
+ ///
+ /// Tail address of delta file
+ ///
+ public long deltaTailAddress;
+
///
/// Initialize
///
@@ -287,6 +302,8 @@ public void Initialize(Guid token, int _version)
flushedLogicalAddress = 0;
startLogicalAddress = 0;
finalLogicalAddress = 0;
+ snapshotFinalLogicalAddress = 0;
+ deltaTailAddress = 0;
headAddress = 0;
checkpointTokens = new ConcurrentDictionary();
@@ -317,6 +334,9 @@ public void Initialize(StreamReader reader)
value = reader.ReadLine();
version = int.Parse(value);
+ value = reader.ReadLine();
+ nextVersion = int.Parse(value);
+
value = reader.ReadLine();
flushedLogicalAddress = long.Parse(value);
@@ -326,12 +346,18 @@ public void Initialize(StreamReader reader)
value = reader.ReadLine();
finalLogicalAddress = long.Parse(value);
+ value = reader.ReadLine();
+ snapshotFinalLogicalAddress = long.Parse(value);
+
value = reader.ReadLine();
headAddress = long.Parse(value);
value = reader.ReadLine();
beginAddress = long.Parse(value);
+ value = reader.ReadLine();
+ deltaTailAddress = long.Parse(value);
+
value = reader.ReadLine();
var numSessions = int.Parse(value);
@@ -378,10 +404,11 @@ public void Initialize(StreamReader reader)
///
///
///
+ ///
///
- internal void Recover(Guid token, ICheckpointManager checkpointManager)
+ internal void Recover(Guid token, ICheckpointManager checkpointManager, DeltaLog deltaLog = null)
{
- var metadata = checkpointManager.GetLogCheckpointMetadata(token);
+ var metadata = checkpointManager.GetLogCheckpointMetadata(token, deltaLog);
if (metadata == null)
throw new FasterException("Invalid log commit metadata for ID " + token.ToString());
@@ -404,11 +431,14 @@ public byte[] ToByteArray()
writer.WriteLine(guid);
writer.WriteLine(useSnapshotFile);
writer.WriteLine(version);
+ writer.WriteLine(nextVersion);
writer.WriteLine(flushedLogicalAddress);
writer.WriteLine(startLogicalAddress);
writer.WriteLine(finalLogicalAddress);
+ writer.WriteLine(snapshotFinalLogicalAddress);
writer.WriteLine(headAddress);
writer.WriteLine(beginAddress);
+ writer.WriteLine(deltaTailAddress);
writer.WriteLine(checkpointTokens.Count);
foreach (var kvp in checkpointTokens)
@@ -439,7 +469,7 @@ private readonly long Checksum(int checkpointTokensCount)
var bytes = guid.ToByteArray();
var long1 = BitConverter.ToInt64(bytes, 0);
var long2 = BitConverter.ToInt64(bytes, 8);
- return long1 ^ long2 ^ version ^ flushedLogicalAddress ^ startLogicalAddress ^ finalLogicalAddress ^ headAddress ^ beginAddress
+ return long1 ^ long2 ^ version ^ flushedLogicalAddress ^ startLogicalAddress ^ finalLogicalAddress ^ snapshotFinalLogicalAddress ^ headAddress ^ beginAddress
^ checkpointTokensCount ^ (objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length);
}
@@ -450,12 +480,15 @@ public readonly void DebugPrint()
{
Debug.WriteLine("******** HybridLog Checkpoint Info for {0} ********", guid);
Debug.WriteLine("Version: {0}", version);
+ Debug.WriteLine("Next Version: {0}", nextVersion);
Debug.WriteLine("Is Snapshot?: {0}", useSnapshotFile == 1);
Debug.WriteLine("Flushed LogicalAddress: {0}", flushedLogicalAddress);
Debug.WriteLine("Start Logical Address: {0}", startLogicalAddress);
Debug.WriteLine("Final Logical Address: {0}", finalLogicalAddress);
+ Debug.WriteLine("Snapshot Final Logical Address: {0}", snapshotFinalLogicalAddress);
Debug.WriteLine("Head Address: {0}", headAddress);
Debug.WriteLine("Begin Address: {0}", beginAddress);
+ Debug.WriteLine("Delta Tail Address: {0}", deltaTailAddress);
Debug.WriteLine("Num sessions recovered: {0}", continueTokens.Count);
Debug.WriteLine("Recovered sessions: ");
foreach (var sessionInfo in continueTokens.Take(10))
@@ -473,7 +506,10 @@ internal struct HybridLogCheckpointInfo
public HybridLogRecoveryInfo info;
public IDevice snapshotFileDevice;
public IDevice snapshotFileObjectLogDevice;
+ public IDevice deltaFileDevice;
+ public DeltaLog deltaLog;
public SemaphoreSlim flushedSemaphore;
+ public int prevVersion;
public void Initialize(Guid token, int _version, ICheckpointManager checkpointManager)
{
@@ -481,17 +517,28 @@ public void Initialize(Guid token, int _version, ICheckpointManager checkpointMa
checkpointManager.InitializeLogCheckpoint(token);
}
- public void Recover(Guid token, ICheckpointManager checkpointManager)
+ public void Recover(Guid token, ICheckpointManager checkpointManager, int deltaLogPageSizeBits)
{
- info.Recover(token, checkpointManager);
+ deltaFileDevice = checkpointManager.GetDeltaLogDevice(token);
+ deltaFileDevice.Initialize(-1);
+ if (deltaFileDevice.GetFileSize(0) > 0)
+ {
+ deltaLog = new DeltaLog(deltaFileDevice, deltaLogPageSizeBits, -1);
+ deltaLog.InitializeForReads();
+ info.Recover(token, checkpointManager, deltaLog);
+ }
+ else
+ {
+ info.Recover(token, checkpointManager, null);
+ }
}
public void Reset()
{
flushedSemaphore = null;
info = default;
- if (snapshotFileDevice != null) snapshotFileDevice.Dispose();
- if (snapshotFileObjectLogDevice != null) snapshotFileObjectLogDevice.Dispose();
+ snapshotFileDevice?.Dispose();
+ snapshotFileObjectLogDevice?.Dispose();
}
public bool IsDefault()
diff --git a/cs/src/core/Index/Common/RecordInfo.cs b/cs/src/core/Index/Common/RecordInfo.cs
index 5802263aa..c7623a609 100644
--- a/cs/src/core/Index/Common/RecordInfo.cs
+++ b/cs/src/core/Index/Common/RecordInfo.cs
@@ -218,6 +218,7 @@ public int Version
{
return (int)(((word & kVersionMaskInWord) >> kVersionShiftInWord) & kVersionMaskInInteger);
}
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
set
{
word &= ~kVersionMaskInWord;
diff --git a/cs/src/core/Index/FASTER/FASTER.cs b/cs/src/core/Index/FASTER/FASTER.cs
index 15abb47cc..388c243b7 100644
--- a/cs/src/core/Index/FASTER/FASTER.cs
+++ b/cs/src/core/Index/FASTER/FASTER.cs
@@ -135,7 +135,7 @@ public FasterKV(long size, LogSettings logSettings,
new DeviceLogCommitCheckpointManager
(new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme(
- new DirectoryInfo(checkpointSettings.CheckpointDir ?? ".").FullName));
+ new DirectoryInfo(checkpointSettings.CheckpointDir ?? ".").FullName), removeOutdated: checkpointSettings.RemoveOutdated);
}
if (checkpointSettings.CheckpointManager == null)
@@ -355,14 +355,23 @@ public bool TakeHybridLogCheckpoint(out Guid token)
///
/// Checkpoint token
/// Checkpoint type
+ /// For snapshot, try to store as incremental delta over last snapshot
/// Whether we could initiate the checkpoint. Use CompleteCheckpointAsync to wait completion.
- public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType)
+ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, bool tryIncremental = false)
{
+ if (tryIncremental && checkpointType != CheckpointType.Snapshot)
+ throw new FasterException("Can use tryIncremental only with snapshot checkpoints");
+
ISynchronizationTask backend;
if (checkpointType == CheckpointType.FoldOver)
backend = new FoldOverCheckpointTask();
else if (checkpointType == CheckpointType.Snapshot)
- backend = new SnapshotCheckpointTask();
+ {
+ if (tryIncremental && _lastSnapshotCheckpoint.info.guid != default && _lastSnapshotCheckpoint.info.finalLogicalAddress > hlog.FlushedUntilAddress)
+ backend = new IncrementalSnapshotCheckpointTask();
+ else
+ backend = new SnapshotCheckpointTask();
+ }
else
throw new FasterException("Unsupported checkpoint type");
@@ -375,6 +384,7 @@ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointTyp
/// Take log-only checkpoint
///
/// Checkpoint type
+ /// For snapshot, try to store as incremental delta over last snapshot
/// Cancellation token
///
/// (bool success, Guid token)
@@ -384,9 +394,9 @@ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointTyp
/// token: Token for taken checkpoint
/// Await task to complete checkpoint, if initiated successfully
///
- public async ValueTask<(bool success, Guid token)> TakeHybridLogCheckpointAsync(CheckpointType checkpointType, CancellationToken cancellationToken = default)
+ public async ValueTask<(bool success, Guid token)> TakeHybridLogCheckpointAsync(CheckpointType checkpointType, bool tryIncremental = false, CancellationToken cancellationToken = default)
{
- var success = TakeHybridLogCheckpoint(out Guid token, checkpointType);
+ var success = TakeHybridLogCheckpoint(out Guid token, checkpointType, tryIncremental);
if (success)
await CompleteCheckpointAsync(cancellationToken);
@@ -395,33 +405,33 @@ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointTyp
}
///
- /// Recover from the latest checkpoint (blocking operation)
+ /// Recover from the latest valid checkpoint (blocking operation)
///
/// Number of pages to preload into memory (beyond what needs to be read for recovery)
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
- public void Recover(int numPagesToPreload = -1, bool undoFutureVersions = true)
+ /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ public void Recover(int numPagesToPreload = -1, bool undoNextVersion = true)
{
- InternalRecoverFromLatestCheckpoints(numPagesToPreload, undoFutureVersions);
+ InternalRecoverFromLatestCheckpoints(numPagesToPreload, undoNextVersion);
}
///
- /// Asynchronously recover from the latest checkpoint (blocking operation)
+ /// Asynchronously recover from the latest valid checkpoint
///
/// Number of pages to preload into memory (beyond what needs to be read for recovery)
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
/// Cancellation token
- public ValueTask RecoverAsync(int numPagesToPreload = -1, bool undoFutureVersions = true, CancellationToken cancellationToken = default)
- => InternalRecoverFromLatestCheckpointsAsync(numPagesToPreload, undoFutureVersions, cancellationToken);
+ public ValueTask RecoverAsync(int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default)
+ => InternalRecoverFromLatestCheckpointsAsync(numPagesToPreload, undoNextVersion, cancellationToken);
///
/// Recover from specific token (blocking operation)
///
/// Token
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
- public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true)
+ /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true)
{
- InternalRecover(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoFutureVersions);
+ InternalRecover(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoNextVersion);
}
///
@@ -429,10 +439,10 @@ public void Recover(Guid fullCheckpointToken, int numPagesToPreload = -1, bool u
///
/// Token
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
/// Cancellation token
- public ValueTask RecoverAsync(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true, CancellationToken cancellationToken = default)
- => InternalRecoverAsync(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoFutureVersions, cancellationToken);
+ public ValueTask RecoverAsync(Guid fullCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default)
+ => InternalRecoverAsync(fullCheckpointToken, fullCheckpointToken, numPagesToPreload, undoNextVersion, cancellationToken);
///
/// Recover from specific index and log token (blocking operation)
@@ -440,10 +450,10 @@ public ValueTask RecoverAsync(Guid fullCheckpointToken, int numPagesToPreload =
///
///
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
- public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true)
+ /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true)
{
- InternalRecover(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoFutureVersions);
+ InternalRecover(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoNextVersion);
}
///
@@ -452,10 +462,10 @@ public void Recover(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, in
///
///
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
/// Cancellation token
- public ValueTask RecoverAsync(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true, CancellationToken cancellationToken = default)
- => InternalRecoverAsync(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoFutureVersions, cancellationToken);
+ public ValueTask RecoverAsync(Guid indexCheckpointToken, Guid hybridLogCheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default)
+ => InternalRecoverAsync(indexCheckpointToken, hybridLogCheckpointToken, numPagesToPreload, undoNextVersion, cancellationToken);
///
/// Wait for ongoing checkpoint to complete
@@ -701,6 +711,8 @@ public void Dispose()
Free();
hlog.Dispose();
readcache?.Dispose();
+ _lastSnapshotCheckpoint.deltaLog?.Dispose();
+ _lastSnapshotCheckpoint.deltaFileDevice?.Dispose();
if (disposeCheckpointManager)
checkpointManager?.Dispose();
}
diff --git a/cs/src/core/Index/FASTER/FASTERImpl.cs b/cs/src/core/Index/FASTER/FASTERImpl.cs
index 5214ac998..8e3faab97 100644
--- a/cs/src/core/Index/FASTER/FASTERImpl.cs
+++ b/cs/src/core/Index/FASTER/FASTERImpl.cs
@@ -360,7 +360,10 @@ internal OperationStatus InternalUpsert(
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
if (!recordInfo.Tombstone
&& fasterSession.ConcurrentWriter(ref key, ref value, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress))
+ {
+ hlog.MarkPage(logicalAddress, sessionCtx.version);
return OperationStatus.SUCCESS;
+ }
goto CreateNewRecord;
}
@@ -383,6 +386,8 @@ internal OperationStatus InternalUpsert(
if (!recordInfo.Tombstone
&& fasterSession.ConcurrentWriter(ref key, ref value, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress))
{
+ if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version);
+ else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version);
status = OperationStatus.SUCCESS;
goto LatchRelease; // Release shared latch (if acquired)
}
@@ -654,7 +659,10 @@ internal OperationStatus InternalRMW(
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
if (!recordInfo.Tombstone
&& fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress))
+ {
+ hlog.MarkPage(logicalAddress, sessionCtx.version);
return OperationStatus.SUCCESS;
+ }
goto CreateNewRecord;
}
@@ -683,6 +691,8 @@ internal OperationStatus InternalRMW(
if (fasterSession.InPlaceUpdater(ref key, ref input, ref hlog.GetValue(physicalAddress), ref recordInfo, logicalAddress))
{
+ if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version);
+ else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version);
status = OperationStatus.SUCCESS;
goto LatchRelease; // Release shared latch (if acquired)
}
@@ -1099,6 +1109,8 @@ internal OperationStatus InternalDelete(
ref RecordInfo recordInfo = ref hlog.GetInfo(physicalAddress);
ref Value value = ref hlog.GetValue(physicalAddress);
fasterSession.ConcurrentDeleter(ref hlog.GetKey(physicalAddress), ref value, ref recordInfo, logicalAddress);
+ if (sessionCtx.phase == Phase.REST) hlog.MarkPage(logicalAddress, sessionCtx.version);
+ else hlog.MarkPageAtomic(logicalAddress, sessionCtx.version);
if (WriteDefaultOnDelete)
value = default;
diff --git a/cs/src/core/Index/Interfaces/IFasterKV.cs b/cs/src/core/Index/Interfaces/IFasterKV.cs
index b52431b63..940e4afd4 100644
--- a/cs/src/core/Index/Interfaces/IFasterKV.cs
+++ b/cs/src/core/Index/Interfaces/IFasterKV.cs
@@ -134,53 +134,55 @@ public AdvancedClientSession
/// Token describing checkpoint
/// The checkpoint type to use (ignores the checkpoint type specified in the )
+ /// For snapshot, try to store as incremental delta over last snapshot
/// Whether we successfully initiated the checkpoint (initiation mayfail if we are already taking a checkpoint or performing some other
/// operation such as growing the index). Use CompleteCheckpointAsync to await completion.
- public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType);
+ public bool TakeHybridLogCheckpoint(out Guid token, CheckpointType checkpointType, bool tryIncremental = false);
///
/// Initiate checkpoint of FASTER log only (not index)
///
- /// A token to cancel the operation
/// The checkpoint type to use (ignores the checkpoint type specified in the )
+ /// For snapshot, try to store as incremental delta over last snapshot
+ /// A token to cancel the operation
/// A (bool success, Guid token) tuple.
/// success: Whether we successfully initiated the checkpoint (initiation may fail if we are already taking a checkpoint or performing some other
/// operation such as growing the index).
/// token: Token for taken checkpoint.
/// Await the task to complete checkpoint, if initiated successfully
- public ValueTask<(bool success, Guid token)> TakeHybridLogCheckpointAsync(CheckpointType checkpointType, CancellationToken cancellationToken = default);
+ public ValueTask<(bool success, Guid token)> TakeHybridLogCheckpointAsync(CheckpointType checkpointType, bool tryIncremental = false, CancellationToken cancellationToken = default);
///
/// Recover from last successful index and log checkpoints
///
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
- void Recover(int numPagesToPreload = -1, bool undoFutureVersions = true);
+ /// Whether records with version after checkpoint version need to be undone (and invalidated on log)
+ void Recover(int numPagesToPreload = -1, bool undoNextVersion = true);
///
/// Asynchronously recover from last successful index and log checkpoint
///
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ /// Whether records with version after checkpoint version need to be undone (and invalidated on log)
/// Cancellation token
- ValueTask RecoverAsync(int numPagesToPreload = -1, bool undoFutureVersions = true, CancellationToken cancellationToken = default);
+ ValueTask RecoverAsync(int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default);
///
/// Recover using full checkpoint token
///
///
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
- void Recover(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true);
+ /// Whether records with version after checkpoint version need to be undone (and invalidated on log)
+ void Recover(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true);
///
/// Asynchronously recover using full checkpoint token
///
///
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ /// Whether records with version after checkpoint version need to be undone (and invalidated on log)
/// Cancellation token
- ValueTask RecoverAsync(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoFutureVersions = true, CancellationToken cancellationToken = default);
+ ValueTask RecoverAsync(Guid fullcheckpointToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default);
///
/// Recover using a separate index and log checkpoint token
@@ -188,8 +190,8 @@ public AdvancedClientSession
///
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
- void Recover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoFutureVersions = true);
+ /// Whether records with version after checkpoint version need to be undone (and invalidated on log)
+ void Recover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoNextVersion = true);
///
/// Asynchronously recover using a separate index and log checkpoint token
@@ -197,9 +199,9 @@ public AdvancedClientSession
///
/// Number of pages to preload into memory after recovery
- /// Whether records with versions beyond checkpoint version need to be undone (and invalidated on log)
+ /// Whether records with version after checkpoint version need to be undone (and invalidated on log)
/// Cancellation token
- ValueTask RecoverAsync(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoFutureVersions = true, CancellationToken cancellationToken = default);
+ ValueTask RecoverAsync(Guid indexToken, Guid hybridLogToken, int numPagesToPreload = -1, bool undoNextVersion = true, CancellationToken cancellationToken = default);
///
/// Complete ongoing checkpoint (spin-wait)
diff --git a/cs/src/core/Index/Recovery/Checkpoint.cs b/cs/src/core/Index/Recovery/Checkpoint.cs
index bf7554480..09e1598d4 100644
--- a/cs/src/core/Index/Recovery/Checkpoint.cs
+++ b/cs/src/core/Index/Recovery/Checkpoint.cs
@@ -40,6 +40,7 @@ internal TaskCompletionSource checkpointTcs
internal Guid _indexCheckpointToken;
internal Guid _hybridLogCheckpointToken;
internal HybridLogCheckpointInfo _hybridLogCheckpoint;
+ internal HybridLogCheckpointInfo _lastSnapshotCheckpoint;
internal Task CheckpointTask => checkpointTcs.Task;
@@ -61,6 +62,11 @@ internal void WriteHybridLogMetaInfo()
checkpointManager.CommitLogCheckpoint(_hybridLogCheckpointToken, _hybridLogCheckpoint.info.ToByteArray());
}
+ internal void WriteHybridLogIncrementalMetaInfo(DeltaLog deltaLog)
+ {
+ checkpointManager.CommitLogIncrementalCheckpoint(_hybridLogCheckpointToken, _hybridLogCheckpoint.info.version, _hybridLogCheckpoint.info.ToByteArray(), deltaLog);
+ }
+
internal void WriteIndexMetaInfo()
{
checkpointManager.CommitIndexCheckpoint(_indexCheckpointToken, _indexCheckpoint.info.ToByteArray());
diff --git a/cs/src/core/Index/Recovery/DeltaLog.cs b/cs/src/core/Index/Recovery/DeltaLog.cs
new file mode 100644
index 000000000..535dd5dc7
--- /dev/null
+++ b/cs/src/core/Index/Recovery/DeltaLog.cs
@@ -0,0 +1,393 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace FASTER.core
+{
+ [StructLayout(LayoutKind.Explicit, Size = DeltaLog.HeaderSize)]
+ struct DeltalogHeader
+ {
+ [FieldOffset(0)]
+ public ulong Checksum;
+ [FieldOffset(8)]
+ public int Length;
+ [FieldOffset(12)]
+ public int Type;
+ }
+
+ ///
+ /// Scan iterator for hybrid log
+ ///
+ public sealed class DeltaLog : ScanIteratorBase, IDisposable
+ {
+ ///
+ /// Header size
+ ///
+ public const int HeaderSize = 16;
+
+ readonly IDevice deltaLogDevice;
+ readonly int LogPageSizeBits;
+ readonly int PageSize;
+ readonly int PageSizeMask;
+ readonly int AlignedPageSizeBytes;
+ readonly int sectorSize;
+ BlittableFrame frame;
+ bool disposed = false;
+
+ // Fields to support writes
+ SectorAlignedBufferPool memory;
+ long tailAddress;
+ long flushedUntilAddress;
+
+ SemaphoreSlim completedSemaphore;
+ int issuedFlush;
+ SectorAlignedMemory buffer;
+
+ ///
+ /// Tail address
+ ///
+ public long TailAddress => tailAddress;
+
+ ///
+ /// Constructor
+ ///
+ public DeltaLog(IDevice deltaLogDevice, int logPageSizeBits, long tailAddress)
+ : base(0, tailAddress >= 0 ? tailAddress : deltaLogDevice.GetFileSize(0), ScanBufferingMode.SinglePageBuffering, default, logPageSizeBits, false)
+ {
+ LogPageSizeBits = logPageSizeBits;
+ PageSize = 1 << LogPageSizeBits;
+ PageSizeMask = PageSize - 1;
+ this.deltaLogDevice = deltaLogDevice;
+ this.tailAddress = this.flushedUntilAddress = endAddress;
+ sectorSize = (int)deltaLogDevice.SectorSize;
+ AlignedPageSizeBytes = (int)Align(PageSize);
+ issuedFlush = 1;
+ completedSemaphore = new SemaphoreSlim(0);
+ }
+
+ ///
+ public override void InitializeForReads()
+ {
+ base.InitializeForReads();
+ if (frameSize > 0 && (endAddress > 0 || tailAddress > 0))
+ frame = new BlittableFrame(frameSize, 1 << LogPageSizeBits, sectorSize);
+ }
+
+ ///
+ /// Dispose the iterator
+ ///
+ public override void Dispose()
+ {
+ if (!disposed)
+ {
+ base.Dispose();
+
+ // Dispose/unpin the frame from memory
+ frame?.Dispose();
+ // Wait for ongoing page flushes
+ if (Interlocked.Decrement(ref issuedFlush) == 0)
+ completedSemaphore.Release();
+ completedSemaphore.Wait();
+ // Dispose flush buffer
+ buffer?.Dispose();
+ disposed = true;
+ }
+ }
+
+ internal override void AsyncReadPagesFromDeviceToFrame(long readPageStart, int numPages, long untilAddress, TContext context, out CountdownEvent completed, long devicePageOffset = 0, IDevice device = null, IDevice objectLogDevice = null, CancellationTokenSource cts = null)
+ {
+ IDevice usedDevice = deltaLogDevice;
+ completed = new CountdownEvent(numPages);
+ for (long readPage = readPageStart; readPage < (readPageStart + numPages); readPage++)
+ {
+ int pageIndex = (int)(readPage % frame.frameSize);
+ if (frame.frame[pageIndex] == null)
+ {
+ frame.Allocate(pageIndex);
+ }
+ else
+ {
+ frame.Clear(pageIndex);
+ }
+ var asyncResult = new PageAsyncReadResult()
+ {
+ page = readPage,
+ context = context,
+ handle = completed,
+ frame = frame
+ };
+
+ ulong offsetInFile = (ulong)(AlignedPageSizeBytes * readPage);
+
+ uint readLength = (uint)AlignedPageSizeBytes;
+ long adjustedUntilAddress = (AlignedPageSizeBytes * (untilAddress >> LogPageSizeBits) + (untilAddress & PageSizeMask));
+
+ if (adjustedUntilAddress > 0 && ((adjustedUntilAddress - (long)offsetInFile) < PageSize))
+ {
+ readLength = (uint)(adjustedUntilAddress - (long)offsetInFile);
+ readLength = (uint)(Align(readLength));
+ }
+
+ if (device != null)
+ offsetInFile = (ulong)(AlignedPageSizeBytes * (readPage - devicePageOffset));
+
+ usedDevice.ReadAsync(offsetInFile, (IntPtr)frame.pointers[pageIndex], readLength, AsyncReadPagesCallback, asyncResult);
+ }
+ }
+
+ private unsafe static ref DeltalogHeader GetHeader(long physicalAddress) => ref Unsafe.AsRef((void*)physicalAddress);
+
+ private unsafe void AsyncReadPagesCallback(uint errorCode, uint numBytes, object context)
+ {
+ try
+ {
+ var result = (PageAsyncReadResult)context;
+
+ if (errorCode != 0)
+ {
+ Trace.TraceError("AsyncReadPagesCallback error: {0}", errorCode);
+ result.cts?.Cancel();
+ }
+ Debug.Assert(result.freeBuffer1 == null);
+
+ if (errorCode == 0)
+ result.handle?.Signal();
+
+ Interlocked.MemoryBarrier();
+ }
+ catch when (disposed) { }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private long Align(long length)
+ {
+ return (length + sectorSize - 1) & ~(sectorSize - 1);
+ }
+
+ ///
+ /// Get next entry
+ ///
+ ///
+ ///
+ ///
+ ///
+ public unsafe bool GetNext(out long physicalAddress, out int entryLength, out int type)
+ {
+ while (true)
+ {
+ physicalAddress = 0;
+ entryLength = 0;
+ currentAddress = nextAddress;
+ type = 0;
+
+ var _currentPage = currentAddress >> LogPageSizeBits;
+ var _currentFrame = _currentPage % frameSize;
+ var _currentOffset = currentAddress & PageSizeMask;
+ var _headAddress = long.MaxValue;
+
+ if (disposed)
+ return false;
+
+ var _endAddress = endAddress;
+ if (tailAddress > _endAddress) _endAddress = tailAddress;
+
+ if (currentAddress >= _endAddress)
+ return false;
+
+
+ if (BufferAndLoad(currentAddress, _currentPage, _currentFrame, _headAddress, _endAddress))
+ continue;
+ physicalAddress = frame.GetPhysicalAddress(_currentFrame, _currentOffset);
+
+ // Get and check entry length
+ entryLength = GetHeader(physicalAddress).Length;
+ type = GetHeader(physicalAddress).Type;
+
+ if (entryLength == 0)
+ {
+ currentAddress = (1 + (currentAddress >> LogPageSizeBits)) << LogPageSizeBits;
+ if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _))
+ return false;
+ else
+ continue;
+ }
+
+ int recordSize = (int)(Align(_currentOffset + HeaderSize + entryLength) - _currentOffset);
+ if (entryLength < 0 || (_currentOffset + recordSize > PageSize))
+ {
+ currentAddress = (1 + (currentAddress >> LogPageSizeBits)) << LogPageSizeBits;
+ if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _))
+ return false;
+ else
+ continue;
+ }
+
+ // Verify checksum
+ if (!VerifyBlockChecksum((byte*)physicalAddress, entryLength))
+ {
+ currentAddress = (1 + (currentAddress >> LogPageSizeBits)) << LogPageSizeBits;
+ if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out _))
+ return false;
+ else
+ continue;
+ }
+ physicalAddress += HeaderSize;
+
+ if ((currentAddress & PageSizeMask) + recordSize == PageSize)
+ currentAddress = (1 + (currentAddress >> LogPageSizeBits)) << LogPageSizeBits;
+ else
+ currentAddress += recordSize;
+
+ if (Utility.MonotonicUpdate(ref nextAddress, currentAddress, out long oldCurrentAddress))
+ {
+ currentAddress = oldCurrentAddress;
+ return true;
+ }
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private unsafe static bool VerifyBlockChecksum(byte* ptr, int length)
+ {
+ var cs = Utility.XorBytes(ptr + 8, length + HeaderSize - 8);
+ if (cs != GetHeader((long)ptr).Checksum)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ private unsafe static void SetBlockHeader(int length, int type, byte* dest)
+ {
+ ref var header = ref GetHeader((long)dest);
+ header.Length = length;
+ header.Type = type;
+ header.Checksum = Utility.XorBytes(dest + 8, length + HeaderSize - 8);
+ }
+
+ ///
+ /// Initialize for writes
+ ///
+ ///
+ public void InitializeForWrites(SectorAlignedBufferPool memory)
+ {
+ this.memory = memory;
+ buffer = memory.Get(PageSize);
+ }
+
+ ///
+ /// Returns allocated region on delta log to write to
+ ///
+ /// Max usable size of allocated region
+ /// Address for caller to write to
+ public unsafe void Allocate(out int maxEntryLength, out long physicalAddress)
+ {
+ long pageEndAddress = (1 + (tailAddress >> LogPageSizeBits)) << LogPageSizeBits;
+ long dataStartAddress = tailAddress + HeaderSize;
+ maxEntryLength = (int)(pageEndAddress - dataStartAddress);
+ int offset = (int)(dataStartAddress & PageSizeMask);
+ physicalAddress = (long)buffer.aligned_pointer + offset;
+ }
+
+ ///
+ /// Seal allocated region for given size, write header, move tail address
+ ///
+ /// Entry length
+ /// Optional record type
+ public unsafe void Seal(int entryLength, int type = 0)
+ {
+ if (entryLength > 0)
+ {
+ int offset = (int)(tailAddress & PageSizeMask);
+ SetBlockHeader(entryLength, type, buffer.aligned_pointer + offset);
+
+ long oldTailAddress = tailAddress;
+ tailAddress += HeaderSize + entryLength;
+ tailAddress = Align(tailAddress);
+
+ long pageEndAddress = (1 + (tailAddress >> LogPageSizeBits)) << LogPageSizeBits;
+ if (tailAddress + HeaderSize >= pageEndAddress)
+ tailAddress = (1 + (tailAddress >> LogPageSizeBits)) << LogPageSizeBits;
+
+ if ((oldTailAddress >> LogPageSizeBits) < (tailAddress >> LogPageSizeBits))
+ FlushPage();
+ }
+ else
+ {
+ // Unable to use entry, skip to next page
+ tailAddress = (1 + (tailAddress >> LogPageSizeBits)) << LogPageSizeBits;
+ FlushPage();
+ }
+ }
+
+ private unsafe void FlushPage()
+ {
+ long pageStartAddress = tailAddress & ~PageSizeMask;
+ int offset = (int)(tailAddress & PageSizeMask);
+ if (offset == 0)
+ pageStartAddress = (tailAddress - 1) & ~PageSizeMask;
+ if (flushedUntilAddress > pageStartAddress)
+ pageStartAddress = flushedUntilAddress;
+ int startOffset = (int)(pageStartAddress & PageSizeMask);
+
+ var asyncResult = new PageAsyncFlushResult { count = 1, freeBuffer1 = buffer };
+ var alignedBlockSize = Align(tailAddress - pageStartAddress);
+ Interlocked.Increment(ref issuedFlush);
+ deltaLogDevice.WriteAsync((IntPtr)buffer.aligned_pointer + startOffset,
+ (ulong)pageStartAddress,
+ (uint)alignedBlockSize, AsyncFlushPageToDeviceCallback, asyncResult);
+ flushedUntilAddress = tailAddress;
+ buffer = memory.Get(PageSize);
+ }
+
+ ///
+ /// Flush
+ ///
+ ///
+ public async Task FlushAsync()
+ {
+ // Flush last page if needed
+ long pageStartAddress = tailAddress & ~PageSizeMask;
+ if (tailAddress > pageStartAddress)
+ FlushPage();
+ if (Interlocked.Decrement(ref issuedFlush) == 0)
+ completedSemaphore.Release();
+ await completedSemaphore.WaitAsync();
+ Interlocked.Increment(ref issuedFlush);
+ completedSemaphore = new SemaphoreSlim(0);
+ }
+
+ ///
+ /// IOCompletion callback for page flush
+ ///
+ ///
+ ///
+ ///
+ private void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, object context)
+ {
+ try
+ {
+ if (errorCode != 0)
+ {
+ Trace.TraceError("AsyncFlushPageToDeviceCallback error: {0}", errorCode);
+ }
+
+ PageAsyncFlushResult result = (PageAsyncFlushResult)context;
+ if (Interlocked.Decrement(ref result.count) == 0)
+ {
+ result.Free();
+ }
+ if (Interlocked.Decrement(ref issuedFlush) == 0)
+ completedSemaphore.Release();
+ }
+ catch when (disposed) { }
+ }
+ }
+}
diff --git a/cs/src/core/Index/Recovery/DirectoryConfiguration.cs b/cs/src/core/Index/Recovery/DirectoryConfiguration.cs
index 52987a5ae..34429fd58 100644
--- a/cs/src/core/Index/Recovery/DirectoryConfiguration.cs
+++ b/cs/src/core/Index/Recovery/DirectoryConfiguration.cs
@@ -20,6 +20,7 @@ public DirectoryConfiguration(string checkpointDir)
public const string hash_table_file = "ht";
public const string overflow_buckets_file = "ofb";
public const string snapshot_file = "snapshot";
+ public const string delta_file = "delta";
public const string cpr_base_folder = "cpr-checkpoints";
public const string cpr_meta_file = "info";
@@ -112,9 +113,14 @@ public string GetObjectLogSnapshotFileName(Guid token)
return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString(), snapshot_file, ".obj.dat");
}
- private static string GetMergedFolderPath(params String[] paths)
+ public string GetDeltaLogFileName(Guid token)
{
- String fullPath = paths[0];
+ return GetMergedFolderPath(checkpointDir, cpr_base_folder, token.ToString(), delta_file, ".dat");
+ }
+
+ private static string GetMergedFolderPath(params string[] paths)
+ {
+ string fullPath = paths[0];
for (int i = 1; i < paths.Length; i++)
{
diff --git a/cs/src/core/Index/Recovery/ICheckpointManager.cs b/cs/src/core/Index/Recovery/ICheckpointManager.cs
index 2619bed7b..685c9a500 100644
--- a/cs/src/core/Index/Recovery/ICheckpointManager.cs
+++ b/cs/src/core/Index/Recovery/ICheckpointManager.cs
@@ -58,6 +58,15 @@ public interface ICheckpointManager : IDisposable
///
void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata);
+ ///
+ /// Commit log incremental checkpoint (incremental snapshot)
+ ///
+ ///
+ ///
+ ///
+ ///
+ void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog);
+
///
/// Retrieve commit metadata for specified index checkpoint
///
@@ -69,8 +78,9 @@ public interface ICheckpointManager : IDisposable
/// Retrieve commit metadata for specified log checkpoint
///
/// Token
+ /// Delta log
/// Metadata, or null if invalid
- byte[] GetLogCheckpointMetadata(Guid logToken);
+ byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog);
///
/// Get list of index checkpoint tokens, in order of usage preference
@@ -106,9 +116,23 @@ public interface ICheckpointManager : IDisposable
///
IDevice GetSnapshotObjectLogDevice(Guid token);
+ ///
+ /// Provide device to store incremental (delta) snapshot of log (required only for incremental snapshot checkpoints)
+ ///
+ ///
+ ///
+ IDevice GetDeltaLogDevice(Guid token);
+
///
/// Cleanup all data (subfolder) related to checkpoints by this manager
///
public void PurgeAll();
+
+ ///
+ /// Initialize manager on recovery (e.g., deleting other checkpoints)
+ ///
+ ///
+ ///
+ public void OnRecovery(Guid indexToken, Guid logToken);
}
}
\ No newline at end of file
diff --git a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs
index 103dbbe4a..4f385816a 100644
--- a/cs/src/core/Index/Recovery/LocalCheckpointManager.cs
+++ b/cs/src/core/Index/Recovery/LocalCheckpointManager.cs
@@ -119,9 +119,31 @@ public byte[] GetIndexCheckpointMetadata(Guid indexToken)
/// Retrieve commit metadata for specified log checkpoint
///
/// Token
+ /// Delta log
/// Metadata, or null if invalid
- public byte[] GetLogCheckpointMetadata(Guid logToken)
+ public byte[] GetLogCheckpointMetadata(Guid logToken, DeltaLog deltaLog)
{
+ byte[] metadata = null;
+ if (deltaLog != null)
+ {
+ // Get latest valid metadata from delta-log
+ deltaLog.Reset();
+ while (deltaLog.GetNext(out long physicalAddress, out int entryLength, out int type))
+ {
+ if (type != 1) continue; // consider only metadata records
+ long endAddress = physicalAddress + entryLength;
+ metadata = new byte[entryLength];
+ unsafe
+ {
+ fixed (byte* m = metadata)
+ {
+ Buffer.MemoryCopy((void*)physicalAddress, m, entryLength, entryLength);
+ }
+ }
+ }
+ if (metadata != null) return metadata;
+ }
+
var dir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder(logToken));
if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
return null;
@@ -164,6 +186,16 @@ public IDevice GetSnapshotObjectLogDevice(Guid token)
return Devices.CreateLogDevice(directoryConfiguration.GetObjectLogSnapshotFileName(token), false);
}
+ ///
+ /// Provide device to store delta log for incremental snapshot checkpoints
+ ///
+ ///
+ ///
+ public IDevice GetDeltaLogDevice(Guid token)
+ {
+ return Devices.CreateLogDevice(directoryConfiguration.GetDeltaLogFileName(token), false);
+ }
+
///
public IEnumerable GetIndexCheckpointTokens()
{
@@ -224,5 +256,31 @@ public IEnumerable GetLogCheckpointTokens()
public void Dispose()
{
}
+
+ ///
+ public unsafe void CommitLogIncrementalCheckpoint(Guid logToken, int version, byte[] commitMetadata, DeltaLog deltaLog)
+ {
+ deltaLog.Allocate(out int length, out long physicalAddress);
+ if (length < commitMetadata.Length)
+ {
+ deltaLog.Seal(0, type: 1);
+ deltaLog.Allocate(out length, out physicalAddress);
+ if (length < commitMetadata.Length)
+ {
+ deltaLog.Seal(0);
+ throw new Exception($"Metadata of size {commitMetadata.Length} does not fit in delta log space of size {length}");
+ }
+ }
+ fixed (byte* ptr = commitMetadata)
+ {
+ Buffer.MemoryCopy(ptr, (void*)physicalAddress, commitMetadata.Length, commitMetadata.Length);
+ }
+ deltaLog.Seal(commitMetadata.Length, type: 1);
+ deltaLog.FlushAsync().Wait();
+ }
+
+ public void OnRecovery(Guid indexToken, Guid logToken)
+ {
+ }
}
}
\ No newline at end of file
diff --git a/cs/src/core/Index/Recovery/Recovery.cs b/cs/src/core/Index/Recovery/Recovery.cs
index 9cffbaa06..93785f40e 100644
--- a/cs/src/core/Index/Recovery/Recovery.cs
+++ b/cs/src/core/Index/Recovery/Recovery.cs
@@ -19,6 +19,7 @@ internal class RecoveryStatus
{
public long startPage;
public long endPage;
+ public long snapshotEndPage;
public long untilAddress;
public int capacity;
public CheckpointType checkpointType;
@@ -26,6 +27,7 @@ internal class RecoveryStatus
public IDevice recoveryDevice;
public long recoveryDevicePageOffset;
public IDevice objectLogRecoveryDevice;
+ public IDevice deltaRecoveryDevice;
// These are circular buffers of 'capacity' size; the indexing wraps due to hlog.GetPageIndexForPage().
public ReadStatus[] readStatus;
@@ -99,21 +101,22 @@ internal void Dispose()
{
recoveryDevice.Dispose();
objectLogRecoveryDevice.Dispose();
+ deltaRecoveryDevice?.Dispose();
}
}
public partial class FasterKV : FasterBase, IFasterKV
{
- private void InternalRecoverFromLatestCheckpoints(int numPagesToPreload, bool undoFutureVersions)
+ private void InternalRecoverFromLatestCheckpoints(int numPagesToPreload, bool undoNextVersion)
{
GetRecoveryInfoFromLatestCheckpoints(out HybridLogCheckpointInfo recoveredHLCInfo, out IndexCheckpointInfo recoveredICInfo);
- InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoFutureVersions);
+ InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoNextVersion);
}
- private ValueTask InternalRecoverFromLatestCheckpointsAsync(int numPagesToPreload, bool undoFutureVersions, CancellationToken cancellationToken)
+ private ValueTask InternalRecoverFromLatestCheckpointsAsync(int numPagesToPreload, bool undoNextVersion, CancellationToken cancellationToken)
{
GetRecoveryInfoFromLatestCheckpoints(out HybridLogCheckpointInfo recoveredHLCInfo, out IndexCheckpointInfo recoveredICInfo);
- return InternalRecoverAsync(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoFutureVersions, cancellationToken);
+ return InternalRecoverAsync(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoNextVersion, cancellationToken);
}
private void GetRecoveryInfoFromLatestCheckpoints(out HybridLogCheckpointInfo recoveredHLCInfo, out IndexCheckpointInfo recoveredICInfo)
@@ -126,7 +129,7 @@ private void GetRecoveryInfoFromLatestCheckpoints(out HybridLogCheckpointInfo re
try
{
recoveredHLCInfo = new HybridLogCheckpointInfo();
- recoveredHLCInfo.Recover(hybridLogToken, checkpointManager);
+ recoveredHLCInfo.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits);
}
catch
{
@@ -180,16 +183,16 @@ private bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRecoveryIn
return l1 <= l2;
}
- private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoFutureVersions)
+ private void InternalRecover(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoNextVersion)
{
GetRecoveryInfo(indexToken, hybridLogToken, out HybridLogCheckpointInfo recoveredHLCInfo, out IndexCheckpointInfo recoveredICInfo);
- InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoFutureVersions);
+ InternalRecover(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoNextVersion);
}
- private ValueTask InternalRecoverAsync(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoFutureVersions, CancellationToken cancellationToken)
+ private ValueTask InternalRecoverAsync(Guid indexToken, Guid hybridLogToken, int numPagesToPreload, bool undoNextVersion, CancellationToken cancellationToken)
{
GetRecoveryInfo(indexToken, hybridLogToken, out HybridLogCheckpointInfo recoveredHLCInfo, out IndexCheckpointInfo recoveredICInfo);
- return InternalRecoverAsync(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoFutureVersions, cancellationToken);
+ return InternalRecoverAsync(recoveredICInfo, recoveredHLCInfo, numPagesToPreload, undoNextVersion, cancellationToken);
}
private void GetRecoveryInfo(Guid indexToken, Guid hybridLogToken, out HybridLogCheckpointInfo recoveredHLCInfo, out IndexCheckpointInfo recoveredICInfo)
@@ -201,7 +204,7 @@ private void GetRecoveryInfo(Guid indexToken, Guid hybridLogToken, out HybridLog
// Recovery appropriate context information
recoveredHLCInfo = new HybridLogCheckpointInfo();
- recoveredHLCInfo.Recover(hybridLogToken, checkpointManager);
+ recoveredHLCInfo.Recover(hybridLogToken, checkpointManager, hlog.LogPageSizeBits);
recoveredHLCInfo.info.DebugPrint();
try
{
@@ -231,7 +234,7 @@ private void GetRecoveryInfo(Guid indexToken, Guid hybridLogToken, out HybridLog
}
}
- private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoFutureVersions)
+ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoNextVersion)
{
if (!RecoverToInitialPage(recoveredICInfo, recoveredHLCInfo, out long recoverFromAddress))
RecoverFuzzyIndex(recoveredICInfo);
@@ -239,11 +242,12 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
if (!SetRecoveryPageRanges(recoveredHLCInfo, numPagesToPreload, recoverFromAddress, out long tailAddress, out long headAddress, out long scanFromAddress))
return;
+ long readOnlyAddress;
// Make index consistent for version v
if (recoveredHLCInfo.info.useSnapshotFile == 0)
{
- RecoverHybridLog(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, CheckpointType.FoldOver, undoFutureVersions);
- hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, tailAddress);
+ RecoverHybridLog(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.nextVersion, CheckpointType.FoldOver, undoNextVersion);
+ readOnlyAddress = tailAddress;
}
else
{
@@ -251,17 +255,31 @@ private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheck
headAddress = recoveredHLCInfo.info.flushedLogicalAddress;
// First recover from index starting point (fromAddress) to snapshot starting point (flushedLogicalAddress)
- RecoverHybridLog(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.version, CheckpointType.Snapshot, undoFutureVersions);
+ RecoverHybridLog(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.nextVersion, CheckpointType.Snapshot, undoNextVersion);
// Then recover snapshot into mutable region
- RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, recoveredHLCInfo.info.guid, undoFutureVersions);
- hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, recoveredHLCInfo.info.flushedLogicalAddress);
+ RecoverHybridLogFromSnapshotFile(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.startLogicalAddress, recoveredHLCInfo.info.snapshotFinalLogicalAddress, recoveredHLCInfo.info.nextVersion, recoveredHLCInfo.info.guid, undoNextVersion, recoveredHLCInfo.deltaLog);
+
+ readOnlyAddress = recoveredHLCInfo.info.flushedLogicalAddress;
}
+ // Adjust head and read-only address post-recovery
+ var _head = (1 + (tailAddress >> hlog.LogPageSizeBits) - hlog.GetCapacityNumPages()) << hlog.LogPageSizeBits;
+ if (_head > headAddress)
+ headAddress = _head;
+ if (readOnlyAddress < headAddress)
+ readOnlyAddress = headAddress;
+
// Recover session information
+ hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;
+
+ recoveredHLCInfo.deltaLog?.Dispose();
+ recoveredHLCInfo.deltaFileDevice?.Dispose();
+
+ checkpointManager.OnRecovery(recoveredICInfo.info.token, recoveredHLCInfo.info.guid);
}
- private async ValueTask InternalRecoverAsync(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoFutureVersions, CancellationToken cancellationToken)
+ private async ValueTask InternalRecoverAsync(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo, int numPagesToPreload, bool undoNextVersion, CancellationToken cancellationToken)
{
if (!RecoverToInitialPage(recoveredICInfo, recoveredHLCInfo, out long recoverFromAddress))
await RecoverFuzzyIndexAsync(recoveredICInfo, cancellationToken);
@@ -269,11 +287,12 @@ private async ValueTask InternalRecoverAsync(IndexCheckpointInfo recoveredICInfo
if (!SetRecoveryPageRanges(recoveredHLCInfo, numPagesToPreload, recoverFromAddress, out long tailAddress, out long headAddress, out long scanFromAddress))
return;
+ long readOnlyAddress;
// Make index consistent for version v
if (recoveredHLCInfo.info.useSnapshotFile == 0)
{
- await RecoverHybridLogAsync(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, CheckpointType.FoldOver, undoFutureVersions, cancellationToken);
- hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, tailAddress);
+ await RecoverHybridLogAsync(scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.nextVersion, CheckpointType.FoldOver, undoNextVersion, cancellationToken);
+ readOnlyAddress = tailAddress;
}
else
{
@@ -281,14 +300,26 @@ private async ValueTask InternalRecoverAsync(IndexCheckpointInfo recoveredICInfo
headAddress = recoveredHLCInfo.info.flushedLogicalAddress;
// First recover from index starting point (fromAddress) to snapshot starting point (flushedLogicalAddress)
- await RecoverHybridLogAsync (scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.version, CheckpointType.Snapshot, undoFutureVersions, cancellationToken);
+ await RecoverHybridLogAsync (scanFromAddress, recoverFromAddress, recoveredHLCInfo.info.flushedLogicalAddress, recoveredHLCInfo.info.nextVersion, CheckpointType.Snapshot, undoNextVersion, cancellationToken);
// Then recover snapshot into mutable region
- await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.version, recoveredHLCInfo.info.guid, undoFutureVersions, cancellationToken);
- hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, recoveredHLCInfo.info.flushedLogicalAddress);
+ await RecoverHybridLogFromSnapshotFileAsync(recoveredHLCInfo.info.flushedLogicalAddress, recoverFromAddress, recoveredHLCInfo.info.finalLogicalAddress, recoveredHLCInfo.info.startLogicalAddress, recoveredHLCInfo.info.snapshotFinalLogicalAddress, recoveredHLCInfo.info.nextVersion, recoveredHLCInfo.info.guid, undoNextVersion, recoveredHLCInfo.deltaLog, cancellationToken);
+
+ readOnlyAddress = recoveredHLCInfo.info.flushedLogicalAddress;
}
+ // Adjust head and read-only address post-recovery
+ var _head = (1 + (tailAddress >> hlog.LogPageSizeBits) - hlog.GetCapacityNumPages()) << hlog.LogPageSizeBits;
+ if (_head > headAddress)
+ headAddress = _head;
+ if (readOnlyAddress < headAddress)
+ readOnlyAddress = headAddress;
+
// Recover session information
+ hlog.RecoveryReset(tailAddress, headAddress, recoveredHLCInfo.info.beginAddress, readOnlyAddress);
_recoveredSessions = recoveredHLCInfo.info.continueTokens;
+
+ recoveredHLCInfo.deltaLog?.Dispose();
+ recoveredHLCInfo.deltaFileDevice?.Dispose();
}
///
@@ -383,7 +414,7 @@ private bool SetRecoveryPageRanges(HybridLogCheckpointInfo recoveredHLCInfo, int
return true;
}
- private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, int version, CheckpointType checkpointType, bool undoFutureVersions)
+ private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, long untilAddress, int nextVersion, CheckpointType checkpointType, bool undoNextVersion)
{
if (untilAddress <= scanFromAddress)
return;
@@ -398,7 +429,7 @@ private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon
int pageIndex = hlog.GetPageIndexForPage(page);
recoveryStatus.WaitRead(pageIndex);
- if (ProcessReadPage(recoverFromAddress, untilAddress, version, undoFutureVersions, recoveryStatus, page, pageIndex))
+ if (ProcessReadPage(recoverFromAddress, untilAddress, nextVersion, undoNextVersion, recoveryStatus, page, pageIndex))
{
// Page was modified due to undoFutureVersion. Flush it to disk; the callback issues the after-capacity read request if necessary.
hlog.AsyncFlushPages(page, 1, AsyncFlushPageCallbackForRecovery, recoveryStatus);
@@ -419,9 +450,9 @@ private void RecoverHybridLog(long scanFromAddress, long recoverFromAddress, lon
WaitUntilAllPagesHaveBeenFlushed(startPage, endPage, recoveryStatus);
}
- private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, int version, CheckpointType checkpointType, bool undoFutureVersions, CancellationToken cancellationToken)
+ private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, int nextVersion, CheckpointType checkpointType, bool undoNextVersion, CancellationToken cancellationToken)
{
- if (untilAddress < scanFromAddress)
+ if (untilAddress <= scanFromAddress)
return;
var recoveryStatus = GetPageRangesToRead(scanFromAddress, untilAddress, checkpointType, out long startPage, out long endPage, out int capacity, out int numPagesToReadFirst);
@@ -434,7 +465,7 @@ private async ValueTask RecoverHybridLogAsync(long scanFromAddress, long recover
int pageIndex = hlog.GetPageIndexForPage(page);
await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken);
- if (ProcessReadPage(recoverFromAddress, untilAddress, version, undoFutureVersions, recoveryStatus, page, pageIndex))
+ if (ProcessReadPage(recoverFromAddress, untilAddress, nextVersion, undoNextVersion, recoveryStatus, page, pageIndex))
{
// Page was modified due to undoFutureVersion. Flush it to disk; the callback issues the after-capacity read request if necessary.
hlog.AsyncFlushPages(page, 1, AsyncFlushPageCallbackForRecovery, recoveryStatus);
@@ -470,7 +501,7 @@ private RecoveryStatus GetPageRangesToRead(long scanFromAddress, long untilAddre
return new RecoveryStatus(capacity, startPage, endPage, untilAddress, checkpointType);
}
- private bool ProcessReadPage(long recoverFromAddress, long untilAddress, int version, bool undoFutureVersions, RecoveryStatus recoveryStatus, long page, int pageIndex)
+ private bool ProcessReadPage(long recoverFromAddress, long untilAddress, int nextVersion, bool undoNextVersion, RecoveryStatus recoveryStatus, long page, int pageIndex)
{
var startLogicalAddress = hlog.GetStartLogicalAddress(page);
var endLogicalAddress = hlog.GetStartLogicalAddress(page + 1);
@@ -486,7 +517,7 @@ private bool ProcessReadPage(long recoverFromAddress, long untilAddress, int ver
pageUntilAddress = hlog.GetOffsetInPage(untilAddress);
var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress);
- if (RecoverFromPage(recoverFromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, version, undoFutureVersions))
+ if (RecoverFromPage(recoverFromAddress, pageFromAddress, pageUntilAddress, startLogicalAddress, physicalAddress, nextVersion, undoNextVersion))
{
// The current page was modified due to undoFutureVersion; caller will flush it to storage and issue a read request if necessary.
recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
@@ -510,31 +541,56 @@ private async ValueTask WaitUntilAllPagesHaveBeenFlushedAsync(long startPage, lo
await recoveryStatus.WaitFlushAsync(hlog.GetPageIndexForPage(page), cancellationToken);
}
- private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recoverFromAddress, long untilAddress, int version, Guid guid, bool undoFutureVersions)
+ private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, int nextVersion, Guid guid, bool undoNextVersion, DeltaLog deltaLog)
{
- GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, guid, out long startPage, out long endPage, out int capacity, out var recoveryStatus, out int numPagesToReadFirst);
+ GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadFirst);
- hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress,
+ hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, snapshotEndAddress,
hlog.AsyncReadPagesCallbackForRecovery,
recoveryStatus, recoveryStatus.recoveryDevicePageOffset,
recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice);
- for (long page = startPage; page < endPage; page++)
+ for (long page = startPage; page < endPage; page += capacity)
{
- // Ensure the page is read from file or flushed
- int pageIndex = hlog.GetPageIndexForPage(page);
- recoveryStatus.WaitRead(pageIndex);
+ long end = Math.Min(page + capacity, endPage);
+ for (long p = page; p < end; p++)
+ {
+ int pageIndex = hlog.GetPageIndexForPage(p);
+ if (p < snapshotEndPage)
+ {
+ // Ensure the page is read from file
+ recoveryStatus.WaitRead(pageIndex);
+ }
+ else
+ {
+ recoveryStatus.WaitFlush(pageIndex);
+ if (!hlog.IsAllocated(pageIndex))
+ hlog.AllocatePage(pageIndex);
+ else
+ hlog.ClearPage(pageIndex);
+ }
+ }
- if (recoverFromAddress < hlog.GetStartLogicalAddress(page+1))
- ProcessReadSnapshotPage(scanFromAddress, untilAddress, version, undoFutureVersions, recoveryStatus, page, pageIndex);
+ // Apply delta
+ hlog.ApplyDelta(deltaLog, page, end);
- // Issue next read
- if (page + capacity < endPage)
+ for (long p = page; p < end; p++)
{
- recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
- hlog.AsyncReadPagesFromDevice(page + capacity, 1, untilAddress, hlog.AsyncReadPagesCallbackForRecovery,
- recoveryStatus, recoveryStatus.recoveryDevicePageOffset,
- recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice);
+ int pageIndex = hlog.GetPageIndexForPage(p);
+
+ if (recoverFromAddress < hlog.GetStartLogicalAddress(p + 1) && recoverFromAddress < untilAddress)
+ ProcessReadSnapshotPage(scanFromAddress, untilAddress, nextVersion, undoNextVersion, recoveryStatus, p, pageIndex);
+
+ // Issue next read
+ if (p + capacity < endPage)
+ {
+ // Flush snapshot page to main log
+ // Flush callback will issue further reads or page clears
+ recoveryStatus.flushStatus[pageIndex] = FlushStatus.Pending;
+ if (p + capacity < snapshotEndPage)
+ recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
+ hlog.AsyncFlushPages(p, 1, AsyncFlushPageCallbackForRecovery, recoveryStatus);
+ }
}
}
@@ -542,31 +598,56 @@ private void RecoverHybridLogFromSnapshotFile(long scanFromAddress, long recover
recoveryStatus.Dispose();
}
- private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long fromAddress, long recoverFromAddress, long untilAddress, int version, Guid guid, bool undoFutureVersions, CancellationToken cancellationToken)
+ private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long scanFromAddress, long recoverFromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, int nextVersion, Guid guid, bool undoNextVersion, DeltaLog deltaLog, CancellationToken cancellationToken)
{
- GetSnapshotPageRangesToRead(fromAddress, untilAddress, guid, out long startPage, out long endPage, out int capacity, out var recoveryStatus, out int numPagesToReadFirst);
+ GetSnapshotPageRangesToRead(scanFromAddress, untilAddress, snapshotStartAddress, snapshotEndAddress, guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity, out var recoveryStatus, out int numPagesToReadFirst);
- hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, untilAddress,
+ hlog.AsyncReadPagesFromDevice(startPage, numPagesToReadFirst, snapshotEndAddress,
hlog.AsyncReadPagesCallbackForRecovery,
recoveryStatus, recoveryStatus.recoveryDevicePageOffset,
recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice);
- for (long page = startPage; page < endPage; page++)
+ for (long page = startPage; page < endPage; page += capacity)
{
- // Ensure the page is read from file or flushed
- int pageIndex = hlog.GetPageIndexForPage(page);
- await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken);
+ long end = Math.Min(page + capacity, endPage);
+ for (long p = page; p < end; p++)
+ {
+ int pageIndex = hlog.GetPageIndexForPage(p);
+ if (p < snapshotEndPage)
+ {
+ // Ensure the page is read from file
+ await recoveryStatus.WaitReadAsync(pageIndex, cancellationToken);
+ }
+ else
+ {
+ await recoveryStatus.WaitFlushAsync(pageIndex, cancellationToken);
+ if (!hlog.IsAllocated(pageIndex))
+ hlog.AllocatePage(pageIndex);
+ else
+ hlog.ClearPage(pageIndex);
+ }
+ }
- if (recoverFromAddress < hlog.GetStartLogicalAddress(page + 1))
- ProcessReadSnapshotPage(fromAddress, untilAddress, version, undoFutureVersions, recoveryStatus, page, pageIndex);
+ // Apply delta
+ hlog.ApplyDelta(deltaLog, page, end);
- // Issue next read
- if (page + capacity < endPage)
+ for (long p = page; p < end; p++)
{
- recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
- hlog.AsyncReadPagesFromDevice(page + capacity, 1, untilAddress, hlog.AsyncReadPagesCallbackForRecovery,
- recoveryStatus, recoveryStatus.recoveryDevicePageOffset,
- recoveryStatus.recoveryDevice, recoveryStatus.objectLogRecoveryDevice);
+ int pageIndex = hlog.GetPageIndexForPage(p);
+
+ if (recoverFromAddress < hlog.GetStartLogicalAddress(p + 1) && recoverFromAddress < untilAddress)
+ ProcessReadSnapshotPage(scanFromAddress, untilAddress, nextVersion, undoNextVersion, recoveryStatus, p, pageIndex);
+
+ // Issue next read
+ if (p + capacity < endPage)
+ {
+ // Flush snapshot page to main log
+ // Flush callback will issue further reads or page clears
+ recoveryStatus.flushStatus[pageIndex] = FlushStatus.Pending;
+ if (p + capacity < snapshotEndPage)
+ recoveryStatus.readStatus[pageIndex] = ReadStatus.Pending;
+ hlog.AsyncFlushPages(p, 1, AsyncFlushPageCallbackForRecovery, recoveryStatus);
+ }
}
}
@@ -574,36 +655,43 @@ private async ValueTask RecoverHybridLogFromSnapshotFileAsync(long fromAddress,
recoveryStatus.Dispose();
}
- private void GetSnapshotPageRangesToRead(long fromAddress, long untilAddress, Guid guid, out long startPage, out long endPage, out int capacity,
+ private void GetSnapshotPageRangesToRead(long fromAddress, long untilAddress, long snapshotStartAddress, long snapshotEndAddress, Guid guid, out long startPage, out long endPage, out long snapshotEndPage, out int capacity,
out RecoveryStatus recoveryStatus, out int numPagesToReadFirst)
{
// Compute startPage and endPage
startPage = hlog.GetPage(fromAddress);
endPage = hlog.GetPage(untilAddress);
if (untilAddress > hlog.GetStartLogicalAddress(endPage))
- {
endPage++;
- }
+ long snapshotStartPage = hlog.GetPage(snapshotStartAddress);
+ snapshotEndPage = hlog.GetPage(snapshotEndAddress);
+ if (snapshotEndAddress > hlog.GetStartLogicalAddress(snapshotEndPage))
+ snapshotEndPage++;
// By default first page has one extra record
capacity = hlog.GetCapacityNumPages();
var recoveryDevice = checkpointManager.GetSnapshotLogDevice(guid);
var objectLogRecoveryDevice = checkpointManager.GetSnapshotObjectLogDevice(guid);
+ var deltaRecoveryDevice = checkpointManager.GetDeltaLogDevice(guid);
+
recoveryDevice.Initialize(hlog.GetSegmentSize());
objectLogRecoveryDevice.Initialize(-1);
+ deltaRecoveryDevice.Initialize(-1);
recoveryStatus = new RecoveryStatus(capacity, startPage, endPage, untilAddress, CheckpointType.Snapshot)
{
recoveryDevice = recoveryDevice,
objectLogRecoveryDevice = objectLogRecoveryDevice,
- recoveryDevicePageOffset = startPage
+ deltaRecoveryDevice = deltaRecoveryDevice,
+ recoveryDevicePageOffset = snapshotStartPage,
+ snapshotEndPage = snapshotEndPage
};
// Initially issue read request for all pages that can be held in memory
- int totalPagesToRead = (int)(endPage - startPage);
+ int totalPagesToRead = (int)(snapshotEndPage - startPage);
numPagesToReadFirst = Math.Min(capacity, totalPagesToRead);
}
- private void ProcessReadSnapshotPage(long fromAddress, long untilAddress, int version, bool undoFutureVersions, RecoveryStatus recoveryStatus, long page, int pageIndex)
+ private void ProcessReadSnapshotPage(long fromAddress, long untilAddress, int nextVersion, bool undoNextVersion, RecoveryStatus recoveryStatus, long page, int pageIndex)
{
// Page at hand
var startLogicalAddress = hlog.GetStartLogicalAddress(page);
@@ -629,7 +717,7 @@ private void ProcessReadSnapshotPage(long fromAddress, long untilAddress, int ve
var physicalAddress = hlog.GetPhysicalAddress(startLogicalAddress);
RecoverFromPage(fromAddress, pageFromAddress, pageUntilAddress,
- startLogicalAddress, physicalAddress, version, undoFutureVersions);
+ startLogicalAddress, physicalAddress, nextVersion, undoNextVersion);
}
recoveryStatus.flushStatus[pageIndex] = FlushStatus.Done;
@@ -640,7 +728,7 @@ private unsafe bool RecoverFromPage(long startRecoveryAddress,
long untilLogicalAddressInPage,
long pageLogicalAddress,
long pagePhysicalAddress,
- int version, bool undoFutureVersions)
+ int nextVersion, bool undoNextVersion)
{
bool touched = false;
@@ -672,7 +760,7 @@ private unsafe bool RecoverFromPage(long startRecoveryAddress,
entry = default;
FindOrCreateTag(hash, tag, ref bucket, ref slot, ref entry, hlog.BeginAddress);
- if (info.Version <= version || !undoFutureVersions)
+ if (info.Version != RecordInfo.GetShortVersion(nextVersion) || !undoNextVersion)
{
entry.Address = pageLogicalAddress + pointer;
entry.Tag = tag;
@@ -724,10 +812,14 @@ private void AsyncFlushPageCallbackForRecovery(uint errorCode, uint numBytes, ob
}
else
{
- hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery,
+ if (readPage < result.context.snapshotEndPage)
+ {
+ // If next page is in snapshot, issue retrieval for it
+ hlog.AsyncReadPagesFromDevice(readPage, 1, result.context.untilAddress, hlog.AsyncReadPagesCallbackForRecovery,
result.context,
result.context.recoveryDevicePageOffset,
result.context.recoveryDevice, result.context.objectLogRecoveryDevice);
+ }
}
}
result.Free();
diff --git a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
index b1eafa668..13a043633 100644
--- a/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
+++ b/cs/src/core/Index/Synchronization/HybridLogCheckpointTask.cs
@@ -24,30 +24,16 @@ public virtual void GlobalBeforeEnteringState(SystemState next,
faster._hybridLogCheckpointToken = Guid.NewGuid();
faster.InitializeHybridLogCheckpoint(faster._hybridLogCheckpointToken, next.version);
}
-
+ faster._hybridLogCheckpoint.info.version = next.version;
faster.ObtainCurrentTailAddress(ref faster._hybridLogCheckpoint.info.startLogicalAddress);
break;
case Phase.WAIT_FLUSH:
faster._hybridLogCheckpoint.info.headAddress = faster.hlog.HeadAddress;
faster._hybridLogCheckpoint.info.beginAddress = faster.hlog.BeginAddress;
+ faster._hybridLogCheckpoint.info.nextVersion = next.version;
break;
case Phase.PERSISTENCE_CALLBACK:
- // Collect object log offsets only after flushes
- // are completed
- var seg = faster.hlog.GetSegmentOffsets();
- if (seg != null)
- {
- faster._hybridLogCheckpoint.info.objectLogSegmentOffsets = new long[seg.Length];
- Array.Copy(seg, faster._hybridLogCheckpoint.info.objectLogSegmentOffsets, seg.Length);
- }
-
- // Temporarily block new sessions from starting, which may add an entry to the table and resize the
- // dictionary. There should be minimal contention here.
- lock (faster._activeSessions)
- // write dormant sessions to checkpoint
- foreach (var kvp in faster._activeSessions)
- kvp.Value.AtomicSwitch(next.version - 1);
-
+ CollectMetadata(next, faster);
faster.WriteHybridLogMetaInfo();
break;
case Phase.REST:
@@ -59,6 +45,25 @@ public virtual void GlobalBeforeEnteringState(SystemState next,
}
}
+ protected void CollectMetadata(SystemState next, FasterKV faster)
+ {
+ // Collect object log offsets only after flushes
+ // are completed
+ var seg = faster.hlog.GetSegmentOffsets();
+ if (seg != null)
+ {
+ faster._hybridLogCheckpoint.info.objectLogSegmentOffsets = new long[seg.Length];
+ Array.Copy(seg, faster._hybridLogCheckpoint.info.objectLogSegmentOffsets, seg.Length);
+ }
+
+ // Temporarily block new sessions from starting, which may add an entry to the table and resize the
+ // dictionary. There should be minimal contention here.
+ lock (faster._activeSessions)
+ // write dormant sessions to checkpoint
+ foreach (var kvp in faster._activeSessions)
+ kvp.Value.AtomicSwitch(next.version - 1);
+ }
+
///
public virtual void GlobalAfterEnteringState(SystemState next,
FasterKV faster)
@@ -105,6 +110,13 @@ public override void GlobalBeforeEnteringState(SystemState next,
FasterKV faster)
{
base.GlobalBeforeEnteringState(next, faster);
+
+ if (next.phase == Phase.PREPARE)
+ {
+ faster._lastSnapshotCheckpoint.deltaFileDevice?.Dispose();
+ faster._lastSnapshotCheckpoint.deltaLog?.Dispose();
+ faster._lastSnapshotCheckpoint = default;
+ }
if (next.phase != Phase.WAIT_FLUSH) return;
faster.hlog.ShiftReadOnlyToTail(out var tailAddress,
@@ -162,15 +174,20 @@ internal sealed class SnapshotCheckpointTask : HybridLogCheckpointOrchestrationT
///
public override void GlobalBeforeEnteringState(SystemState next, FasterKV faster)
{
- base.GlobalBeforeEnteringState(next, faster);
switch (next.phase)
{
case Phase.PREPARE:
- faster._hybridLogCheckpoint.info.flushedLogicalAddress = faster.hlog.FlushedUntilAddress;
+ faster._lastSnapshotCheckpoint.deltaFileDevice?.Dispose();
+ faster._lastSnapshotCheckpoint.deltaLog?.Dispose();
+ faster._lastSnapshotCheckpoint = default;
+ base.GlobalBeforeEnteringState(next, faster);
+ faster._hybridLogCheckpoint.info.startLogicalAddress = faster.hlog.FlushedUntilAddress;
faster._hybridLogCheckpoint.info.useSnapshotFile = 1;
break;
case Phase.WAIT_FLUSH:
+ base.GlobalBeforeEnteringState(next, faster);
faster.ObtainCurrentTailAddress(ref faster._hybridLogCheckpoint.info.finalLogicalAddress);
+ faster._hybridLogCheckpoint.info.snapshotFinalLogicalAddress = faster._hybridLogCheckpoint.info.finalLogicalAddress;
faster._hybridLogCheckpoint.snapshotFileDevice =
faster.checkpointManager.GetSnapshotLogDevice(faster._hybridLogCheckpointToken);
@@ -179,7 +196,7 @@ public override void GlobalBeforeEnteringState(SystemState next, Fas
faster._hybridLogCheckpoint.snapshotFileDevice.Initialize(faster.hlog.GetSegmentSize());
faster._hybridLogCheckpoint.snapshotFileObjectLogDevice.Initialize(-1);
- long startPage = faster.hlog.GetPage(faster._hybridLogCheckpoint.info.flushedLogicalAddress);
+ long startPage = faster.hlog.GetPage(faster._hybridLogCheckpoint.info.startLogicalAddress);
long endPage = faster.hlog.GetPage(faster._hybridLogCheckpoint.info.finalLogicalAddress);
if (faster._hybridLogCheckpoint.info.finalLogicalAddress >
faster.hlog.GetStartLogicalAddress(endPage))
@@ -187,8 +204,10 @@ public override void GlobalBeforeEnteringState(SystemState next, Fas
endPage++;
}
- // This can be run on a new thread if we want to immediately parallelize
- // the rest of the log flush
+ // We are writing pages outside epoch protection, so callee should be able to
+ // handle corrupted or unexpected concurrent page changes during the flush, e.g., by
+ // resuming epoch protection if necessary. Correctness is not affected as we will
+ // only read safe pages during recovery.
faster.hlog.AsyncFlushPagesToDevice(
startPage,
endPage,
@@ -197,6 +216,103 @@ public override void GlobalBeforeEnteringState(SystemState next, Fas
faster._hybridLogCheckpoint.snapshotFileObjectLogDevice,
out faster._hybridLogCheckpoint.flushedSemaphore);
break;
+ case Phase.PERSISTENCE_CALLBACK:
+ // update flushed-until address to the latest
+ faster._hybridLogCheckpoint.info.flushedLogicalAddress = faster.hlog.FlushedUntilAddress;
+ base.GlobalBeforeEnteringState(next, faster);
+ faster._lastSnapshotCheckpoint = faster._hybridLogCheckpoint;
+ break;
+ default:
+ base.GlobalBeforeEnteringState(next, faster);
+ break;
+ }
+ }
+
+ ///
+ public override void OnThreadState(
+ SystemState current,
+ SystemState prev, FasterKV faster,
+ FasterKV.FasterExecutionContext ctx,
+ FasterSession fasterSession,
+ List valueTasks,
+ CancellationToken token = default)
+ {
+ base.OnThreadState(current, prev, faster, ctx, fasterSession, valueTasks, token);
+
+ if (current.phase != Phase.WAIT_FLUSH) return;
+
+ if (ctx == null || !ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush])
+ {
+ var s = faster._hybridLogCheckpoint.flushedSemaphore;
+
+ var notify = s != null && s.CurrentCount > 0;
+ notify = notify || !faster.SameCycle(ctx, current) || s == null;
+
+ if (valueTasks != null && !notify)
+ {
+ Debug.Assert(s != null);
+ valueTasks.Add(new ValueTask(s.WaitAsync(token).ContinueWith(t => s.Release())));
+ }
+
+ if (!notify) return;
+
+ if (ctx != null)
+ ctx.prevCtx.markers[EpochPhaseIdx.WaitFlush] = true;
+ }
+
+ if (ctx != null)
+ faster.epoch.Mark(EpochPhaseIdx.WaitFlush, current.version);
+
+ if (faster.epoch.CheckIsComplete(EpochPhaseIdx.WaitFlush, current.version))
+ faster.GlobalStateMachineStep(current);
+ }
+ }
+
+ ///
+ /// An Incremental Snapshot makes a copy of only the changes that have happened since the last full Snapshot. It is
+ /// slower and more complex than a foldover, but more space-efficient on the log, and retains in-place
+ /// update performance as it does not advance the readonly marker unnecessarily.
+ ///
+ internal sealed class IncrementalSnapshotCheckpointTask : HybridLogCheckpointOrchestrationTask
+ {
+ ///
+ public override void GlobalBeforeEnteringState(SystemState next, FasterKV faster)
+ {
+ switch (next.phase)
+ {
+ case Phase.PREPARE:
+ faster._hybridLogCheckpoint = faster._lastSnapshotCheckpoint;
+ base.GlobalBeforeEnteringState(next, faster);
+ faster._hybridLogCheckpoint.info.startLogicalAddress = faster.hlog.FlushedUntilAddress;
+ faster._hybridLogCheckpoint.prevVersion = next.version;
+ break;
+ case Phase.WAIT_FLUSH:
+ base.GlobalBeforeEnteringState(next, faster);
+ faster._hybridLogCheckpoint.info.finalLogicalAddress = 0;
+ faster.ObtainCurrentTailAddress(ref faster._hybridLogCheckpoint.info.finalLogicalAddress);
+
+ if (faster._hybridLogCheckpoint.deltaLog == null)
+ {
+ faster._hybridLogCheckpoint.deltaFileDevice = faster.checkpointManager.GetDeltaLogDevice(faster._hybridLogCheckpointToken);
+ faster._hybridLogCheckpoint.deltaFileDevice.Initialize(-1);
+ faster._hybridLogCheckpoint.deltaLog = new DeltaLog(faster._hybridLogCheckpoint.deltaFileDevice, faster.hlog.LogPageSizeBits, -1);
+ faster._hybridLogCheckpoint.deltaLog.InitializeForWrites(faster.hlog.bufferPool);
+ }
+
+ faster.hlog.AsyncFlushDeltaToDevice(
+ faster._hybridLogCheckpoint.info.startLogicalAddress,
+ faster._hybridLogCheckpoint.info.finalLogicalAddress,
+ faster._lastSnapshotCheckpoint.info.finalLogicalAddress,
+ faster._hybridLogCheckpoint.prevVersion,
+ faster._hybridLogCheckpoint.deltaLog);
+ break;
+ case Phase.PERSISTENCE_CALLBACK:
+ faster._hybridLogCheckpoint.info.flushedLogicalAddress = faster.hlog.FlushedUntilAddress;
+ CollectMetadata(next, faster);
+ faster.WriteHybridLogIncrementalMetaInfo(faster._hybridLogCheckpoint.deltaLog);
+ faster._hybridLogCheckpoint.info.deltaTailAddress = faster._hybridLogCheckpoint.deltaLog.TailAddress;
+ faster._lastSnapshotCheckpoint = faster._hybridLogCheckpoint;
+ break;
}
}
diff --git a/cs/src/core/Utilities/Native32.cs b/cs/src/core/Utilities/Native32.cs
index aa96f4578..d937f99da 100644
--- a/cs/src/core/Utilities/Native32.cs
+++ b/cs/src/core/Utilities/Native32.cs
@@ -139,6 +139,11 @@ internal static extern bool GetQueuedCompletionStatus(
[Out] out NativeOverlapped* lpOverlapped,
[In] UInt32 dwMilliseconds);
+ [DllImport("kernel32.dll", SetLastError = true)]
+ internal static extern bool GetFileSizeEx(
+ [In] SafeFileHandle hFile,
+ [Out] out long lpFileSize);
+
internal enum EMoveMethod : uint
{
Begin = 0,
diff --git a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
index 6b2be40bd..fa35a0d28 100644
--- a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
+++ b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
@@ -218,6 +218,15 @@ public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAs
}
}
+ ///
+ public override long GetFileSize(int segmentId)
+ {
+ // The segment was not found in the blob cache, so it has no backing blob yet
+ if (!blobs.TryGetValue(segmentId, out _))
+ return 0;
+ return segmentSize == -1 ? MAX_PAGEBLOB_SIZE : segmentSize;
+ }
+
//---- The actual read and write accesses to the page blobs
private unsafe Task WritePortionToBlobUnsafeAsync(CloudPageBlob blob, IntPtr sourceAddress, long destinationAddress, long offset, uint length)
diff --git a/cs/test/DeltaLogTests.cs b/cs/test/DeltaLogTests.cs
new file mode 100644
index 000000000..ede9b6b95
--- /dev/null
+++ b/cs/test/DeltaLogTests.cs
@@ -0,0 +1,76 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.IO;
+using FASTER.core;
+using NUnit.Framework;
+
+namespace FASTER.test
+{
+ [TestFixture]
+ internal class DeltaLogStandAloneTests
+ {
+
+ [Test]
+ public void DeltaLogTest1()
+ {
+ int TotalCount = 1000;
+ string path = TestContext.CurrentContext.TestDirectory + "/" + TestContext.CurrentContext.Test.Name + "/";
+ DirectoryInfo di = Directory.CreateDirectory(path);
+ using (IDevice device = Devices.CreateLogDevice(path + TestContext.CurrentContext.Test.Name + "/delta.log", deleteOnClose: false))
+ {
+ device.Initialize(-1);
+ using DeltaLog deltaLog = new DeltaLog(device, 12, 0);
+ Random r = new Random(20);
+ int i;
+
+ var bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
+ deltaLog.InitializeForWrites(bufferPool);
+ for (i = 0; i < TotalCount; i++)
+ {
+ int len = 1 + r.Next(254);
+ long address;
+ while (true)
+ {
+ deltaLog.Allocate(out int maxLen, out address);
+ if (len <= maxLen) break;
+ deltaLog.Seal(0);
+ }
+ for (int j = 0; j < len; j++)
+ {
+ unsafe { *(byte*)(address + j) = (byte)len; }
+ }
+ deltaLog.Seal(len, i);
+ }
+ deltaLog.FlushAsync().Wait();
+
+ deltaLog.InitializeForReads();
+ i = 0;
+ r = new Random(20);
+ while (deltaLog.GetNext(out long address, out int len, out int type))
+ {
+ int _len = 1 + r.Next(254);
+ Assert.IsTrue(type == i);
+ Assert.IsTrue(_len == len);
+ for (int j = 0; j < len; j++)
+ {
+ unsafe { Assert.IsTrue(*(byte*)(address + j) == (byte)_len); };
+ }
+ i++;
+ }
+ Assert.IsTrue(i == TotalCount);
+ bufferPool.Free();
+ }
+ while (true)
+ {
+ try
+ {
+ di.Delete(recursive: true);
+ break;
+ }
+ catch { }
+ }
+ }
+ }
+}
diff --git a/cs/test/EnqueueTests.cs b/cs/test/EnqueueTests.cs
index 4c33fe70c..24f660766 100644
--- a/cs/test/EnqueueTests.cs
+++ b/cs/test/EnqueueTests.cs
@@ -166,7 +166,7 @@ public async Task EnqueueAsyncBasicTest()
bool datacheckrun = false;
- CancellationToken cancellationToken;
+ CancellationToken cancellationToken = default;
ReadOnlyMemory readOnlyMemoryEntry = entry;
ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(5);
diff --git a/cs/test/FasterLogResumeTests.cs b/cs/test/FasterLogResumeTests.cs
index c45dcaa86..d104251a0 100644
--- a/cs/test/FasterLogResumeTests.cs
+++ b/cs/test/FasterLogResumeTests.cs
@@ -42,7 +42,7 @@ public void TearDown()
[Category("FasterLog")]
public async Task FasterLogResumePersistedReaderSpec([Values] LogChecksumType logChecksum)
{
- CancellationToken cancellationToken;
+ CancellationToken cancellationToken = default;
var input1 = new byte[] { 0, 1, 2, 3 };
var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 };
diff --git a/cs/test/FasterLogTests.cs b/cs/test/FasterLogTests.cs
index 48db1b429..c39c662a1 100644
--- a/cs/test/FasterLogTests.cs
+++ b/cs/test/FasterLogTests.cs
@@ -363,7 +363,7 @@ public async ValueTask TruncateUntilBasic([Values]LogChecksumType logChecksum, [
[Category("FasterLog")]
public async ValueTask EnqueueAndWaitForCommitAsyncBasicTest([Values]LogChecksumType logChecksum)
{
- CancellationToken cancellationToken;
+ CancellationToken cancellationToken = default;
ReadOnlySpanBatch spanBatch = new ReadOnlySpanBatch(numSpanEntries);
diff --git a/cs/test/ObjectRecoveryTest.cs b/cs/test/ObjectRecoveryTest.cs
index 26fa7f5c5..c7e37b133 100644
--- a/cs/test/ObjectRecoveryTest.cs
+++ b/cs/test/ObjectRecoveryTest.cs
@@ -166,7 +166,7 @@ public unsafe void Verify(Guid cprVersion, Guid indexVersion)
new DeviceLogCommitCheckpointManager(
new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme(
- new DirectoryInfo(test_path).FullName)));
+ new DirectoryInfo(test_path).FullName)), null);
// Compute expected array
long[] expected = new long[numUniqueKeys];
diff --git a/cs/test/RecoveryChecks.cs b/cs/test/RecoveryChecks.cs
index e0358e6dc..1a7d57964 100644
--- a/cs/test/RecoveryChecks.cs
+++ b/cs/test/RecoveryChecks.cs
@@ -6,9 +6,16 @@
using System.IO;
using NUnit.Framework;
using FASTER.test.recovery.sumstore;
+using System;
+using FASTER.devices;
namespace FASTER.test.recovery
{
+ public enum DeviceMode
+ {
+ Local,
+ Cloud
+ }
[TestFixture]
public class RecoveryChecks
@@ -17,6 +24,8 @@ public class RecoveryChecks
const int numOps = 5000;
AdId[] inputArray;
string path;
+ public const string EMULATED_STORAGE_STRING = "UseDevelopmentStorage=true;";
+ public const string TEST_CONTAINER = "recoverychecks";
[SetUp]
public void Setup()
@@ -47,6 +56,17 @@ public override void ReadCompletionCallback(ref long key, ref long input, ref lo
}
}
+ public class MyFunctions2 : SimpleFunctions
+ {
+ public override void ReadCompletionCallback(ref long key, ref long input, ref long output, Empty ctx, Status status)
+ {
+ if (key < 950)
+ Assert.IsTrue(status == Status.OK && output == key);
+ else
+ Assert.IsTrue(status == Status.OK && output == key + 1);
+ }
+ }
+
[Test]
[Category("FasterKV")]
public async ValueTask RecoveryCheck1([Values] CheckpointType checkpointType, [Values] bool isAsync, [Values] bool useReadCache, [Values(128, 1<<10)]int size)
@@ -387,5 +407,99 @@ public async ValueTask RecoveryCheck5([Values] CheckpointType checkpointType, [V
}
s2.CompletePending(true);
}
+
+
+ [Test]
+ public async ValueTask IncrSnapshotRecoveryCheck([Values] DeviceMode deviceMode)
+ {
+ ICheckpointManager checkpointManager;
+ if (deviceMode == DeviceMode.Local)
+ {
+ checkpointManager = new DeviceLogCommitCheckpointManager(
+ new LocalStorageNamedDeviceFactory(),
+ new DefaultCheckpointNamingScheme(TestContext.CurrentContext.TestDirectory + $"/RecoveryChecks/IncrSnapshotRecoveryCheck"));
+ }
+ else
+ {
+ if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests")))
+ {
+ checkpointManager = new DeviceLogCommitCheckpointManager(
+ new AzureStorageNamedDeviceFactory(EMULATED_STORAGE_STRING),
+ new DefaultCheckpointNamingScheme($"{TEST_CONTAINER}/IncrSnapshotRecoveryCheck"));
+ }
+ else
+ return;
+ }
+
+ await IncrSnapshotRecoveryCheck(checkpointManager);
+ checkpointManager.PurgeAll();
+ checkpointManager.Dispose();
+ }
+
+ public async ValueTask IncrSnapshotRecoveryCheck(ICheckpointManager checkpointManager)
+ {
+ using var fht1 = new FasterKV
+ (1 << 10,
+ logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 20, ReadCacheSettings = null },
+ checkpointSettings: new CheckpointSettings { CheckpointManager = checkpointManager }
+ );
+
+ using var s1 = fht1.NewSession(new MyFunctions2());
+ for (long key = 0; key < 1000; key++)
+ {
+ s1.Upsert(ref key, ref key);
+ }
+
+ var task = fht1.TakeHybridLogCheckpointAsync(CheckpointType.Snapshot);
+ var result = await task;
+
+ for (long key = 950; key < 1000; key++)
+ {
+ s1.Upsert(key, key+1);
+ }
+
+ var _result1 = fht1.TakeHybridLogCheckpoint(out var _token1, CheckpointType.Snapshot, true);
+ await fht1.CompleteCheckpointAsync();
+
+ Assert.IsTrue(_result1);
+ Assert.IsTrue(_token1 == result.token);
+
+ for (long key = 1000; key < 2000; key++)
+ {
+ s1.Upsert(key, key + 1);
+ }
+
+ var _result2 = fht1.TakeHybridLogCheckpoint(out var _token2, CheckpointType.Snapshot, true);
+ await fht1.CompleteCheckpointAsync();
+
+ Assert.IsTrue(_result2);
+ Assert.IsTrue(_token2 == result.token);
+
+
+ using var fht2 = new FasterKV
+ (1 << 10,
+ logSettings: new LogSettings { LogDevice = log, MutableFraction = 1, PageSizeBits = 10, MemorySizeBits = 14, ReadCacheSettings = null },
+ checkpointSettings: new CheckpointSettings { CheckpointManager = checkpointManager }
+ );
+
+ await fht2.RecoverAsync();
+
+ Assert.IsTrue(fht1.Log.TailAddress == fht2.Log.TailAddress, $"fht1 tail = {fht1.Log.TailAddress}; fht2 tail = {fht2.Log.TailAddress}");
+
+ using var s2 = fht2.NewSession(new MyFunctions2());
+ for (long key = 0; key < 2000; key++)
+ {
+ long output = default;
+ var status = s2.Read(ref key, ref output);
+ if (status != Status.PENDING)
+ {
+ if (key < 950)
+ Assert.IsTrue(status == Status.OK && output == key);
+ else
+ Assert.IsTrue(status == Status.OK && output == key + 1);
+ }
+ }
+ s2.CompletePending(true);
+ }
}
}
diff --git a/cs/test/SimpleAsyncTests.cs b/cs/test/SimpleAsyncTests.cs
index 5048fa158..2ef8b6db5 100644
--- a/cs/test/SimpleAsyncTests.cs
+++ b/cs/test/SimpleAsyncTests.cs
@@ -72,7 +72,7 @@ public async Task ReadAsyncMinParamTest()
[Category("FasterKV")]
public async Task ReadAsyncMinParamTestNoDefaultTest()
{
- CancellationToken cancellationToken;
+ CancellationToken cancellationToken = default;
using var s1 = fht1.NewSession(new SimpleFunctions());
for (long key = 0; key < numOps; key++)
diff --git a/docs/_docs/20-fasterkv-basics.md b/docs/_docs/20-fasterkv-basics.md
index ac5c04926..8e8a670ed 100644
--- a/docs/_docs/20-fasterkv-basics.md
+++ b/docs/_docs/20-fasterkv-basics.md
@@ -342,6 +342,8 @@ Typically, you may compact around 20% (up to 100%) of the log, e.g., you could s
## Checkpointing and Recovery
+### Overall Summary
+
FASTER supports asynchronous non-blocking **checkpoint-based recovery**. Every new checkpoint persists (or makes durable) additional user-operations
(Read, Upsert or RMW). FASTER allows clients to keep track of operations that have persisted and those that have not using
a session-based API.
@@ -355,7 +357,7 @@ incremental checkpointing instead of a WAL to implement group commit in a scalab
Recall that each FASTER client starts a session, associated with a unique session ID (or name). All FASTER session operations
(Read, Upsert, RMW) carry a monotonic sequence number (sequence numbers are implicit in case of async calls). At any point in
-time, one may call `Checkpoint` to initiate an asynchronous checkpoint of FASTER. After calling `Checkpoint`, each FASTER
+time, one may call the checkpointing API to initiate an asynchronous checkpoint of FASTER. After invoking the checkpoint, each FASTER
session is (eventually) notified of a commit point. A commit point consists of (1) a sequence number, such that all operations
until, and no operations after, that sequence number, are guaranteed to be persisted as part of that checkpoint; (2) an optional
exception list of operations that were not part of the commit because they went pending and could not complete before the
@@ -370,7 +372,83 @@ With async session operations on FASTER, operations return as soon as they compl
you simply issue an `await session.WaitForCommitAsync()` call. The call completes only after the operation is made persistent by
an asynchronous commit (checkpoint). The user is responsible for initiating the checkpoint asynchronously.
-Below, we show a simple recovery example with asynchronous checkpointing.
+### Taking Checkpoints
+
+A FASTER checkpoint consists of an optional index checkpoint, coupled with a later log
+checkpoint. FASTER first recovers the index and then replays the relevant part of the log
+to get back to a consistent recovered state. If an index checkpoint is unavailable, FASTER
+replays the entire log to reconstruct the index. An index checkpoint is taken as follows:
+
+```cs
+await store.TakeIndexCheckpointAsync();
+```
+
+FASTER supports two notions of log checkpointing: Snapshot and Fold-Over.
+
+### Snapshot Checkpoint
+
+This checkpoint is a full snapshot of the in-memory portion of the hybrid log into a separate
+snapshot file in the checkpoint folder. We recover using the main log followed by reading the
+snapshot back into main memory to complete recovery. FASTER also supports incremental
+snapshots, where the changes since the last full (or incremental) snapshot are captured into
+a delta log file in the same folder as the base snapshot. This is specified using the
+`tryIncremental` parameter to the checkpoint operation.
+
+```cs
+await store.TakeHybridLogCheckpointAsync(CheckpointType.Snapshot, tryIncremental: false);
+```
+
+### Fold-Over Checkpoint
+
+A fold-over checkpoint simply flushes the main data log to disk, making it read-only, and
+writes a small metadata file (`info.dat`) to the checkpoint folder. This is an incremental
+checkpoint by definition, as the mutable log consists of all changes since the previous
+fold-over checkpoint. FoldOver effectively moves the read-only marker of the hybrid log to
+the tail, and thus all the data is persisted as part of the same hybrid log (there is no
+separate snapshot file).
+
+All subsequent updates are written to new hybrid log tail locations, which gives Fold-Over
+its incremental nature. FoldOver is a very fast checkpointing scheme, but creates multiple
+versions of the data on the main log, which can increase the cost of garbage collection
+and take up main memory.
+
+```cs
+await store.TakeHybridLogCheckpointAsync(CheckpointType.FoldOver);
+```
+
+### Full Checkpoint
+
+You can take an index and log checkpoint together as follows:
+
+```cs
+await store.TakeFullCheckpointAsync(CheckpointType.FoldOver);
+```
+
+This is usually more expensive than log-only checkpoints as it needs to write the entire
+hash table to disk. A preferred approach is to take frequent log-only checkpoints and
+take an index checkpoint at coarse-grained intervals in order to reduce recovery time.
+
+### Checkpoint Management
+
+By default, FASTER creates checkpoints in the folder specified using
+`CheckpointSettings.CheckpointDir`, with one folder per index or log checkpoint (each
+as a unique Guid token). You can auto-purge old checkpoints as new ones are generated, by
+setting `CheckpointSettings.RemoveOutdated` to `true`. The last two index checkpoints
+and the last log checkpoint are kept. We keep the last two index checkpoints because the
+last index checkpoint may not be usable in case there is no subsequent log checkpoint
+available. Make sure every index checkpoint is followed by at least one log checkpoint, for
+the index checkpoint to be usable for recovery.
+
+### Examples
+
+You can find several checkpointing examples here:
+* [StoreCheckpointRecover](https://github.com/microsoft/FASTER/tree/master/cs/samples/StoreCheckpointRecover)
+* [ClassRecoveryDurability](https://github.com/microsoft/FASTER/tree/master/cs/playground/ClassRecoveryDurability)
+* [SumStore](https://github.com/microsoft/FASTER/tree/master/cs/playground/SumStore)
+* [SimpleRecoveryTest](https://github.com/microsoft/FASTER/blob/master/cs/test/SimpleRecoveryTest.cs)
+* [RecoveryChecks](https://github.com/microsoft/FASTER/blob/master/cs/test/RecoveryChecks.cs)
+
+Below, we show a simple recovery example with asynchronous fold-over checkpointing.
```cs
public class PersistenceExample
@@ -437,14 +515,4 @@ public class PersistenceExample
t.Start();
}
}
-```
-
-FASTER supports two notions of checkpointing: Snapshot and Fold-Over. The former is a full snapshot of in-memory into a separate
-snapshot file, whereas the latter is an _incremental_ checkpoint of the changes since the last checkpoint. Fold-Over effectively
-moves the read-only marker of the hybrid log to the tail, and thus all the data is persisted as part of the same hybrid log (there
-is no separate snapshot file). All subsequent updates are written to new hybrid log tail locations, which gives Fold-Over its
-incremental nature. You can find a few basic checkpointing examples
-[here](https://github.com/Microsoft/FASTER/blob/master/cs/test/SimpleRecoveryTest.cs) and
-[here](https://github.com/Microsoft/FASTER/tree/master/cs/playground/SumStore). We plan to add more examples and details going
-forward.
-
+```
\ No newline at end of file