Skip to content

Commit

Permalink
redis exec. exec class to begin sharing code
Browse files Browse the repository at this point in the history
  • Loading branch information
glutamate committed Oct 5, 2017
1 parent 91c7b78 commit 28d56f4
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
FunFlow
========
3 changes: 3 additions & 0 deletions funflow.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Library
, Control.FunFlow.Steps
, Control.FunFlow.Pretty
, Control.FunFlow.Exec.Local
, Control.FunFlow.Exec.Class
, Control.FunFlow.Exec.Redis

Build-depends:
base >= 4.6 && <5
Expand All @@ -42,6 +44,7 @@ Library
, random
, pretty
, bytestring
, hedis

Test-suite test-funflow
type: exitcode-stdio-1.0
Expand Down
78 changes: 78 additions & 0 deletions src/Control/FunFlow/Exec/Class.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
{-# LANGUAGE Arrows, GADTs, OverloadedStrings, TupleSections, TypeFamilies #-}

module Control.FunFlow.Exec.Class where

import Control.FunFlow.Base

import Data.Store
import Data.Either (lefts)
import Data.List
import qualified Data.Text as T
import Control.Monad.State.Strict
import Data.Monoid ((<>))
import Control.Exception
import Control.Monad.IO.Class ()

class MonadIO m => FlowM m where
type FlowS m
fresh :: m T.Text
lookupSym :: Store a => T.Text -> m (Maybe a)
putSym :: Store a => T.Text -> a -> m ()
getState :: m (FlowS m)
restoreState :: FlowS m -> m ()

puttingSym :: (FlowM m, Store a) => T.Text -> Either String a -> m (Either String a)
puttingSym _ l@(Left _) = return l
puttingSym n r@(Right x) = putSym n x >> return r

proceedFlow :: FlowM m => Flow a b -> a -> m (Either String b)
proceedFlow (Name n' f) x = do
n <- (n'<>) <$> fresh
mv <- lookupSym n
case mv of
Just y -> return $ Right y
Nothing -> puttingSym n =<< proceedFlow f x

proceedFlow (Step f) x = do
n <- fresh
mv <- lookupSym n
case mv of
Just y -> return $ Right y
Nothing -> do
ey <- liftIO $ fmap Right (f x)
`catch`
(\e -> return $ Left (show (e::SomeException)))
puttingSym n ey

proceedFlow (Arr f) x = return $ Right $ f x
proceedFlow (Compose f g) x = do
ey <- proceedFlow f x
case ey of
Left s -> return $ Left s
Right y -> proceedFlow g y
proceedFlow (Par f g) (x,y) = do
ew <- proceedFlow f x
ez <- proceedFlow g y
case (ew, ez) of
(Right w, Right z) -> return $ Right (w,z)
_ -> return $ Left $ intercalate " and also " $ lefts [ew] ++ lefts [ez]
proceedFlow (First f) (x,d) = do
ey <- proceedFlow f x
return $ fmap (,d) ey
proceedFlow (Fanin f _) (Left x) = do
proceedFlow f x
proceedFlow (Fanin _ g) (Right x) = do
proceedFlow g x
proceedFlow (Fold fstep) (lst,acc) = go lst acc where
go [] y = return $ Right y
go (x:xs) y0 = do
ey1 <- proceedFlow fstep (x,y0)
case ey1 of
Left err -> return $ Left err
Right y1 -> go xs y1
proceedFlow (Catch f h) x = do
st <- getState
ey <- proceedFlow f x
case ey of
Right y -> return $ Right y
Left err -> restoreState st >> proceedFlow h (x,err)
144 changes: 144 additions & 0 deletions src/Control/FunFlow/Exec/Redis.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
{-# LANGUAGE Arrows, GADTs, OverloadedStrings, TupleSections #-}

module Control.FunFlow.Exec.Redis where

import Control.FunFlow.Base
import Control.FunFlow

import Data.Store
import Data.Either (lefts)
import Data.List
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import qualified Data.Text.Encoding as DTE
import Control.Monad.State.Strict
import Data.Monoid ((<>))
import Control.Exception
import Data.ByteString (ByteString)
import qualified Database.Redis as R

type PureCtx = M.Map T.Text ByteString
type NameSpace = ByteString

--sadly i seem unable to use a EitherT or ErrorT monad
type FlowM a = StateT FlowST R.Redis a

type FlowST = (Freshers, PureCtx, NameSpace)

nameForKey :: T.Text -> FlowM ByteString
nameForKey k = return $ DTE.encodeUtf8 k

fresh :: FlowM T.Text
fresh = do
(fs, ctx, ns) <- get
let (f,nfs) = popFreshers fs
put (nfs, ctx, ns)
return f

fetchSym :: Store a => T.Text -> FlowM (Maybe a)
fetchSym k = do
mv <- lift . R.get =<< nameForKey k
let process (Left _) = return Nothing
process (Right x) = do
putLocal k x
return $ Just x
case mv of
Right (Just v) -> process $ decode v
_ -> return Nothing

eitherToMaybe :: Either a b -> Maybe b
eitherToMaybe (Left _) = Nothing
eitherToMaybe (Right x) = Just x


lookupSym :: Store a => T.Text -> FlowM (Maybe a)
lookupSym k = do
(_, ctx, _) <- get
case M.lookup k ctx of
Nothing -> fetchSym k
Just yv -> case decode yv of
Right y -> return $ Just y
Left _ -> fetchSym k

putSym :: Store a => T.Text -> a -> FlowM ()
putSym k x = do
_ <- lift . (`R.set` (encode x)) =<< nameForKey k
putLocal k x

putLocal :: Store a => T.Text -> a -> FlowM ()
putLocal k x = do
(fs, ctx, ns) <- get
put $ (fs, M.insert k (encode x) ctx, ns)

puttingSym :: Store a => T.Text -> Either String a -> FlowM (Either String a)
puttingSym _ l@(Left _) = return l
puttingSym n r@(Right x) = putSym n x >> return r

{-
runTillDone :: Flow a b -> a -> IO b
runTillDone f x = go M.empty where
go st0 = do
ey <- resumeFlow f x st0
case ey of
Right y -> return y
Left (err, st1) -> do putStrLn $ "Flow failed with "++err
go st1
resumeFlow :: Flow a b -> a -> PureCtx -> IO (Either (String, PureCtx) b)
resumeFlow f ini ctx = do
(ex, st) <- runStateT (proceedFlow f ini) (initFreshers, ctx)
case ex of
Left err -> return $ Left (err,snd st)
Right x -> return $ Right x
-}
proceedFlow :: Flow a b -> a -> FlowM (Either String b)
proceedFlow (Name n' f) x = do
n <- (n'<>) <$> fresh
mv <- lookupSym n
case mv of
Just y -> return $ Right y
Nothing -> puttingSym n =<< proceedFlow f x

proceedFlow (Step f) x = do
n <- fresh
mv <- lookupSym n
case mv of
Just y -> return $ Right y
Nothing -> do
ey <- liftIO $ fmap Right (f x)
`catch`
(\e -> return $ Left (show (e::SomeException)))
puttingSym n ey

proceedFlow (Arr f) x = return $ Right $ f x
proceedFlow (Compose f g) x = do
ey <- proceedFlow f x
case ey of
Left s -> return $ Left s
Right y -> proceedFlow g y
proceedFlow (Par f g) (x,y) = do
ew <- proceedFlow f x
ez <- proceedFlow g y
case (ew, ez) of
(Right w, Right z) -> return $ Right (w,z)
_ -> return $ Left $ intercalate " and also " $ lefts [ew] ++ lefts [ez]
proceedFlow (First f) (x,d) = do
ey <- proceedFlow f x
return $ fmap (,d) ey
proceedFlow (Fanin f _) (Left x) = do
proceedFlow f x
proceedFlow (Fanin _ g) (Right x) = do
proceedFlow g x
proceedFlow (Fold fstep) (lst,acc) = go lst acc where
go [] y = return $ Right y
go (x:xs) y0 = do
ey1 <- proceedFlow fstep (x,y0)
case ey1 of
Left err -> return $ Left err
Right y1 -> go xs y1
proceedFlow (Catch f h) x = do
st <- get
ey <- proceedFlow f x
case ey of
Right y -> return $ Right y
Left err -> put st >> proceedFlow h (x,err)
8 changes: 7 additions & 1 deletion src/Control/FunFlow/Steps.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,27 @@ worstBernoulli = Step $ \p -> do
then return r
else error $ "worstBernoulli fail with "++ show r++ " > "++show p


-- | pause for a given number of seconds. Thread through a value to ensure
-- delay does not happen inparallel with other processing
pauseWith :: Store a => Flow (Int, a) a
pauseWith = Step $ \(secs,a) -> do
threadDelay (secs*1000000)
return a

-- | Map a Flow over a list
mapF :: Flow a b -> Flow [a] [b]
mapF f = (,[]) ^>> (Fold $ proc (x,ys) -> do
y <- f -< x
returnA -< y:ys) >>> arr reverse

-- | Filter a list
filterF :: Flow a Bool -> Flow [a] [a]
filterF f = (,[]) ^>> (Fold $ proc (x,ys) -> do
b <- f -< x
returnA -< if b then x:ys else ys) >>> arr reverse

-- | `retry n s f` reruns `f` on failure at most n times with a delay of `s` seconds
-- between retries
retry :: (Store a) => Int -> Int -> Flow a b -> Flow a b
retry 0 _ f = f
retry n secs f = Catch f $ proc (x,_) -> do
Expand Down

0 comments on commit 28d56f4

Please sign in to comment.