Skip to content

Commit

Permalink
Send the command to another node in case of any exception.
Browse files Browse the repository at this point in the history
The new node will give `moved` as response with new node info which will be sent
to the right one.
  • Loading branch information
Ag committed Jan 11, 2023
1 parent df342fc commit dd99e12
Showing 1 changed file with 18 additions and 3 deletions.
21 changes: 18 additions & 3 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import qualified Data.ByteString as B
import Data.Char(toLower)
import qualified Data.ByteString.Char8 as Char8
import qualified Data.IORef as IOR
import Data.Maybe(mapMaybe, fromMaybe)
import Data.List(nub, sortBy, find)
import Data.Maybe(listToMaybe, mapMaybe, fromMaybe)
import Data.List(nub, sortBy, find, findIndex)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
import Control.Exception(Exception, SomeException, throwIO, BlockedIndefinitelyOnMVar(..), catches, Handler(..), try)
Expand Down Expand Up @@ -196,11 +196,26 @@ evaluatePipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString
evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
shardMap <- hasLocked "reading shardmap in evaluatePipeline" $ readMVar shardMapVar
requestsByNode <- getRequestsByNode shardMap
resps <- concat <$> mapM (uncurry executeRequests) requestsByNode
-- catch the exception thrown at each node level
-- send the command to random node.
-- merge the current responses with new responses.
eresps <- mapM (try . uncurry executeRequests) requestsByNode
-- take a random connection where there are no exceptions.
let (nc, _) = (requestsByNode !!) $ fromMaybe 0 $ findIndex isRight eresps
-- PERF_CONCERN: Since usually we send only one request at time, this won't be
-- heavy perf issue. but still should be evaluated and figured out with complete rewrite.
resps <- concat <$> mapM (\(resp, (_, r)) -> case resp of
Right v -> return v
Left (_ :: SomeException) -> executeRequests nc r
) (zip eresps requestsByNode)
-- check for any moved in both responses and continue the flow.
when (any (moved . rawResponse) resps) (refreshShardMapVar "locked refreshing due to moved responses")
retriedResps <- mapM (retry 0) resps
return $ map rawResponse $ sortBy (on compare responseIndex) retriedResps
where
isRight :: Either a b -> Bool
isRight (Right _) = True
isRight _ = False
getRequestsByNode :: ShardMap -> IO [(NodeConnection, [PendingRequest])]
getRequestsByNode shardMap = do
commandsWithNodes <- zipWithM (requestWithNode shardMap) (reverse [0..(length requests - 1)]) requests
Expand Down

0 comments on commit dd99e12

Please sign in to comment.