diff --git a/azure-pipelines.yml b/azure-pipelines.yml
index 2abdd0a75..f7e119a9a 100644
--- a/azure-pipelines.yml
+++ b/azure-pipelines.yml
@@ -1,5 +1,6 @@
 variables:
   solution: 'cs/FASTER.sln'
+  RunAzureTests: 'yes'
 
 jobs:
 - job: 'csharpWindows'
@@ -37,6 +38,18 @@ jobs:
       solution: '$(solution)'
       platform: '$(buildPlatform)'
       configuration: '$(buildConfiguration)'
+
+  - powershell: 'Invoke-WebRequest -OutFile azure-storage-emulator.msi -Uri "https://go.microsoft.com/fwlink/?LinkId=717179&clcid=0x409"'
+    displayName: 'Download Azure Storage Emulator'
+
+  - powershell: 'msiexec /passive /lvx installation.log /a azure-storage-emulator.msi TARGETDIR="C:\storage-emulator"'
+    displayName: 'Install Azure Storage Emulator'
+
+  - script: '"C:\Program Files\Microsoft SQL Server\130\Tools\Binn\SqlLocalDB.exe" create "v13.0" 13.0 -s'
+    displayName: 'Init Test Db'
+
+  - script: '"C:\storage-emulator\root\Microsoft SDKs\Azure\Storage Emulator\AzureStorageEmulator.exe" start'
+    displayName: 'Start Storage Emulator'
 
   - task: VSTest@2
     inputs:
diff --git a/cs/FASTER.sln b/cs/FASTER.sln
index 5a8702776..af8a268e5 100644
--- a/cs/FASTER.sln
+++ b/cs/FASTER.sln
@@ -12,6 +12,9 @@ EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "benchmark", "benchmark", "{CA6AB459-A31A-4C15-B1A6-A82C349B54B4}"
 EndProject
 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{81B3B5D1-70F6-4979-AC76-003F9A6B316B}"
+	ProjectSection(SolutionItems) = preProject
+		src\core\FASTER.core.nuspec = src\core\FASTER.core.nuspec
+	EndProjectSection
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SumStore", "playground\SumStore\SumStore.csproj", "{05D61B37-9714-4234-9961-384A63F7175E}"
 EndProject
@@ -37,6 +40,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "VarLenStructSample", "playg
 EndProject
 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FixedLenStructSample", "playground\FixedLenStructSample\FixedLenStructSample.csproj", "{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}"
 EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "devices", "devices", "{A6B14415-D316-4955-BE5F-725BB2DEBEBE}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorageDevice", "src\devices\AzureStorageDevice\FASTER.devices.AzureStorageDevice.csproj", "{E571E686-01A0-44D5-BFF5-B7678284258B}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -141,6 +148,14 @@ Global
 		{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|Any CPU.Build.0 = Release|x64
 		{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|x64.ActiveCfg = Release|x64
 		{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780}.Release|x64.Build.0 = Release|x64
+		{E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|x64.ActiveCfg = Debug|x64
+		{E571E686-01A0-44D5-BFF5-B7678284258B}.Debug|x64.Build.0 = Debug|x64
+		{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|Any CPU.Build.0 = Release|Any CPU
+		{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.ActiveCfg = Release|x64
+		{E571E686-01A0-44D5-BFF5-B7678284258B}.Release|x64.Build.0 = Release|x64
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
@@ -159,6 +174,8 @@ Global
 		{F989FF23-5DD7-4D8F-9458-BDA22EFC038D} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
 		{37B3C501-A7A1-4E86-B766-22F9BEF31DFE} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
 		{7EBB5ADF-D9EA-4B8B-AAE7-C48A98EBF780} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
+		{A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
+		{E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
 	EndGlobalSection
 	GlobalSection(ExtensibilityGlobals) = postSolution
 		SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
diff --git a/cs/src/core/Allocator/GenericAllocator.cs b/cs/src/core/Allocator/GenericAllocator.cs
index 2aec04422..e5822429f 100644
--- a/cs/src/core/Allocator/GenericAllocator.cs
+++ b/cs/src/core/Allocator/GenericAllocator.cs
@@ -498,7 +498,7 @@ private void AsyncFlushPartialObjectLogCallback(uint errorCode, uint n
         {
             if (errorCode != 0)
             {
-                Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
+                Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
             }
 
             // Set the page status to flushed
diff --git a/cs/src/core/Device/Devices.cs b/cs/src/core/Device/Devices.cs
index 02b51679e..9428fd848 100644
--- a/cs/src/core/Device/Devices.cs
+++ b/cs/src/core/Device/Devices.cs
@@ -8,8 +8,6 @@
 namespace FASTER.core
 {
-
-
     /// <summary>
     /// Factory to create FASTER objects
    /// </summary>
diff --git a/cs/src/core/Device/LocalStorageDevice.cs b/cs/src/core/Device/LocalStorageDevice.cs
index 27517b8d7..1680e4a22 100644
--- a/cs/src/core/Device/LocalStorageDevice.cs
+++ b/cs/src/core/Device/LocalStorageDevice.cs
@@ -146,11 +146,21 @@ public override void Close()
             logHandle.Dispose();
         }
 
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="segmentId"></param>
+        /// <returns></returns>
         protected string GetSegmentName(int segmentId)
         {
             return FileName + "." + segmentId;
         }
 
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="_segmentId"></param>
+        /// <returns></returns>
         // Can be used to pre-load handles, e.g., after a checkpoint
         protected SafeFileHandle GetOrAddHandle(int _segmentId)
         {
diff --git a/cs/src/core/FASTER.core.csproj b/cs/src/core/FASTER.core.csproj
index c2dbeca27..1d68a5425 100644
--- a/cs/src/core/FASTER.core.csproj
+++ b/cs/src/core/FASTER.core.csproj
@@ -37,18 +37,4 @@
-
-
-
-
-
-
-
-
-
-
-
-
-
-
\ No newline at end of file
diff --git a/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
new file mode 100644
index 000000000..cf3491e57
--- /dev/null
+++ b/cs/src/devices/AzureStorageDevice/AzureStorageDevice.cs
@@ -0,0 +1,179 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.IO;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using FASTER.core;
+using Microsoft.Azure.Storage;
+using Microsoft.Azure.Storage.Blob;
+
+namespace FASTER.devices
+{
+    /// <summary>
+    /// An IDevice implementation that is backed by an Azure Page Blob.
+    /// This device is slower than a local SSD or HDD, but provides scalability and shared access in the cloud.
+    /// </summary>
+    public class AzureStorageDevice : StorageDeviceBase
+    {
+        private CloudBlobContainer container;
+        private readonly ConcurrentDictionary<int, BlobEntry> blobs;
+        private readonly string blobName;
+        private readonly bool deleteOnClose;
+
+        // Page Blobs permit blobs of max size 8 TB, but the emulator permits only 2 GB
+        private const long MAX_BLOB_SIZE = (long)(2 * 10e8);
+        // Azure Page Blobs have a fixed sector size of 512 bytes.
+        private const uint PAGE_BLOB_SECTOR_SIZE = 512;
+
+        /// <summary>
+        /// Constructs a new AzureStorageDevice instance, backed by Azure Page Blobs
+        /// </summary>
+        /// <param name="connectionString">The connection string to use when establishing connection to Azure Blobs</param>
+        /// <param name="containerName">Name of the Azure Blob container to use. If there does not exist a container with the supplied name, one is created</param>
+        /// <param name="blobName">A descriptive name that will be the prefix of all blobs created with this device</param>
+        /// <param name="deleteOnClose">
+        /// True if the program should delete all blobs created on call to Close. False otherwise.
+        /// The container is not deleted even if it was created in this constructor
+        /// </param>
+        public AzureStorageDevice(string connectionString, string containerName, string blobName, bool deleteOnClose = false)
+            : base(connectionString + "/" + containerName + "/" + blobName, PAGE_BLOB_SECTOR_SIZE)
+        {
+            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
+            CloudBlobClient client = storageAccount.CreateCloudBlobClient();
+            container = client.GetContainerReference(containerName);
+            container.CreateIfNotExists();
+            blobs = new ConcurrentDictionary<int, BlobEntry>();
+            this.blobName = blobName;
+            this.deleteOnClose = deleteOnClose;
+        }
+
+        /// <summary>
+        /// Inherited
+        /// </summary>
+        public override void Close()
+        {
+            // Unlike in LocalStorageDevice, we explicitly remove all page blobs if the deleteOnClose flag is set, instead of relying on the operating system
+            // to delete files after the end of our process. This leads to potential problems if multiple instances are sharing the same underlying page blobs.
+            //
+            // Since this flag is presumably only used for testing though, it is probably fine.
+            if (deleteOnClose)
+            {
+                foreach (var entry in blobs)
+                {
+                    entry.Value.GetPageBlob().Delete();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Inherited
+        /// </summary>
+        public override void DeleteSegmentRange(int fromSegment, int toSegment)
+        {
+            for (int i = fromSegment; i < toSegment; i++)
+            {
+                if (blobs.TryRemove(i, out BlobEntry blob))
+                {
+                    blob.GetPageBlob().Delete();
+                }
+            }
+        }
+
+        /// <summary>
+        /// Inherited
+        /// </summary>
+        public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destinationAddress, uint readLength, IOCompletionCallback callback, IAsyncResult asyncResult)
+        {
+            // It is up to the allocator to make sure no reads are issued to segments before they are written
+            if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry)) throw new InvalidOperationException("Attempting to read non-existent segments");
+
+            // Even though Azure Page Blob does not make use of Overlapped, we populate one to conform to the callback API
+            Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
+            NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
+
+            UnmanagedMemoryStream stream = new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write);
+            CloudPageBlob pageBlob = blobEntry.GetPageBlob();
+            pageBlob.BeginDownloadRangeToStream(stream, (Int64)sourceAddress, readLength, ar => {
+                try
+                {
+                    pageBlob.EndDownloadRangeToStream(ar);
+                }
+                // I don't think I can be more specific in catch here because no documentation on exception behavior is provided
+                catch (Exception e)
+                {
+                    Trace.TraceError(e.Message);
+                    // Is there any documentation on the meaning of error codes here? The handler suggests that any non-zero value is an error
+                    // but does not distinguish between them.
+                    callback(2, readLength, ovNative);
+                }
+                callback(0, readLength, ovNative);
+            }, asyncResult);
+        }
+
+        /// <summary>
+        /// Inherited
+        /// </summary>
+        public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
+        {
+            if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry))
+            {
+                BlobEntry entry = new BlobEntry();
+                if (blobs.TryAdd(segmentId, entry))
+                {
+                    CloudPageBlob pageBlob = container.GetPageBlobReference(blobName + segmentId);
+                    // If segment size is -1, which denotes absence, we request the largest possible blob. This is okay because
+                    // page blobs are not backed by real pages on creation, and the given size is only the physical limit of
+                    // how large it can grow to.
+                    var size = segmentSize == -1 ? MAX_BLOB_SIZE : segmentSize;
+                    // If no blob exists for the segment, we must first create the segment asynchronously. (Create call takes ~70 ms by measurement)
+                    // After creation is done, we can call write.
+                    entry.CreateAsync(size, pageBlob);
+                }
+                // Otherwise, some other thread beat us to it. Okay to use their blobs.
+                blobEntry = blobs[segmentId];
+            }
+            TryWriteAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
+        }
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private void TryWriteAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
+        {
+            CloudPageBlob pageBlob = blobEntry.GetPageBlob();
+            // If pageBlob is null, it is being created. Attempt to queue the write for the creator to complete after it is done
+            if (pageBlob == null
+                && blobEntry.TryQueueAction(p => WriteToBlobAsync(p, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult))) return;
+
+            // Otherwise, invoke directly.
+            WriteToBlobAsync(pageBlob, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
+        }
+
+        [MethodImpl(MethodImplOptions.AggressiveInlining)]
+        private static unsafe void WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
+        {
+            // Even though Azure Page Blob does not make use of Overlapped, we populate one to conform to the callback API
+            Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
+            NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
+            UnmanagedMemoryStream stream = new UnmanagedMemoryStream((byte*)sourceAddress, numBytesToWrite);
+            blob.BeginWritePages(stream, (long)destinationAddress, null, ar =>
+            {
+                try
+                {
+                    blob.EndWritePages(ar);
+                }
+                // I don't think I can be more specific in catch here because no documentation on exception behavior is provided
+                catch (Exception e)
+                {
+                    Trace.TraceError(e.Message);
+                    // Is there any documentation on the meaning of error codes here? The handler suggests that any non-zero value is an error
+                    // but does not distinguish between them.
+                    callback(1, numBytesToWrite, ovNative);
+                }
+                callback(0, numBytesToWrite, ovNative);
+            }, asyncResult);
+        }
+    }
+}
diff --git a/cs/src/devices/AzureStorageDevice/BlobEntry.cs b/cs/src/devices/AzureStorageDevice/BlobEntry.cs
new file mode 100644
index 000000000..e70d20b6d
--- /dev/null
+++ b/cs/src/devices/AzureStorageDevice/BlobEntry.cs
@@ -0,0 +1,106 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Threading;
+using Microsoft.Azure.Storage.Blob;
+
+namespace FASTER.devices
+{
+    // This class bundles a page blob object with a queue and a counter to ensure
+    // 1) BeginCreate is not called more than once
+    // 2) No writes are issued before EndCreate
+    // The creator of a BlobEntry is responsible for populating the object with an underlying Page Blob. Any subsequent callers
+    // either directly write to the created page blob, or queue the write so the creator can clear it after creation is complete.
+    // In-progress creation is denoted by a null value on the underlying page blob
+    class BlobEntry
+    {
+        private CloudPageBlob pageBlob;
+        private ConcurrentQueue<Action<CloudPageBlob>> pendingWrites;
+        private int waitingCount;
+
+        /// <summary>
+        /// Creates a new BlobEntry; does not initialize a page blob. Use
+        /// <see cref="CreateAsync(long, CloudPageBlob)"/> for actual creation.
+        /// </summary>
+        public BlobEntry()
+        {
+            pageBlob = null;
+            pendingWrites = new ConcurrentQueue<Action<CloudPageBlob>>();
+            waitingCount = 0;
+        }
+
+        /// <summary>
+        /// Getter for the underlying <see cref="CloudPageBlob"/>
+        /// </summary>
+        /// <returns>the underlying <see cref="CloudPageBlob"/>, or null if there is none</returns>
+        public CloudPageBlob GetPageBlob()
+        {
+            return pageBlob;
+        }
+
+        /// <summary>
+        /// Asynchronously invoke create on the given pageBlob.
+        /// </summary>
+        /// <param name="size">maximum size of the blob</param>
+        /// <param name="pageBlob">The page blob to create</param>
+        public void CreateAsync(long size, CloudPageBlob pageBlob)
+        {
+            Debug.Assert(waitingCount == 0, "Create should be called on blobs that don't already exist and exactly once");
+            // Asynchronously create the blob
+            pageBlob.BeginCreate(size, ar =>
+            {
+                try
+                {
+                    pageBlob.EndCreate(ar);
+                }
+                catch (Exception e)
+                {
+                    // TODO(Tianyu): Can't really do better without knowing error behavior
+                    Trace.TraceError(e.Message);
+                }
+                // At this point the blob is fully created. After this line all consequent writers will write immediately. We just
+                // need to clear the queue of pending writers.
+                this.pageBlob = pageBlob;
+                // Take a snapshot of the current waiting count. Exactly this many actions will be cleared.
+                // Swapping in -1 will inform any stragglers that we are not taking their actions and prompt them to retry (and call write directly)
+                int waitingCountSnapshot = Interlocked.Exchange(ref waitingCount, -1);
+                Action<CloudPageBlob> action;
+                // Clear actions
+                for (int i = 0; i < waitingCountSnapshot; i++)
+                {
+                    // Inserts into the queue may lag behind the creation thread. We have to wait until that happens.
+                    // This is so rare that we are probably okay with a busy wait.
+                    while (!pendingWrites.TryDequeue(out action)) { }
+                    action(pageBlob);
+                }
+                // Mark for deallocation for the GC
+                pendingWrites = null;
+            }, null);
+        }
+
+        /// <summary>
+        /// Attempts to enqueue an action to be invoked by the creator after creation is done. Should only be invoked when
+        /// creation is in-flight. This call is allowed to fail (and return false) if concurrently the creation is complete.
+        /// The caller should call the write action directly instead of queueing in this case.
+        /// </summary>
+        /// <param name="writeAction">The write action to perform</param>
+        /// <returns>Whether the action was successfully enqueued</returns>
+        public bool TryQueueAction(Action<CloudPageBlob> writeAction)
+        {
+            int currentCount;
+            do
+            {
+                currentCount = waitingCount;
+                // If current count became -1, creation is complete. New queue entries will not be processed and we must call the action ourselves.
+                if (currentCount == -1) return false;
+            } while (Interlocked.CompareExchange(ref waitingCount, currentCount + 1, currentCount) != currentCount);
+            // Enqueue last. The creation thread is obliged to wait until it has processed waitingCount many actions.
+            // It is extremely unlikely that we will get scheduled out here anyways.
+            pendingWrites.Enqueue(writeAction);
+            return true;
+        }
+    }
+}
diff --git a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj
new file mode 100644
index 000000000..de06d0e90
--- /dev/null
+++ b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.csproj
@@ -0,0 +1,44 @@
+
+
+
+    netstandard2.0;net46
+    AnyCPU;x64
+
+
+
+    true
+    FASTER.devices
+    FASTER.devices.AzureStorageDevice
+    prompt
+    true
+
+    Library
+
+    ../../../FASTER.snk
+    false
+    bin\$(Platform)\$(Configuration)\$(TargetFramework)\$(AssemblyName).xml
+
+
+
+    TRACE;DEBUG
+    full
+    bin\$(Platform)\Debug\
+
+
+    TRACE
+    pdbonly
+    true
+    bin\$(Platform)\Release\
+
+
+    $(DefineConstants);DOTNETCORE
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec
new file mode 100644
index 000000000..0694ac4c4
--- /dev/null
+++ b/cs/src/devices/AzureStorageDevice/FASTER.devices.AzureStorageDevice.nuspec
@@ -0,0 +1,34 @@
+
+
+
+    FASTER.devices.AzureStorageDevice
+    $version$
+    FASTER.devices.AzureStorageDevice
+    Microsoft
+    Microsoft
+    https://github.com/Microsoft/FASTER
+    https://raw.githubusercontent.com/Microsoft/FASTER/master/LICENSE
+    true
+    IDevice FASTER is a fast concurrent key-value store that also supports indexing of larger-than-memory data. This is a FASTER IDevice implementation for Azure Storage.
+    See the project website at https://github.com/Microsoft/FASTER for more details
+    © Microsoft Corporation. All rights reserved.
+    en-US
+    key-value store dictionary hashtable concurrent log persistent azure storage FASTER
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/cs/test/BasicDiskFASTERTests.cs b/cs/test/BasicDiskFASTERTests.cs
index a84abb206..0d8ad497f 100644
--- a/cs/test/BasicDiskFASTERTests.cs
+++ b/cs/test/BasicDiskFASTERTests.cs
@@ -10,37 +10,37 @@
 using FASTER.core;
 using System.IO;
 using NUnit.Framework;
+using FASTER.devices;
+using System.Diagnostics;
 
 namespace FASTER.test
 {
-    [TestFixture]
-    internal class BasicDiskFASTERTests
+    internal class BasicStorageFASTERTests
     {
         private FasterKV<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty, Functions> fht;
-        private IDevice log;
+        public const string EMULATED_STORAGE_STRING = "UseDevelopmentStorage=true;";
+        public const string TEST_CONTAINER = "test";
 
-        [SetUp]
-        public void Setup()
+        [Test]
+        public void LocalStorageWriteRead()
         {
-            log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\BasicDiskFASTERTests.log", deleteOnClose: true);
-            fht = new FasterKV<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty, Functions>
-                (1L<<20, new Functions(), new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 10 });
-            fht.StartSession();
+            TestDeviceWriteRead(Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\BasicDiskFASTERTests.log", deleteOnClose: true));
         }
 
-        [TearDown]
-        public void TearDown()
+        [Test]
+        public void PageBlobWriteRead()
         {
-            fht.StopSession();
-            fht.Dispose();
-            fht = null;
-            log.Close();
+            if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests")))
+                TestDeviceWriteRead(new AzureStorageDevice(EMULATED_STORAGE_STRING, TEST_CONTAINER, "BasicDiskFASTERTests", false));
         }
 
-        [Test]
-        public void NativeDiskWriteRead()
+        void TestDeviceWriteRead(IDevice log)
         {
+            fht = new FasterKV<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty, Functions>
+                (1L << 20, new Functions(), new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 10 });
+            fht.StartSession();
+
             InputStruct input = default(InputStruct);
 
             for (int i = 0; i < 2000; i++)
@@ -86,6 +86,11 @@ public void NativeDiskWriteRead()
                     }
                 }
             }
+
+            fht.StopSession();
+            fht.Dispose();
+            fht = null;
+            log.Close();
         }
     }
 }
diff --git a/cs/test/FASTER.test.csproj b/cs/test/FASTER.test.csproj
index a6c0cc437..41d0226cc 100644
--- a/cs/test/FASTER.test.csproj
+++ b/cs/test/FASTER.test.csproj
@@ -44,5 +44,6 @@
+
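
Usage note (not part of the diff itself): a minimal sketch of how the new device is consumed, mirroring what the updated BasicStorageFASTERTests.TestDeviceWriteRead does. The connection string and container name below are the emulator values used by the tests; the blob name is made up for illustration, and KeyStruct, ValueStruct, InputStruct, OutputStruct and Functions are the existing types from the FASTER.test project. Outside of tests you would substitute your own key/value types and a real storage account connection string.

    using System;
    using FASTER.core;
    using FASTER.devices;

    class AzureStorageDeviceUsageSketch
    {
        static void Main()
        {
            // "UseDevelopmentStorage=true;" targets the local Azure Storage Emulator that the
            // pipeline change downloads and starts; point this at a real account in production.
            IDevice log = new AzureStorageDevice(
                connectionString: "UseDevelopmentStorage=true;",
                containerName: "test",
                blobName: "usage-sketch",   // hypothetical prefix for the page blobs backing the log
                deleteOnClose: true);       // remove the backing blobs when Close() is called

            // Wire the device into FasterKV exactly as the test does.
            var fht = new FasterKV<KeyStruct, ValueStruct, InputStruct, OutputStruct, Empty, Functions>(
                1L << 20, new Functions(),
                new LogSettings { LogDevice = log, MemorySizeBits = 15, PageSizeBits = 10 });

            fht.StartSession();
            // ... Upsert/Read operations as in the test ...
            fht.StopSession();
            fht.Dispose();
            log.Close();
        }
    }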