Skip to content

Commit

Permalink
Implement a non-blocking read for BroadcastChan's.
Browse files Browse the repository at this point in the history
  • Loading branch information
merijn committed Aug 25, 2022
1 parent f255f9b commit 2842a60
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 39 deletions.
4 changes: 3 additions & 1 deletion broadcast-chan-conduit/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
0.3.0 [????.??.??]
------------------
* Add missing reexports of `readBChan`, `writeBChan`, and `BChanError` to
`BroadcastChan.Pipes` and `BroadcastChan.Pipes.Throw`.
`BroadcastChan.Conduit` and `BroadcastChan.Conduit.Throw`.
* Add reexports for the new `tryReadBChan` in `BroadcastChan.Conduit` and
`BroadcastChan.Conduit.Throw`.

0.2.1.2 [2022.08.24]
--------------------
Expand Down
1 change: 1 addition & 0 deletions broadcast-chan-pipes/BroadcastChan/Pipes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ module BroadcastChan.Pipes
, newBChanListener
-- ** Basic Operations
, readBChan
, tryReadBChan
, writeBChan
, closeBChan
, isClosedBChan
Expand Down
1 change: 1 addition & 0 deletions broadcast-chan-pipes/BroadcastChan/Pipes/Throw.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ module BroadcastChan.Pipes.Throw
, newBChanListener
-- ** Basic Operations
, readBChan
, tryReadBChan
, writeBChan
, closeBChan
, isClosedBChan
Expand Down
2 changes: 2 additions & 0 deletions broadcast-chan-pipes/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
------------------
* Add missing reexports of `readBChan`, `writeBChan`, and `BChanError` to
`BroadcastChan.Pipes` and `BroadcastChan.Pipes.Throw`.
* Add reexports for the new `tryReadBChan` in `BroadcastChan.Pipes` and
`BroadcastChan.Pipes.Throw`.

0.2.1.2 [2022.08.24]
--------------------
Expand Down
2 changes: 1 addition & 1 deletion broadcast-chan-tests/BroadcastChan/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ infix 0 @?
-- action.
--
-- @since 0.2.0
expect :: (Eq e, Exception e) => e -> IO () -> Assertion
expect :: (Eq e, Exception e) => e -> IO a -> Assertion
expect err act = do
result <- try act
case result of
Expand Down
121 changes: 85 additions & 36 deletions broadcast-chan-tests/tests/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,53 +19,101 @@ shouldn'tBlock act = do
Nothing -> assertFailure "Shouldn't block!"
Just a -> return a

shouldBlock :: IO a -> IO Bool
shouldBlock act = do
result <- timeout 2000000 act
case result of
Nothing -> return True
Just _ -> return False

checkedWrite :: BroadcastChan In a -> a -> IO ()
checkedWrite chan val = writeBChan chan val @? "Write shouldn't fail"

randomList :: Int -> IO [Int]
randomList n = take n . randoms <$> getStdGen

readTests :: TestTree
readTests = testGroup "read tests"
[ readNonEmpty
, readNonEmptyThrow
, readEmptyClosed
, readEmptyClosedThrow
]
where
readNonEmpty :: TestTree
readNonEmpty = testCase "read non-empty" $ do
inChan <- newBroadcastChan
outChan <- newBChanListener inChan
val <- randomIO :: IO Int
data ChanContent b = ChanEmpty (IO b -> Assertion) | ChanNonEmpty (Int -> b)
data ChanState = ChanOpen | ChanClosed

writeBChan inChan val
result <- shouldn'tBlock $ readBChan outChan
assertEqual "Read should match write" (Just val) result
readBoilerPlate
:: (Eq b, Show b)
=> String
-> ChanState
-> ChanContent b
-> (BroadcastChan Out Int -> IO b)
-> TestTree
readBoilerPlate name state content getResult = testCase name $ do
inChan <- newBroadcastChan
outChan <- newBChanListener inChan
val <- randomIO :: IO Int

readNonEmptyThrow :: TestTree
readNonEmptyThrow = testCase "read non-empty (throw)" $ do
inChan <- newBroadcastChan
outChan <- newBChanListener inChan
val <- randomIO :: IO Int
let handleRead = case content of
ChanNonEmpty conv ->
(>>= assertEqual "Read should match write" (conv val))
ChanEmpty conv -> conv

writeBChan inChan val
result <- shouldn'tBlock $ Throw.readBChan outChan
assertEqual "Read should match write" val result
case content of
ChanEmpty{} -> return ()
ChanNonEmpty{} -> Throw.writeBChan inChan val

readEmptyClosed :: TestTree
readEmptyClosed = testCase "read empty closed" $ do
inChan <- newBroadcastChan
outChan <- newBChanListener inChan
closeBChan inChan
isNothing <$> shouldn'tBlock (readBChan outChan) @? "Read should fail"
case state of
ChanOpen -> return ()
ChanClosed -> () <$ closeBChan inChan

readEmptyClosedThrow :: TestTree
readEmptyClosedThrow = testCase "read empty closed (throw)" $ do
inChan <- newBroadcastChan
outChan <- newBChanListener inChan
closeBChan inChan
expect ReadFailed . shouldn'tBlock $ Throw.readBChan outChan
handleRead $ getResult outChan

readTests :: TestTree
readTests = testGroup "read tests"
[ readBoilerPlate "read non-empty" ChanOpen nonEmpty $
shouldn'tBlock . readBChan
, readBoilerPlate "read non-empty (throw)" ChanOpen nonEmptyThrow $
shouldn'tBlock . Throw.readBChan
, readBoilerPlate "read non-empty closed" ChanClosed nonEmpty $
shouldn'tBlock . readBChan
, readBoilerPlate "read non-empty closed (throw)" ChanClosed nonEmptyThrow $
shouldn'tBlock . Throw.readBChan
, readBoilerPlate "read empty" ChanOpen emptyBlock $
shouldBlock . readBChan
, readBoilerPlate "read empty (throw)" ChanOpen emptyBlock $
shouldBlock . Throw.readBChan
, readBoilerPlate "read empty closed" ChanClosed emptyNonBlock $
fmap isNothing . shouldn'tBlock . readBChan
, readBoilerPlate "read empty closed (throw)" ChanClosed emptyThrow $
shouldn'tBlock . Throw.readBChan
]
where
nonEmpty = ChanNonEmpty Just
nonEmptyThrow = ChanNonEmpty id
emptyBlock = ChanEmpty $ (@? "Read should block")
emptyNonBlock = ChanEmpty $ (@? "Read shouldn't block")
emptyThrow = ChanEmpty $ expect ReadFailed

tryReadTests :: TestTree
tryReadTests = testGroup "try read tests"
[ readBoilerPlate "try read non-empty" ChanOpen nonEmpty $
shouldn'tBlock . tryReadBChan
, readBoilerPlate "try read non-empty (throw)" ChanOpen nonEmptyThrow $
shouldn'tBlock . Throw.tryReadBChan
, readBoilerPlate "try read non-empty closed" ChanClosed nonEmpty $
shouldn'tBlock . tryReadBChan
, readBoilerPlate "try read non-empty closed (throw)" ChanClosed nonEmptyThrow $
shouldn'tBlock . Throw.tryReadBChan
, readBoilerPlate "try read empty" ChanOpen empty $
shouldn'tBlock . tryReadBChan
, readBoilerPlate "try read empty (throw)" ChanOpen empty $
shouldn'tBlock . Throw.tryReadBChan
, readBoilerPlate "try read empty closed" ChanClosed emptyClosed $
shouldn'tBlock . tryReadBChan
, readBoilerPlate "try read empty closed (throw)" ChanClosed emptyThrow $
shouldn'tBlock . Throw.tryReadBChan
]
where
nonEmpty = ChanNonEmpty $ Just . Just
nonEmptyThrow = ChanNonEmpty Just
empty = ChanEmpty $ (@? "Read shouldn't block") . fmap isNothing
emptyClosed = ChanEmpty $
(>>= assertEqual "Expect successful nothing" (Just Nothing))
emptyThrow = ChanEmpty $ expect ReadFailed

writeTests :: TestTree
writeTests = testGroup "write tests"
Expand Down Expand Up @@ -310,6 +358,7 @@ foldlTests = testGroup "foldl tests"
main :: IO ()
main = runTests "basic"
[ readTests
, tryReadTests
, writeTests
, closedTests
, chanContentsTests
Expand Down
1 change: 1 addition & 0 deletions broadcast-chan/BroadcastChan.hs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ module BroadcastChan (
, newBChanListener
-- * Basic Operations
, readBChan
, tryReadBChan
, writeBChan
, closeBChan
, isClosedBChan
Expand Down
24 changes: 24 additions & 0 deletions broadcast-chan/BroadcastChan/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,30 @@ readBChan (BChan readVar) = liftIO $ do
Closed -> return (read_end, Nothing)
{-# INLINE readBChan #-}

-- | A non-blocking version of 'readBChan'. Returns immediately with 'Nothing'
-- if the 'BroadcastChan' is empty.
--
-- The inner 'Maybe' value correspond to the return value of 'readBChan'. That
-- is, 'Nothing' if the 'BroadcastChan' is closed and empty and 'Just'
-- otherwise.
--
-- See @BroadcastChan.Throw.@'BroadcastChan.Throw.tryReadBChan' for an
-- exception throwing variant.
--
-- @since 0.3.0
tryReadBChan :: MonadIO m => BroadcastChan Out a -> m (Maybe (Maybe a))
tryReadBChan (BChan readVar) = liftIO $ do
modifyMVarMasked readVar $ \read_end -> do -- Note [modifyMVarMasked]
-- Use tryReadMVar here, not tryTakeMVar,
-- else newBChanListener doesn't work
result <- tryReadMVar read_end
case result of
Nothing -> return (read_end, Nothing)
Just Closed -> return (read_end, Just Nothing)
Just (ChItem val new_read_end) ->
return (new_read_end, Just (Just val))
{-# INLINE tryReadBChan #-}

-- Note [modifyMVarMasked]
-- This prevents a theoretical deadlock if an asynchronous exception
-- happens during the readMVar while the MVar is empty. In that case
Expand Down
16 changes: 15 additions & 1 deletion broadcast-chan/BroadcastChan/Throw.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
module BroadcastChan.Throw
( BChanError(..)
, readBChan
, tryReadBChan
, writeBChan
-- * Re-exports from "BroadcastChan"
-- ** Datatypes
Expand Down Expand Up @@ -49,7 +50,7 @@ import Control.Monad.IO.Unlift (MonadIO(..))
import Control.Exception (Exception, throwIO)
import Data.Typeable (Typeable)

import BroadcastChan hiding (writeBChan, readBChan)
import BroadcastChan hiding (writeBChan, readBChan, tryReadBChan)
import qualified BroadcastChan as Internal

-- | Exception type for 'BroadcastChan' operations.
Expand All @@ -73,6 +74,19 @@ readBChan ch = do
Nothing -> liftIO $ throwIO ReadFailed
Just x -> return x
{-# INLINE readBChan #-}
--
-- | Like 'Internal.tryReadBChan', but throws a 'ReadFailed' exception when
-- reading from a closed and empty 'BroadcastChan'.
--
-- @since 0.3.0
tryReadBChan :: MonadIO m => BroadcastChan Out a -> m (Maybe a)
tryReadBChan ch = do
result <- Internal.tryReadBChan ch
case result of
Nothing -> return Nothing
Just Nothing -> liftIO $ throwIO ReadFailed
Just v -> return v
{-# INLINE tryReadBChan #-}

-- | Like 'Internal.writeBChan', but throws a 'WriteFailed' exception when
-- writing to closed 'BroadcastChan'.
Expand Down
2 changes: 2 additions & 0 deletions broadcast-chan/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
* Fixed Haddock links.
* Generalised `readBChan`/`writeBChan` in `BroadcastChan.Throw` to use
`MonadIO`.
* Add non-blocking `tryReadBChan` in `BroadcastChan` and `BroadcastChan.Throw`,
fixes [#12](https://github.com/merijn/broadcast-chan/issues/12).

0.2.1.2 [2022.08.24]
--------------------
Expand Down

0 comments on commit 2842a60

Please sign in to comment.