Skip to content

Commit

Permalink
Page blob device (microsoft#147)
Browse files Browse the repository at this point in the history
* Implement AzureStorageDevice, an IDevice backed by Azure Page Blobs
* Support Azure Storage Emulator on the C# build pipeline
* Add environment var for azure-only tests
  • Loading branch information
tli2 authored and badrishc committed Jul 23, 2019
1 parent f28f8a9 commit 0519715
Show file tree
Hide file tree
Showing 12 changed files with 427 additions and 34 deletions.
13 changes: 13 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
variables:
solution: 'cs/FASTER.sln'
RunAzureTests: 'yes'

jobs:
- job: 'csharpWindows'
Expand Down Expand Up @@ -37,6 +38,18 @@ jobs:
solution: '$(solution)'
platform: '$(buildPlatform)'
configuration: '$(buildConfiguration)'

- powershell: 'Invoke-WebRequest -OutFile azure-storage-emulator.msi -Uri "https://go.microsoft.com/fwlink/?LinkId=717179&clcid=0x409"'
displayName: 'Download Azure Storage Emulator'

- powershell: 'msiexec /passive /lvx installation.log /a azure-storage-emulator.msi TARGETDIR="C:\storage-emulator"'
displayName: 'Install Azure Storage Emulator'

- script: '"C:\Program Files\Microsoft SQL Server\130\Tools\Binn\SqlLocalDB.exe" create "v13.0" 13.0 -s'
displayName: 'Init Test Db'

- script: '"C:\storage-emulator\root\Microsoft SDKs\Azure\Storage Emulator\AzureStorageEmulator.exe" start'
displayName: 'Start Storage Emulator'

- task: VSTest@2
inputs:
Expand Down
17 changes: 17 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmark", "benchmark", "{CA6AB459-A31A-4C15-B1A6-A82C349B54B4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{81B3B5D1-70F6-4979-AC76-003F9A6B316B}"
ProjectSection(SolutionItems) = preProject
src\core\FASTER.core.nuspec = src\core\FASTER.core.nuspec
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SumStore", "playground\SumStore\SumStore.csproj", "{05D61B37-9714-4234-9961-384A63F7175E}"
EndProject
Expand All @@ -37,6 +40,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VarLenStructSample", "playg
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FixedLenStructSample", "playground\FixedLenStructSample\FixedLenStructSample.csproj", "{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B14415-D316-4955-BE5F-725BB2DEBEBE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -141,6 +148,14 @@ Global
{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|Any CPU.Build.0 = Release|x64
{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|x64.ActiveCfg = Release|x64
{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|x64.Build.0 = Release|x64
{E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|x64.ActiveCfg = Debug|x64
{E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|x64.Build.0 = Debug|x64
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64
{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -159,6 +174,8 @@ Global
{F989FF23-5DD7-4D8F-9458-BDA22EFC038D} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{37B3C501-A7A1-4E86-B766-22F9BEF31DFE} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
{E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
2 changes: 1 addition & 1 deletion cs/src/core/Allocator/GenericAllocator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,7 @@ private void AsyncFlushPartialObjectLogCallback<TContext>(uint errorCode, uint n
{
if (errorCode != 0)
{
Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
}

// Set the page status to flushed
Expand Down
2 changes: 0 additions & 2 deletions cs/src/core/Device/Devices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@

namespace FASTER.core
{


/// <summary>
/// Factory to create FASTER objects
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions cs/src/core/Device/LocalStorageDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,21 @@ public override void Close()
logHandle.Dispose();
}

/// <summary>
/// Builds the name of a single segment of this device by appending a dot and
/// the segment id to <c>FileName</c>.
/// </summary>
/// <param name="segmentId">Numeric id of the segment whose name is requested</param>
/// <returns><c>FileName</c>, followed by ".", followed by <paramref name="segmentId"/></returns>
protected string GetSegmentName(int segmentId)
{
    // The suffix is composed first, then attached to the device-wide name.
    string suffix = "." + segmentId;
    return FileName + suffix;
}

/// <summary>
///
/// </summary>
/// <param name="_segmentId"></param>
/// <returns></returns>
// Can be used to pre-load handles, e.g., after a checkpoint
protected SafeFileHandle GetOrAddHandle(int _segmentId)
{
Expand Down
14 changes: 0 additions & 14 deletions cs/src/core/FASTER.core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,4 @@
<ItemGroup>
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="4.5.2" />
</ItemGroup>
<ItemGroup>
<EmbeddedResource Include="Index/FASTER/Checkpoint.cs" />
<EmbeddedResource Include="Index/Common/Contexts.cs" />
<EmbeddedResource Include="Index/FASTER/FASTER.cs" />
<EmbeddedResource Include="Index/FASTER/FASTERBase.cs" />
<EmbeddedResource Include="Index/FASTER/FASTERImpl.cs" />
<EmbeddedResource Include="Index/FASTER/FASTERThread.cs" />
<EmbeddedResource Include="Index/FASTER/IndexCheckpoint.cs" />
<EmbeddedResource Include="Index/FASTER/IndexRecovery.cs" />
<EmbeddedResource Include="Index/FASTER/Recovery.cs" />
<EmbeddedResource Include="Utilities/AsyncResultTypes.cs" />
<EmbeddedResource Include="Utilities/StateTransitions.cs" />
</ItemGroup>

</Project>
179 changes: 179 additions & 0 deletions cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using FASTER.core;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;

namespace FASTER.devices
{
/// <summary>
/// An <see cref="IDevice"/> implementation that is backed by <see href="https://docs.microsoft.com/en-us/azure/storage/blobs/storage-blob-pageblob-overview">Azure Page Blobs</see>.
/// This device is slower than a local SSD or HDD, but provides scalability and shared access in the cloud.
/// </summary>
public class AzureStorageDevice : StorageDeviceBase
{
    // Container that holds every page blob created by this device
    private readonly CloudBlobContainer container;
    // Maps a segment id to the entry wrapping the page blob that backs it
    private readonly ConcurrentDictionary<int, BlobEntry> blobs;
    // Prefix of every blob name; the segment id is appended to it
    private readonly string blobName;
    // When true, Close() deletes every blob this instance knows about
    private readonly bool deleteOnClose;

    // Page Blobs permit blobs of max size 8 TB, but the emulator permits only 2 GB.
    // The value is 2 * 10^9 bytes, which is a multiple of the 512-byte page size.
    private const long MAX_BLOB_SIZE = 2L * 1000 * 1000 * 1000;
    // Azure Page Blobs have a fixed sector size of 512 bytes.
    private const uint PAGE_BLOB_SECTOR_SIZE = 512;

    /// <summary>
    /// Constructs a new AzureStorageDevice instance, backed by Azure Page Blobs
    /// </summary>
    /// <param name="connectionString">The connection string to use when establishing connection to Azure Blobs</param>
    /// <param name="containerName">Name of the Azure Blob container to use. If there does not exist a container with the supplied name, one is created</param>
    /// <param name="blobName">A descriptive name that will be the prefix of all blobs created with this device</param>
    /// <param name="deleteOnClose">
    /// True if the program should delete all blobs created on call to <see cref="Close">Close</see>. False otherwise.
    /// The container is not deleted even if it was created in this constructor
    /// </param>
    // NOTE(review): the connection string (which may carry credentials) is embedded in the name
    // handed to the base class - confirm that this name is never logged or persisted.
    public AzureStorageDevice(string connectionString, string containerName, string blobName, bool deleteOnClose = false)
        : base(connectionString + "/" + containerName + "/" + blobName, PAGE_BLOB_SECTOR_SIZE)
    {
        CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
        CloudBlobClient client = storageAccount.CreateCloudBlobClient();
        container = client.GetContainerReference(containerName);
        // Synchronous call: the container must exist before any blob can be created in it
        container.CreateIfNotExists();
        blobs = new ConcurrentDictionary<int, BlobEntry>();
        this.blobName = blobName;
        this.deleteOnClose = deleteOnClose;
    }

    /// <summary>
    /// <see cref="IDevice.Close">Inherited</see>
    /// </summary>
    public override void Close()
    {
        // Unlike in LocalStorageDevice, we explicitly remove all page blobs if the deleteOnClose flag is set, instead of relying on the operating system
        // to delete files after the end of our process. This leads to potential problems if multiple instances are sharing the same underlying page blobs.
        //
        // Since this flag is presumably only used for testing though, it is probably fine.
        if (deleteOnClose)
        {
            foreach (var entry in blobs)
            {
                entry.Value.GetPageBlob().Delete();
            }
        }
    }

    /// <summary>
    /// Deletes the page blobs backing the segments in the half-open range
    /// [<paramref name="fromSegment"/>, <paramref name="toSegment"/>). Segments this
    /// instance has no entry for are skipped.
    /// </summary>
    /// <param name="fromSegment">First segment id to delete (inclusive)</param>
    /// <param name="toSegment">Segment id at which deletion stops (exclusive)</param>
    public override void DeleteSegmentRange(int fromSegment, int toSegment)
    {
        for (int i = fromSegment; i < toSegment; i++)
        {
            if (blobs.TryRemove(i, out BlobEntry blob))
            {
                blob.GetPageBlob().Delete();
            }
        }
    }

    /// <summary>
    /// <see cref="IDevice.ReadAsync(int, ulong, IntPtr, uint, IOCompletionCallback, IAsyncResult)">Inherited</see>
    /// </summary>
    /// <exception cref="InvalidOperationException">Thrown when no blob is known for <paramref name="segmentId"/></exception>
    public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destinationAddress, uint readLength, IOCompletionCallback callback, IAsyncResult asyncResult)
    {
        // It is up to the allocator to make sure no reads are issued to segments before they are written
        if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry)) throw new InvalidOperationException("Attempting to read non-existent segments");

        // Even though Azure Page Blob does not make use of Overlapped, we populate one to conform to the callback API
        Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
        NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);

        // Wrap the caller's unmanaged buffer so the download is written straight into it
        UnmanagedMemoryStream stream = new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write);
        CloudPageBlob pageBlob = blobEntry.GetPageBlob();
        pageBlob.BeginDownloadRangeToStream(stream, (Int64)sourceAddress, readLength, ar =>
        {
            // The handler treats any non-zero value as an error but does not distinguish between them.
            uint errorCode = 0;
            try
            {
                pageBlob.EndDownloadRangeToStream(ar);
            }
            // No documentation on exception behavior is provided, so the catch cannot be more specific
            catch (Exception e)
            {
                Trace.TraceError(e.Message);
                errorCode = 2;
            }
            // Complete exactly once: reporting the failure and then falling through to a second
            // "success" completion would hand the same NativeOverlapped to the callback twice.
            callback(errorCode, readLength, ovNative);
        }, asyncResult);
    }

    /// <summary>
    /// <see cref="IDevice.WriteAsync(IntPtr, int, ulong, uint, IOCompletionCallback, IAsyncResult)">Inherited</see>
    /// </summary>
    public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
    {
        if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry))
        {
            BlobEntry entry = new BlobEntry();
            if (blobs.TryAdd(segmentId, entry))
            {
                CloudPageBlob pageBlob = container.GetPageBlobReference(blobName + segmentId);
                // If segment size is -1, which denotes absence, we request the largest possible blob. This is okay because
                // page blobs are not backed by real pages on creation, and the given size is only the physical limit of
                // how large it can grow to.
                var size = segmentSize == -1 ? MAX_BLOB_SIZE : segmentSize;
                // If no blob exists for the segment, we must first create the segment asynchronously. (Create call takes ~70 ms by measurement)
                // After creation is done, we can call write.
                entry.CreateAsync(size, pageBlob);
            }
            // Otherwise, some other thread beat us to it. Okay to use their blobs.
            blobEntry = blobs[segmentId];
        }
        TryWriteAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
    }

    /// <summary>
    /// Issues the write immediately when the page blob is available; otherwise tries to
    /// queue it on the entry so it runs once the pending blob creation has finished.
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private void TryWriteAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
    {
        CloudPageBlob pageBlob = blobEntry.GetPageBlob();
        if (pageBlob == null)
        {
            // If pageBlob is null, it is being created. Attempt to queue the write for the creator to complete after it is done
            if (blobEntry.TryQueueAction(p => WriteToBlobAsync(p, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult))) return;

            // Queueing was refused, so the reference read above is stale; read it again rather than
            // passing null on to the write.
            // NOTE(review): assumes TryQueueAction only declines once the blob has been published - confirm against BlobEntry.
            pageBlob = blobEntry.GetPageBlob();
        }

        // Otherwise, invoke directly.
        WriteToBlobAsync(pageBlob, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
    }

    /// <summary>
    /// Starts an asynchronous page write of <paramref name="numBytesToWrite"/> bytes from
    /// <paramref name="sourceAddress"/> to offset <paramref name="destinationAddress"/> of
    /// <paramref name="blob"/>, and invokes <paramref name="callback"/> exactly once on completion
    /// (error code 0 on success, 1 on failure).
    /// </summary>
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static unsafe void WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
    {
        // Even though Azure Page Blob does not make use of Overlapped, we populate one to conform to the callback API
        Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
        NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
        // Expose the caller's unmanaged buffer as a stream for the upload
        UnmanagedMemoryStream stream = new UnmanagedMemoryStream((byte*)sourceAddress, numBytesToWrite);
        blob.BeginWritePages(stream, (long)destinationAddress, null, ar =>
        {
            // The handler treats any non-zero value as an error but does not distinguish between them.
            uint errorCode = 0;
            try
            {
                blob.EndWritePages(ar);
            }
            // No documentation on exception behavior is provided, so the catch cannot be more specific
            catch (Exception e)
            {
                Trace.TraceError(e.Message);
                errorCode = 1;
            }
            // Complete exactly once: reporting the failure and then falling through to a second
            // "success" completion would hand the same NativeOverlapped to the callback twice.
            callback(errorCode, numBytesToWrite, ovNative);
        }, asyncResult);
    }
}
}
Loading

0 comments on commit 0519715

Please sign in to comment.