Skip to content

Commit

Permalink
[C#] Incremental snapshot checkpoints, support auto-purging checkpoin…
Browse files Browse the repository at this point in the history
…ts (microsoft#422)

* Starting commit for incremental snapshot support.
* More wiring up of delta log writing.
* First correct working implementation of incremental snapshot
* Support cloud with incr snapshot
* Support incremental recovery to different number of pages in hlog memory.
* Refactor delta log to handle write and reads of WAL, use it to also store recovery metadata for incremental commits.
* Added test case for delta log
* Improved interfacing to ICheckpointManager
* Fix .net 5.0 breaks in tests
* add test runner in playground
* Add automatic removal of outdated checkpoint token folders.
* Expand docs on checkpointing
  • Loading branch information
badrishc authored Apr 22, 2021
1 parent 71f956b commit 9dcb777
Show file tree
Hide file tree
Showing 47 changed files with 1,718 additions and 284 deletions.
17 changes: 10 additions & 7 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ jobs:
inputs:
testRunner: VSTest
testResultsFiles: '**/*.trx'
searchFolder: '$(Agent.TempDirectory)'

- job: 'cppWindows'
pool:
Expand Down Expand Up @@ -152,11 +153,6 @@ jobs:
workingDirectory: 'cc/build/Debug'
displayName: 'Run Tests (Debug)'
#- script: |
# CTEST_OUTPUT_ON_FAILURE=1 make test
# workingDirectory: 'cc/build/Release'
# displayName: 'Run Tests (Release)'

- job: 'csharpLinux'
pool:
vmImage: ubuntu-16.04
Expand Down Expand Up @@ -187,8 +183,15 @@ jobs:
inputs:
command: test
projects: '**/test/*.csproj'
arguments: '--configuration $(buildConfiguration) --framework netcoreapp3.1'

arguments: '--configuration $(buildConfiguration) --framework netcoreapp3.1 -l "console;verbosity=detailed"'

- task: PublishTestResults@2
displayName: 'Publish Test Results'
inputs:
testResultsFormat: 'VSTest'
testResultsFiles: '*.trx'
searchFolder: '$(Agent.TempDirectory)'

# - job: 'cppBlobsWindows'
# pool:
# vmImage: vs2017-win2016
Expand Down
10 changes: 10 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docs", "docs", "{C60F148B-2
..\docs\_docs\96-slides-videos.md = ..\docs\_docs\96-slides-videos.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TstRunner", "playground\TstRunner\TstRunner.csproj", "{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -275,6 +277,13 @@ Global
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693}.Release|Any CPU.Build.0 = Release|x64
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693}.Release|x64.ActiveCfg = Release|x64
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693}.Release|x64.Build.0 = Release|x64
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Debug|Any CPU.ActiveCfg = Debug|x64
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Debug|Any CPU.Build.0 = Debug|x64
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Debug|x64.ActiveCfg = Debug|x64
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Debug|x64.Build.0 = Debug|x64
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|Any CPU.ActiveCfg = Release|x64
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|x64.ActiveCfg = Release|x64
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -305,6 +314,7 @@ Global
{998D4C78-B0C5-40FF-9BDC-716BAC8CF864} = {62BC1134-B6E1-476A-B894-7CA278A8B6DE}
{9EFCF8C5-320B-473C-83DE-3815981D465B} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{E8C7FB0F-38B8-468A-B1CA-8793DF8F2693} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{A265D9D2-3FEA-48BB-B1CC-273ECFEA0611} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
5 changes: 4 additions & 1 deletion cs/playground/AsyncStress/FasterWrapper.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using FASTER.core;
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using Xunit;
using System;
using System.IO;
Expand Down
5 changes: 4 additions & 1 deletion cs/playground/AsyncStress/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.Threading.Tasks;
using Xunit;
Expand Down
7 changes: 1 addition & 6 deletions cs/playground/CacheStoreConcurrent/Types.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using FASTER.core;

namespace CacheStoreConcurrent
{
Expand Down
5 changes: 4 additions & 1 deletion cs/playground/FasterLogMLSDTest/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System;
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.IO;
using System.Threading;
Expand Down
19 changes: 19 additions & 0 deletions cs/playground/TstRunner/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using FASTER.core;
using FASTER.test.recovery.objects;

namespace TstRunner
{
    /// <summary>
    /// Console entry point that drives a single recovery test from FASTER.test directly,
    /// without a test framework host (handy for debugging or profiling one scenario).
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Runs <c>ObjectRecoveryTest2</c> once with a snapshot checkpoint, bracketed by the
        /// fixture's own <c>Setup</c> and <c>TearDown</c>.
        /// </summary>
        public static void Main()
        {
            var test = new ObjectRecoveryTests2();
            test.Setup();
            try
            {
                // Main is synchronous, so block on the returned awaitable.
                test.ObjectRecoveryTest2(CheckpointType.Snapshot, 400, false).GetAwaiter().GetResult();
            }
            finally
            {
                // Always run teardown, even when the test throws, so whatever Setup created
                // is released (presumably mirrors what the test framework does for the
                // fixture's teardown method - confirm against the fixture's attributes).
                test.TearDown();
            }
        }
    }
}
19 changes: 19 additions & 0 deletions cs/playground/TstRunner/TstRunner.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<!-- Playground console project used to run a test from FASTER.test as a plain executable. -->

<PropertyGroup>
<!-- Built as an executable rather than a library. -->
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<!-- x64 is the only platform declared for this project. -->
<Platforms>x64</Platforms>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="xunit.assert" Version="2.4.1" />
</ItemGroup>

<ItemGroup>
<!-- The runner links against the core library and the test assembly whose tests it invokes. -->
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
<ProjectReference Include="..\..\test\FASTER.test.csproj" />
</ItemGroup>

</Project>
2 changes: 1 addition & 1 deletion cs/samples/SecondaryReaderStore/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ static void SecondaryReader()
{
try
{
secondaryStore.Recover(undoFutureVersions: false); // read-only recovery, no writing back undos
secondaryStore.Recover(undoNextVersion: false); // read-only recovery, no writing back undos
}
catch
{
Expand Down
2 changes: 1 addition & 1 deletion cs/samples/StoreCheckpointRecover/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static void Main()
Console.WriteLine("Error!");
}

// Take fold-over checkpoint of FASTER, wait to complete
// Take index + fold-over checkpoint of FASTER, wait to complete
store.TakeFullCheckpointAsync(CheckpointType.FoldOver)
.GetAwaiter().GetResult();

Expand Down
143 changes: 140 additions & 3 deletions cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ internal struct FullPageStatus
public long LastFlushedUntilAddress;
[FieldOffset(8)]
public long LastClosedUntilAddress;
[FieldOffset(16)]
public int Dirty;
}

[StructLayout(LayoutKind.Explicit)]
Expand Down Expand Up @@ -379,7 +381,7 @@ public abstract (int, int) GetInitialRecordSize<Input, FasterSession>(ref Key ke
/// </summary>
/// <param name="pageIndex"></param>
/// <returns></returns>
protected abstract bool IsAllocated(int pageIndex);
internal abstract bool IsAllocated(int pageIndex);
/// <summary>
/// Populate page
/// </summary>
Expand All @@ -401,6 +403,141 @@ public abstract (int, int) GetInitialRecordSize<Input, FasterSession>(ref Key ke
/// <param name="localSegmentOffsets"></param>
protected abstract void WriteAsyncToDevice<TContext>(long startPage, long flushPage, int pageSize, DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> result, IDevice device, IDevice objectLogDevice, long[] localSegmentOffsets);

/// <summary>
/// Delta flush: scans the in-memory pages covering [startAddress, endAddress) and appends every
/// record whose version matches <paramref name="version"/> to <paramref name="deltaLog"/>.
/// Each appended record is laid out as [long logical address][int aligned size][record bytes].
/// </summary>
/// <param name="startAddress">Logical address at which the scan starts</param>
/// <param name="endAddress">Logical address at which the scan stops (exclusive)</param>
/// <param name="prevEndAddress">End address of the previous flush; pages below its page can be skipped when not dirty</param>
/// <param name="version">Version whose records are written; also compared against each page's dirty marker</param>
/// <param name="deltaLog">Delta log that receives the records</param>
internal unsafe virtual void AsyncFlushDeltaToDevice(long startAddress, long endAddress, long prevEndAddress, int version, DeltaLog deltaLog)
{
    long startPage = GetPage(startAddress);
    long endPage = GetPage(endAddress);
    // Include the final page when endAddress lies strictly inside it
    if (endAddress > GetStartLogicalAddress(endPage))
        endPage++;

    long prevEndPage = GetPage(prevEndAddress);

    deltaLog.Allocate(out int entryLength, out long destPhysicalAddress);
    int destOffset = 0;

    for (long p = startPage; p < endPage; p++)
    {
        // All RCU pages need to be added to delta
        // For IPU-only pages, prune based on dirty bit
        if ((p < prevEndPage || endAddress == prevEndAddress) && PageStatusIndicator[p % BufferSize].Dirty < version)
            continue;

        var logicalAddress = p << LogPageSizeBits;
        var physicalAddress = GetPhysicalAddress(logicalAddress);

        // Never scan past endAddress: on the final page the bytes beyond it are outside the
        // requested range and must not be interpreted as records.
        var endLogicalAddress = logicalAddress + PageSize;
        if (endAddress < endLogicalAddress)
            endLogicalAddress = endAddress;
        var endPhysicalAddress = physicalAddress + (endLogicalAddress - logicalAddress);

        if (p == startPage)
        {
            // Begin at the in-page offset of startAddress on the first page
            physicalAddress += (int)(startAddress & PageSizeMask);
            logicalAddress += (int)(startAddress & PageSizeMask);
        }

        while (physicalAddress < endPhysicalAddress)
        {
            ref var info = ref GetInfo(physicalAddress);
            var (_, alignedRecordSize) = GetRecordSize(physicalAddress);
            if (info.Version == RecordInfo.GetShortVersion(version))
            {
                // Space needed: address header + size header + record payload
                int size = sizeof(long) + sizeof(int) + alignedRecordSize;
                if (destOffset + size > entryLength)
                {
                    // Current entry is full: seal what was written and start a fresh one
                    deltaLog.Seal(destOffset);
                    deltaLog.Allocate(out entryLength, out destPhysicalAddress);
                    destOffset = 0;
                    if (destOffset + size > entryLength)
                        throw new FasterException("Insufficient page size to write delta");
                }
                *((long*)(destPhysicalAddress + destOffset)) = logicalAddress;
                destOffset += sizeof(long);
                *((int*)(destPhysicalAddress + destOffset)) = alignedRecordSize;
                destOffset += sizeof(int);
                Buffer.MemoryCopy((void*)physicalAddress, (void*)(destPhysicalAddress + destOffset), alignedRecordSize, alignedRecordSize);
                destOffset += alignedRecordSize;
            }
            physicalAddress += alignedRecordSize;
            logicalAddress += alignedRecordSize;
        }
    }

    // Seal the trailing, partially filled entry (if anything was written to it)
    if (destOffset > 0)
        deltaLog.Seal(destOffset);
}

/// <summary>
/// Replays the delta records in <paramref name="log"/> onto in-memory pages: every record whose
/// logical address falls in [start of startPage, start of endPage) is copied to its physical
/// location. Records are read as [long logical address][int size][record bytes], the same
/// layout AsyncFlushDeltaToDevice writes.
/// </summary>
/// <param name="log">Delta log to replay; nothing happens when null</param>
/// <param name="startPage">First page (inclusive) that should receive records</param>
/// <param name="endPage">Page (exclusive) at which the target range ends</param>
internal unsafe void ApplyDelta(DeltaLog log, long startPage, long endPage)
{
    if (log == null) return;

    long rangeBegin = GetStartLogicalAddress(startPage);
    long rangeEnd = GetStartLogicalAddress(endPage);

    log.Reset();
    while (log.GetNext(out long cursor, out int entryLength, out int type))
    {
        // consider only delta records
        if (type == 0)
        {
            long entryEnd = cursor + entryLength;
            while (cursor < entryEnd)
            {
                // Decode the two headers, then locate the payload that follows them
                long recordAddress = *(long*)cursor;
                int recordSize = *(int*)(cursor + sizeof(long));
                long payload = cursor + sizeof(long) + sizeof(int);

                if (recordAddress >= rangeBegin && recordAddress < rangeEnd)
                {
                    var target = GetPhysicalAddress(recordAddress);
                    Buffer.MemoryCopy((void*)payload, (void*)target, recordSize, recordSize);
                }

                cursor = payload + recordSize;
            }
        }
    }
}

/// <summary>
/// Records <paramref name="version"/> as the dirty marker of the page containing
/// <paramref name="logicalAddress"/>, unless the marker already holds an equal or larger value.
/// Plain (non-interlocked) read and write.
/// </summary>
/// <param name="logicalAddress">Logical address inside the page to mark</param>
/// <param name="version">Version to store in the page's dirty marker</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void MarkPage(long logicalAddress, int version)
{
    // Page number modulo buffer size selects the status slot
    var slot = (logicalAddress >> LogPageSizeBits) % BufferSize;
    ref int dirty = ref PageStatusIndicator[slot].Dirty;
    if (dirty < version)
    {
        dirty = version;
    }
}

/// <summary>
/// Same purpose as MarkPage, but the update of the page's dirty marker is delegated to
/// Utility.MonotonicUpdate (presumably an interlocked "raise to at least" update - confirm
/// against Utility).
/// </summary>
/// <param name="logicalAddress">Logical address inside the page to mark</param>
/// <param name="version">Version to store in the page's dirty marker</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal void MarkPageAtomic(long logicalAddress, int version)
{
    ref int dirty = ref PageStatusIndicator[(logicalAddress >> LogPageSizeBits) % BufferSize].Dirty;
    Utility.MonotonicUpdate(ref dirty, version, out _);
}

/// <summary>
/// Issues an asynchronous device write for one page. A non-partial request is forwarded
/// unchanged; a partial request is narrowed to the sector-aligned byte range of the page that
/// covers [asyncResult.fromAddress, asyncResult.untilAddress).
/// </summary>
/// <param name="alignedSourceAddress">Address of the page's source buffer</param>
/// <param name="alignedDestinationAddress">Destination address of the page on the device</param>
/// <param name="numBytesToWrite">Bytes to write for a non-partial request (recomputed for partial ones)</param>
/// <param name="callback">Completion callback passed through to the device</param>
/// <param name="asyncResult">Flush descriptor; supplies partial, page, fromAddress and untilAddress</param>
/// <param name="device">Device that performs the write</param>
internal void WriteAsync<TContext>(IntPtr alignedSourceAddress, ulong alignedDestinationAddress, uint numBytesToWrite,
    DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult,
    IDevice device)
{
    if (!asyncResult.partial)
    {
        device.WriteAsync(alignedSourceAddress, alignedDestinationAddress,
            numBytesToWrite, callback, asyncResult);
        return;
    }

    // Write only required bytes within the page
    var pageStart = asyncResult.page << LogPageSizeBits;

    // Round the start offset down to a sector boundary
    int startOffset = (int)(asyncResult.fromAddress - pageStart);
    startOffset = (startOffset / sectorSize) * sectorSize;

    // Round the end offset up to a sector boundary
    int endOffset = (int)(asyncResult.untilAddress - pageStart);
    endOffset = (endOffset + (sectorSize - 1)) & ~(sectorSize - 1);

    uint partialBytes = (uint)(endOffset - startOffset);
    device.WriteAsync(alignedSourceAddress + startOffset, alignedDestinationAddress + (ulong)startOffset, partialBytes, callback, asyncResult);
}


/// <summary>
/// Read objects to memory (async)
/// </summary>
Expand All @@ -427,7 +564,7 @@ public abstract (int, int) GetInitialRecordSize<Input, FasterSession>(ref Key ke
/// </summary>
/// <param name="page">Page number to be cleared</param>
/// <param name="offset">Offset to clear from (if partial clear)</param>
protected abstract void ClearPage(long page, int offset = 0);
internal abstract void ClearPage(long page, int offset = 0);
/// <summary>
/// Write page (async)
/// </summary>
Expand Down Expand Up @@ -1796,7 +1933,7 @@ private void AsyncFlushPageCallback(uint errorCode, uint numBytes, object contex
/// <param name="errorCode"></param>
/// <param name="numBytes"></param>
/// <param name="context"></param>
private void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, object context)
protected void AsyncFlushPageToDeviceCallback(uint errorCode, uint numBytes, object context)
{
try
{
Expand Down
Loading

0 comments on commit 9dcb777

Please sign in to comment.