Skip to content

Commit

Permalink
Abstraction for blockio-style file operations
Browse files Browse the repository at this point in the history
Changes include:
* Add blockio-uring dependency
* Add an abstract API that captures the file
operations from `blockio-uring`.
* Implementations of this for three different
operating systems: Linux, MacOS, or Windows. The
Linux implementation uses `blockio-uring` and
benefits from async IO. MacOS and Windows use a
simple implementation that performs file I/O
sequentially instead of in asynchronous batches.
* Implement some basic tests for the API.
  • Loading branch information
jorisdral committed Mar 19, 2024
1 parent 0a8b9ec commit 57bb4f0
Show file tree
Hide file tree
Showing 12 changed files with 509 additions and 2 deletions.
19 changes: 19 additions & 0 deletions .github/workflows/haskell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ jobs:
cabal-version: ${{ matrix.cabal }}
cabal-update: true

- name: Install liburing (on Linux)
id: setup-liburing
if: matrix.os == 'ubuntu-latest'
run: |
sudo apt-get update
sudo apt-get -y install pkg-config
echo "PKG_CONFIG_PATH=$PKG_CONFIG_PATH"
mkdir tmp
cd tmp
git clone https://github.com/axboe/liburing.git
cd liburing
git checkout liburing-2.5
./configure --cc=gcc --cxx=g++
make -j$(nproc)
sudo make install
cd ../..
sudo rm -rf ./tmp
pkg-config --modversion liburing
- name: Configure the build
run: |
cabal configure --enable-tests --enable-benchmark --ghc-options="-Werror" --ghc-options="-fno-ignore-asserts"
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ It has a number of custom features that are primarily tailored towards performan

## System requirements

This library only supports 64-bit, little-endian systems.
This library only supports 64-bit, little-endian systems.

Provide the -threaded flag to executables, test suites and benchmark suites if
you use this library on Linux systems.
15 changes: 15 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,18 @@ package lsm-tree
-- apply this to all components
-- relevant mostly only for development & testing
ghc-options: -fno-ignore-asserts

if(os(linux))
source-repository-package
type: git
location: https://github.com/well-typed/blockio-uring
tag: bbeb81130ec3eafd8ced81564cc8bd46d24aff08

-- fs-api with support for I/O using user-supplied buffers
source-repository-package
type: git
location: https://github.com/input-output-hk/fs-sim
tag: 6a4a456640dd1fed434ccb4cbb553482afe8e2d4
subdir:
fs-api
fs-sim
99 changes: 99 additions & 0 deletions fs-api-blockio/src-linux/System/FS/BlockIO/Async.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{-# LANGUAGE NamedFieldPuns #-}

module System.FS.BlockIO.Async (
asyncHasBlockIO
) where

import Control.Exception
import qualified Control.Exception as E
import Control.Monad
import Foreign.C.Error
import GHC.IO.Exception
import GHC.Stack
import System.FS.API (BufferOffset (..), FsErrorPath, Handle (..),
HasFS (..), SomeHasFS (..), ioToFsError)
import qualified System.FS.BlockIO.API as API
import System.FS.BlockIO.API (IOOp (..), IOResult (..), ioopHandle)
import System.FS.IO (HandleIO)
import System.FS.IO.Internal.Handle
import qualified System.IO.BlockIO as I
import System.IO.Error (ioeSetErrorString, isResourceVanishedError)
import System.Posix.Types

-- | IO instantiation of 'HasBlockIO', using @blockio-uring@.
asyncHasBlockIO :: HasFS IO HandleIO -> Maybe API.IOCtxParams -> IO (API.HasBlockIO IO HandleIO)
asyncHasBlockIO hasFS ctxParams = do
ctx <- I.initIOCtx (maybe I.defaultIOCtxParams ctxParamsConv ctxParams)
pure $ API.HasBlockIO {
API.close = I.closeIOCtx ctx
, API.submitIO = submitIO hasFS ctx
}

ctxParamsConv :: API.IOCtxParams -> I.IOCtxParams
ctxParamsConv API.IOCtxParams{API.ioctxBatchSizeLimit, API.ioctxConcurrencyLimit} =
I.IOCtxParams {
I.ioctxBatchSizeLimit = ioctxBatchSizeLimit
, I.ioctxConcurrencyLimit = ioctxConcurrencyLimit
}

submitIO ::
HasFS IO HandleIO
-> I.IOCtx
-> [IOOp IO HandleIO]
-> IO [IOResult]
submitIO hasFS ioctx ioops = do
ioops' <- mapM ioopConv ioops
ress <- I.submitIO ioctx ioops' `catch` rethrowClosedError
zipWithM rethrowErrno ioops ress
where
rethrowClosedError :: IOError -> IO a
rethrowClosedError e@IOError{} =
-- Pattern matching on the error is brittle, because the structure of
-- the exception might change between versions of @blockio-uring@.
-- Nonetheless, it's better than nothing.
if isResourceVanishedError e && ioe_location e == "IOCtx closed"
then throwIO (API.mkClosedError (SomeHasFS hasFS) "submitIO")
else throwIO e

rethrowErrno ::
HasCallStack
=> IOOp IO HandleIO
-> I.IOResult
-> IO IOResult
rethrowErrno ioop res = do
case res of
I.IOResult c -> pure (IOResult c)
I.IOError e -> throwAsFsError e
where
throwAsFsError :: HasCallStack => Errno -> IO a
throwAsFsError errno = E.throwIO $ ioToFsError fep (fromErrno errno)

fep :: FsErrorPath
fep = mkFsErrorPath hasFS (handlePath (ioopHandle ioop))

fromErrno :: Errno -> IOError
fromErrno errno = ioeSetErrorString
(errnoToIOError "submitIO" errno Nothing Nothing)
("submitIO failed: " <> ioopType)

ioopType :: String
ioopType = case ioop of
IOOpRead{} -> "IOOpRead"
IOOpWrite{} -> "IOOpWrite"

ioopConv :: IOOp IO HandleIO -> IO (I.IOOp IO)
ioopConv (IOOpRead h off buf bufOff c) = handleFd h >>= \fd ->
pure (I.IOOpRead fd off buf (unBufferOffset bufOff) c)
ioopConv (IOOpWrite h off buf bufOff c) = handleFd h >>= \fd ->
pure (I.IOOpWrite fd off buf (unBufferOffset bufOff) c)

-- This only checks whether the handle is open when we convert to an Fd. After
-- that, the handle could be closed when we're still performing blockio
-- operations.
--
-- TODO: if the handle were to have a reader/writer lock, then we could take the
-- reader lock in 'submitIO'. However, the current implementation of 'Handle'
-- only allows mutally exclusive access to the underlying file descriptor, so it
-- would require a change in @fs-api@. See [fs-sim#49].
handleFd :: Handle HandleIO -> IO Fd
handleFd h = withOpenHandle "submitIO" (handleRaw h) pure
15 changes: 15 additions & 0 deletions fs-api-blockio/src-linux/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Async as I
import System.FS.IO (HandleIO)

ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hfs _bhfs = I.asyncHasBlockIO hfs
20 changes: 20 additions & 0 deletions fs-api-blockio/src-macos/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Serial as Serial
import System.FS.IO (HandleIO)

-- | For now we use the portable serial implementation of HasBlockIO. If you
-- want to provide a proper async I/O implementation for OSX, then this is where
-- you should put it.
--
-- The recommended choice would be to use the POSIX AIO API.
ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hasFS hasBufFS _ = Serial.serialHasBlockIO hasFS hasBufFS
20 changes: 20 additions & 0 deletions fs-api-blockio/src-windows/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Serial as Serial
import System.FS.IO (HandleIO)

-- | For now we use the portable serial implementation of HasBlockIO. If you
-- want to provide a proper async I/O implementation for Windows, then this is
-- where you should put it.
--
-- The recommended choice would be to use the Win32 IOCP API.
ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hasFS hasBufFS _ = Serial.serialHasBlockIO hasFS hasBufFS
64 changes: 64 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/API.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}

module System.FS.BlockIO.API (
HasBlockIO (..)
, IOCtxParams (..)
, mkClosedError
, IOOp (..)
, ioopHandle
, IOResult (..)
-- * Re-exports
, ByteCount
, FileOffset
) where

import Control.Monad.Primitive (PrimMonad (PrimState))
import Data.Primitive.ByteArray (MutableByteArray)
import GHC.IO.Exception (IOErrorType (ResourceVanished))
import System.FS.API
import System.IO.Error (ioeSetErrorString, mkIOError)
import System.Posix.Types (ByteCount, FileOffset)
import Util.CallStack

-- | Abstract interface for submitting large batches of I\/O operations.
data HasBlockIO m h = HasBlockIO {
-- | (Idempotent) close the interface.
--
-- Using 'submitIO' after 'close' should thrown an 'FsError' exception. See
-- 'mkClosedError'.
close :: HasCallStack => m ()
-- | Submit a batch of I\/O operations and wait for the result.
--
-- Results correspond to input 'IOOp's in a pair-wise manner, i.e., one can
-- match 'IOOp's with 'IOResult's by zipping the input and output list.
--
-- If any of the I\/O operations fails, an 'FsError' exception will be thrown.
, submitIO :: HasCallStack => [IOOp m h] -> m [IOResult]
}

-- | Concurrency parameters for initialising a 'HasBlockIO. Can be ignored by
-- serial implementations.
data IOCtxParams = IOCtxParams {
ioctxBatchSizeLimit :: !Int,
ioctxConcurrencyLimit :: !Int
}

mkClosedError :: HasCallStack => SomeHasFS m -> String -> FsError
mkClosedError (SomeHasFS hasFS) loc = ioToFsError (mkFsErrorPath hasFS (mkFsPath [])) ioerr
where ioerr =
ioeSetErrorString
(mkIOError ResourceVanished loc Nothing Nothing)
("HasBlockIO closed: " <> loc)


data IOOp m h =
IOOpRead !(Handle h) !FileOffset !(MutableByteArray (PrimState m)) !BufferOffset !ByteCount
| IOOpWrite !(Handle h) !FileOffset !(MutableByteArray (PrimState m)) !BufferOffset !ByteCount

ioopHandle :: IOOp m h -> Handle h
ioopHandle (IOOpRead h _ _ _ _) = h
ioopHandle (IOOpWrite h _ _ _ _) = h

-- | Number of read/written bytes.
newtype IOResult = IOResult ByteCount
16 changes: 16 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/IO.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module System.FS.BlockIO.IO (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Internal as I
import System.FS.IO (HandleIO)

-- | Platform-dependent IO instantiation of 'HasBlockIO'.
ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO = I.ioHasBlockIO
60 changes: 60 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/Serial.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

module System.FS.BlockIO.Serial (
serialHasBlockIO
) where

import Control.Concurrent.Class.MonadMVar
import Control.Monad (unless)
import Control.Monad.Class.MonadThrow
import System.FS.API
import qualified System.FS.BlockIO.API as API
import System.FS.BlockIO.API (IOOp (..), IOResult (..))

-- | IO instantiation of 'HasBlockIO', using an existing 'HasFS'. Thus this
-- implementation does not take advantage of parallel I/O.
serialHasBlockIO ::
(MonadThrow m, MonadMVar m, Eq h)
=> HasFS m h
-> HasBufFS m h
-> m (API.HasBlockIO m h)
serialHasBlockIO hfs hbfs = do
ctx <- initIOCtx (SomeHasFS hfs)
pure $ API.HasBlockIO {
API.close = close ctx
, API.submitIO = submitIO hfs hbfs ctx
}

data IOCtx m = IOCtx { ctxFS :: SomeHasFS m, openVar :: MVar m Bool }

guardIsOpen :: (MonadMVar m, MonadThrow m) => IOCtx m -> m ()
guardIsOpen ctx = readMVar (openVar ctx) >>= \b ->
unless b $ throwIO (API.mkClosedError (ctxFS ctx) "submitIO")

initIOCtx :: MonadMVar m => SomeHasFS m -> m (IOCtx m)
initIOCtx someHasFS = IOCtx someHasFS <$> newMVar True

close :: MonadMVar m => IOCtx m -> m ()
close ctx = modifyMVar_ (openVar ctx) $ const (pure False)

submitIO ::
(MonadMVar m, MonadThrow m)
=> HasFS m h
-> HasBufFS m h
-> IOCtx m
-> [IOOp m h]
-> m [IOResult]
submitIO hfs hbfs ctx ioops = do
guardIsOpen ctx
mapM (ioop hfs hbfs) ioops

-- | Perform the IOOp using synchronous I\/O.
ioop ::
MonadThrow m
=> HasFS m h
-> HasBufFS m h
-> IOOp m h
-> m IOResult
ioop hfs hbfs (IOOpRead h off buf bufOff c) =
IOResult <$> hGetBufExactlyAt hfs hbfs h buf bufOff c (fromIntegral off)
ioop _hfs hbfs (IOOpWrite h off buf bufOff c) =
IOResult <$> hPutBufExactlyAt hbfs h buf bufOff c (fromIntegral off)
Loading

0 comments on commit 57bb4f0

Please sign in to comment.