Skip to content

Commit

Permalink
Update position before ReadAsync starts, but fix it after incomplete …
Browse files Browse the repository at this point in the history
…read (dotnet#56531)

* move CanRead and CanWrite checks to FileStream

* don't check IsClosed twice_(FileHandle.CanSeek already contains a IsClosed check)

* add a failing test

* handle incomplete async reads

* don't try to cache file length when file is opened for writing, as updating file position before performing async write can lead to invalid cached length value

* maybe Win 7 & 8 fix
  • Loading branch information
adamsitnik authored Jul 31, 2021
1 parent 7d9a08c commit e176fdf
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 113 deletions.
34 changes: 34 additions & 0 deletions src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Linq;
using System.Security.Cryptography;
using System.Threading;
using System.Threading.Tasks;
using Xunit;
Expand Down Expand Up @@ -92,6 +94,38 @@ public async Task ReadAsyncCanceledFile()
}
}
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[InlineData(FileShare.None, FileOptions.Asynchronous)] // FileShare.None: exclusive access
[InlineData(FileShare.ReadWrite, FileOptions.Asynchronous)] // FileShare.ReadWrite: others can write to the file, the length can't be cached
[InlineData(FileShare.None, FileOptions.None)]
[InlineData(FileShare.ReadWrite, FileOptions.None)]
public async Task IncompleteReadCantSetPositionBeyondEndOfFile(FileShare fileShare, FileOptions options)
{
const int fileSize = 10_000;
string filePath = GetTestFilePath();
byte[] content = RandomNumberGenerator.GetBytes(fileSize);
File.WriteAllBytes(filePath, content);

byte[][] buffers = Enumerable.Repeat(Enumerable.Repeat(byte.MaxValue, fileSize * 2).ToArray(), 10).ToArray();

using (FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, fileShare, bufferSize: 0, options))
{
Task<int>[] reads = buffers.Select(buffer => fs.ReadAsync(buffer, 0, buffer.Length)).ToArray();

// the reads were not awaited, it's an anti-pattern and Position can be (0, buffersLength) now:
Assert.InRange(fs.Position, 0, buffers.Sum(buffer => buffer.Length));

await Task.WhenAll(reads);
// but when they are finished, the first buffer should contain valid data:
Assert.Equal(fileSize, reads.First().Result);
AssertExtensions.SequenceEqual(content, buffers.First().AsSpan(0, fileSize));
// and other reads should return 0:
Assert.All(reads.Skip(1), read => Assert.Equal(0, read.Result));
// and the Position must be correct:
Assert.Equal(fileSize, fs.Position);
}
}
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/34582", TestPlatforms.Windows, TargetFrameworkMonikers.Netcoreapp, TestRuntimes.Mono)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Buffers;
using System.Diagnostics;
using System.IO;
using System.IO.Strategies;
using System.Threading;
using System.Threading.Tasks.Sources;

Expand Down Expand Up @@ -45,7 +46,9 @@ internal sealed unsafe class OverlappedValueTaskSource : IValueTaskSource<int>,

internal readonly PreAllocatedOverlapped _preallocatedOverlapped;
internal readonly SafeFileHandle _fileHandle;
private AsyncWindowsFileStreamStrategy? _strategy;
internal MemoryHandle _memoryHandle;
private int _bufferSize;
internal ManualResetValueTaskSourceCore<int> _source; // mutable struct; do not make this readonly
private NativeOverlapped* _overlapped;
private CancellationTokenRegistration _cancellationRegistration;
Expand Down Expand Up @@ -74,9 +77,11 @@ internal static Exception GetIOError(int errorCode, string? path)
? ThrowHelper.CreateEndOfFileException()
: Win32Marshal.GetExceptionForWin32Error(errorCode, path);

internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset)
internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory<byte> memory, long fileOffset, AsyncWindowsFileStreamStrategy? strategy = null)
{
_result = 0;
_strategy = strategy;
_bufferSize = memory.Length;
_memoryHandle = memory.Pin();
_overlapped = _fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped);
_overlapped->OffsetLow = (int)fileOffset;
Expand Down Expand Up @@ -132,8 +137,9 @@ internal void RegisterForCancellation(CancellationToken cancellationToken)
}
}

internal void ReleaseResources()
private void ReleaseResources()
{
_strategy = null;
// Unpin any pinned buffer.
_memoryHandle.Dispose();

Expand Down Expand Up @@ -187,11 +193,19 @@ private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped*

internal void Complete(uint errorCode, uint numBytes)
{
Debug.Assert(errorCode == Interop.Errors.ERROR_SUCCESS || numBytes == 0, $"Callback returned {errorCode} error and {numBytes} bytes");

AsyncWindowsFileStreamStrategy? strategy = _strategy;
ReleaseResources();

if (strategy is not null && _bufferSize != numBytes) // true only for incomplete reads
{
strategy.OnIncompleteRead(_bufferSize, (int)numBytes);
}

switch (errorCode)
{
case 0:
case Interop.Errors.ERROR_SUCCESS:
case Interop.Errors.ERROR_BROKEN_PIPE:
case Interop.Errors.ERROR_NO_DATA:
case Interop.Errors.ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file)
Expand Down
36 changes: 28 additions & 8 deletions src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,14 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
{
return Task.FromCanceled<int>(cancellationToken);
}
else if (_strategy.IsClosed)
else if (!_strategy.CanRead)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
if (_strategy.IsClosed)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
}

ThrowHelper.ThrowNotSupportedException_UnreadableStream();
}

return _strategy.ReadAsync(buffer, offset, count, cancellationToken);
Expand All @@ -294,9 +299,14 @@ public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken
{
return ValueTask.FromCanceled<int>(cancellationToken);
}
else if (_strategy.IsClosed)
else if (!_strategy.CanRead)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
if (_strategy.IsClosed)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
}

ThrowHelper.ThrowNotSupportedException_UnreadableStream();
}

return _strategy.ReadAsync(buffer, cancellationToken);
Expand All @@ -319,9 +329,14 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
{
return Task.FromCanceled(cancellationToken);
}
else if (_strategy.IsClosed)
else if (!_strategy.CanWrite)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
if (_strategy.IsClosed)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
}

ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}

return _strategy.WriteAsync(buffer, offset, count, cancellationToken);
Expand All @@ -333,9 +348,14 @@ public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationTo
{
return ValueTask.FromCanceled(cancellationToken);
}
else if (_strategy.IsClosed)
else if (!_strategy.CanWrite)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
if (_strategy.IsClosed)
{
ThrowHelper.ThrowObjectDisposedException_FileClosed();
}

ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}

return _strategy.WriteAsync(buffer, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,21 +242,23 @@ internal static ValueTask<int> ReadAtOffsetAsync(SafeFileHandle handle, Memory<b
return ScheduleSyncReadAtOffsetAsync(handle, buffer, fileOffset, cancellationToken);
}

internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset, CancellationToken cancellationToken)
internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory<byte> buffer, long fileOffset,
CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null)
{
handle.EnsureThreadPoolBindingInitialized();

SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource();
int errorCode = 0;
try
{
NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset);
NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy);
Debug.Assert(vts._memoryHandle.Pointer != null);

// Queue an async ReadFile operation.
if (Interop.Kernel32.ReadFile(handle, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, nativeOverlapped) == 0)
{
// The operation failed, or it's pending.
int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle);
switch (errorCode)
{
case Interop.Errors.ERROR_IO_PENDING:
Expand Down Expand Up @@ -286,6 +288,13 @@ internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int error
vts.Dispose();
throw;
}
finally
{
if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS)
{
strategy?.OnIncompleteRead(buffer.Length, 0);
}
}

// Completion handled by callback.
vts.FinishedScheduling();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,29 @@ public override Task<int> ReadAsync(byte[] buffer, int offset, int count, Cancel
public override ValueTask<int> ReadAsync(Memory<byte> destination, CancellationToken cancellationToken = default)
=> ReadAsyncInternal(destination, cancellationToken);

private unsafe ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
private ValueTask<int> ReadAsyncInternal(Memory<byte> destination, CancellationToken cancellationToken)
{
if (!CanRead)
if (!CanSeek)
{
ThrowHelper.ThrowNotSupportedException_UnreadableStream();
return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken);
}

long positionBefore = _filePosition;
if (CanSeek)
if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length)
{
long len = Length;
if (positionBefore + destination.Length > len)
{
destination = positionBefore <= len ?
destination.Slice(0, (int)(len - positionBefore)) :
default;
}

// When using overlapped IO, the OS is not supposed to
// touch the file pointer location at all. We will adjust it
// ourselves, but only in memory. This isn't threadsafe.
_filePosition += destination.Length;

// We know for sure that there is nothing to read, so we just return here and avoid a sys-call.
if (destination.IsEmpty && LengthCachingSupported)
{
return ValueTask.FromResult(0);
}
// We know for sure that the file length can be safely cached and it has already been obtained.
// If we have reached EOF we just return here and avoid a sys-call.
return ValueTask.FromResult(0);
}

(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, positionBefore, cancellationToken);
// This implementation updates the file position before the operation starts and updates it after incomplete read.
// This is done to keep backward compatibility for concurrent reads.
// It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time.
long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;

(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this);
return vts != null
? new ValueTask<int>(vts, vts.Version)
: (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(positionBefore, errorCode));
: (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException<int>(HandleIOError(readOffset, errorCode));
}

public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand All @@ -69,35 +58,22 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
=> WriteAsyncInternal(buffer, cancellationToken);

private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
private ValueTask WriteAsyncInternal(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
if (!CanWrite)
{
ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}

long positionBefore = _filePosition;
if (CanSeek)
{
// When using overlapped IO, the OS is not supposed to
// touch the file pointer location at all. We will adjust it
// ourselves, but only in memory. This isn't threadsafe.
_filePosition += source.Length;
UpdateLengthOnChangePosition();
}
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;

(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, positionBefore, cancellationToken);
(SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, writeOffset, cancellationToken);
return vts != null
? new ValueTask(vts, vts.Version)
: (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(positionBefore, errorCode));
: (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode));
}

private Exception HandleIOError(long positionBefore, int errorCode)
{
if (!_fileHandle.IsClosed && CanSeek)
if (_fileHandle.CanSeek)
{
// Update Position... it could be anywhere.
_filePosition = positionBefore;
Interlocked.Exchange(ref _filePosition, positionBefore);
}

return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ internal abstract class OSFileStreamStrategy : FileStreamStrategy
private readonly FileAccess _access; // What file was opened for.

protected long _filePosition;
protected long _length = -1; // negative means that hasn't been fetched.
private long _appendStart; // When appending, prevent overwriting file.
private long _length = -1; // When the file is locked for writes on Windows ((share & FileShare.Write) == 0) cache file length in-memory, negative means that hasn't been fetched.
private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed and FileShare.Write was not specified when the handle was opened.
private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing.

internal OSFileStreamStrategy(SafeFileHandle handle, FileAccess access)
{
Expand All @@ -44,7 +44,7 @@ internal OSFileStreamStrategy(string path, FileMode mode, FileAccess access, Fil
string fullPath = Path.GetFullPath(path);

_access = access;
_lengthCanBeCached = (share & FileShare.Write) == 0;
_lengthCanBeCached = (share & FileShare.Write) == 0 && (access & FileAccess.Write) == 0;

_fileHandle = SafeFileHandle.Open(fullPath, mode, access, share, options, preallocationSize);

Expand Down Expand Up @@ -96,21 +96,9 @@ public unsafe sealed override long Length
}
}

protected void UpdateLengthOnChangePosition()
{
// Do not update the cached length if the file is not locked
// or if the length hasn't been fetched.
if (!LengthCachingSupported || _length < 0)
{
Debug.Assert(_length < 0);
return;
}

if (_filePosition > _length)
{
_length = _filePosition;
}
}
// in case of concurrent incomplete reads, there can be multiple threads trying to update the position
// at the same time. That is why we are using Interlocked here.
internal void OnIncompleteRead(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead);

protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached;

Expand Down Expand Up @@ -287,18 +275,8 @@ public sealed override void Write(ReadOnlySpan<byte> buffer)
ThrowHelper.ThrowNotSupportedException_UnwritableStream();
}

try
{
RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
}
catch
{
_length = -1; // invalidate cached length
throw;
}

RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition);
_filePosition += buffer.Length;
UpdateLengthOnChangePosition();
}
}
}
Loading

0 comments on commit e176fdf

Please sign in to comment.