Skip to content

Commit

Permalink
Add command dispatch logic
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjg committed Dec 17, 2019
1 parent f36caae commit 92dc3df
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 30 deletions.
1 change: 1 addition & 0 deletions hedis.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ library
Database.Redis.Connection,
Database.Redis.Cluster,
Database.Redis.Cluster.HashSlot,
Database.Redis.Cluster.Command,
Database.Redis.ProtocolPipelining,
Database.Redis.Protocol,
Database.Redis.PubSub,
Expand Down
67 changes: 42 additions & 25 deletions src/Database/Redis/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ module Database.Redis.Cluster
, Shard(..)
, connect
, disconnect
--, request
, requestPipelined
, nodes
) where

import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as Char8
import qualified Data.IORef as IOR
import Data.Maybe(listToMaybe)
import Data.Maybe(listToMaybe, mapMaybe)
import Data.List(nub, sortBy)
import Data.Map(fromListWith, assocs)
import Data.Function(on)
Expand All @@ -37,8 +36,9 @@ import System.IO.Unsafe(unsafeInterleaveIO)
import Say(sayString)

import Database.Redis.Protocol(Reply(Error), renderRequest, reply)
import qualified Database.Redis.Cluster.Command as CMD

-- This modules implements a clustered connection whilst maintaining
-- This module implements a clustered connection whilst maintaining
-- compatibility with the original Hedis codebase. In particular it still
-- performs implicit pipelining using `unsafeInterleaveIO` as the single node
-- codebase does. To achieve this each connection carries around with it a
Expand All @@ -49,7 +49,7 @@ import Database.Redis.Protocol(Reply(Error), renderRequest, reply)

-- | A connection to a redis cluster, it is compoesed of a map from Node IDs to
-- | 'NodeConnection's, a 'Pipeline', and a 'ShardMap'
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap)
data Connection = Connection (HM.HashMap NodeID NodeConnection) (MVar Pipeline) (MVar ShardMap) CMD.InfoMap

-- | A connection to a single node in the cluster, similar to 'ProtocolPipelining.Connection'
data NodeConnection = NodeConnection CC.ConnectionContext (IOR.IORef (Maybe B.ByteString)) NodeID
Expand All @@ -72,22 +72,27 @@ data Node = Node NodeID NodeRole Host Port deriving (Show, Eq, Ord)

type MasterNode = Node
type SlaveNode = Node
data Shard = Shard MasterNode [SlaveNode] deriving Show
data Shard = Shard MasterNode [SlaveNode] deriving (Show, Eq, Ord)

newtype ShardMap = ShardMap (IntMap.IntMap Shard) deriving (Show)

newtype MissingNodeException = MissingNodeException [B.ByteString] deriving (Show, Typeable)

instance Exception MissingNodeException

newtype UnsupportedClusterCommandException = UnsupportedClusterCommandException [B.ByteString] deriving (Show, Typeable)
instance Exception UnsupportedClusterCommandException

newtype CrossSlotException = CrossSlotException [B.ByteString] deriving (Show, Typeable)
instance Exception CrossSlotException

connect :: MVar ShardMap -> Maybe Int -> IO Connection
connect shardMapVar timeoutOpt = do

connect :: [CMD.CommandInfo] -> MVar ShardMap -> Maybe Int -> IO Connection
connect commandInfos shardMapVar timeoutOpt = do
shardMap <- readMVar shardMapVar
stateVar <- newMVar $ Pending []
pipelineVar <- newMVar $ Pipeline stateVar
nodeConns <- nodeConnections shardMap
return $ Connection nodeConns pipelineVar shardMapVar where
return $ Connection nodeConns pipelineVar shardMapVar (CMD.newInfoMap commandInfos) where
nodeConnections :: ShardMap -> IO (HM.HashMap NodeID NodeConnection)
nodeConnections shardMap = HM.fromList <$> mapM connectNode (nub $ nodes shardMap)
connectNode :: Node -> IO (NodeID, NodeConnection)
Expand All @@ -97,14 +102,14 @@ connect shardMapVar timeoutOpt = do
return (n, NodeConnection ctx ref n)

disconnect :: Connection -> IO ()
disconnect (Connection nodeConnMap _ _) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnect (Connection nodeConnMap _ _ _) = mapM_ disconnectNode (HM.elems nodeConnMap) where
disconnectNode (NodeConnection nodeCtx _ _) = CC.disconnect nodeCtx

-- Add a request to the current pipeline for this connection. The pipeline will
-- be executed implicitly as soon as any result returned from this function is
-- evaluated.
requestPipelined :: IO ShardMap -> Connection -> [B.ByteString] -> IO Reply
requestPipelined refreshAction conn@(Connection _ pipelineVar shardMapVar) nextRequest = modifyMVar pipelineVar $ \(Pipeline stateVar) -> do
requestPipelined refreshAction conn@(Connection _ pipelineVar shardMapVar _) nextRequest = modifyMVar pipelineVar $ \(Pipeline stateVar) -> do
(newStateVar, repliesIndex) <- hasLocked "locked adding to pipeline" $ modifyMVar stateVar $ \case
Pending requests -> return (Pending (nextRequest:requests), (stateVar, length requests))
e@(Executed _) -> do
Expand Down Expand Up @@ -136,6 +141,17 @@ rawResponse (CompletedRequest _ _ r) = r
requestForResponse :: CompletedRequest -> [B.ByteString]
requestForResponse (CompletedRequest _ r _) = r

-- The approach we take here is similar to that taken by the redis-py-cluster
-- library, which is described at https://redis-py-cluster.readthedocs.io/en/master/pipelines.html
--
-- Essentially we group all the commands by node (based on the current shardmap)
-- and then execute a pipeline for each node (maintaining the order of commands
-- on a per node basis but not between nodes). Once we've done this, if any of
-- the commands have resulted in a MOVED error we refresh the shard map, then
-- we run through all the responses and retry any MOVED or ASK errors. This retry
-- step is not pipelined, there is a request per error. This is probably
-- acceptable in most cases as these errors should only occur in the case of
-- cluster reconfiguration events, which should be rare.
evaluatePipeline :: MVar ShardMap -> IO ShardMap -> Connection -> [[B.ByteString]] -> IO [Reply]
evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
shardMap <- hasLocked "reading shardmap in evaluatePipeline" $ readMVar shardMapVar
Expand All @@ -150,7 +166,7 @@ evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
return $ assocs $ fromListWith (++) commandsWithNodes
requestWithNode :: ShardMap -> Int -> [B.ByteString] -> IO (NodeConnection, [PendingRequest])
requestWithNode shardMap index request = do
nodeConn <- nodeConnectionForCommandOrThrow shardMap conn request
nodeConn <- nodeConnectionForCommand conn shardMap request
return (nodeConn, [PendingRequest index request])
executeRequests :: NodeConnection -> [PendingRequest] -> IO [CompletedRequest]
executeRequests nodeConn nodeRequests = do
Expand All @@ -161,7 +177,7 @@ evaluatePipeline shardMapVar refreshShardmapAction conn requests = do
retryReply <- case thisReply of
(Error errString) | B.isPrefixOf "MOVED" errString -> do
shardMap <- hasLocked "reading shard map in retry MOVED" $ readMVar shardMapVar
nodeConn <- nodeConnectionForCommandOrThrow shardMap conn (requestForResponse resp)
nodeConn <- nodeConnectionForCommand conn shardMap (requestForResponse resp)
head <$> requestNode nodeConn [request]
(askingRedirection -> Just (host, port)) -> do
shardMap <- hasLocked "reading shardmap in retry ASK" $ readMVar shardMapVar
Expand Down Expand Up @@ -197,21 +213,22 @@ moved _ = False


nodeConnWithHostAndPort :: ShardMap -> Connection -> Host -> Port -> Maybe NodeConnection
nodeConnWithHostAndPort shardMap (Connection nodeConns _ _) host port = do
nodeConnWithHostAndPort shardMap (Connection nodeConns _ _ _) host port = do
node <- nodeWithHostAndPort shardMap host port
HM.lookup (nodeId node) nodeConns

nodeConnectionForCommandOrThrow :: ShardMap -> Connection -> [B.ByteString] -> IO NodeConnection
nodeConnectionForCommandOrThrow shardMap (Connection nodeConns _ _) command = maybe (throwIO $ MissingNodeException command) return maybeNode where
maybeNode = do
node <- nodeForCommand shardMap command
HM.lookup (nodeId node) nodeConns

nodeForCommand :: ShardMap -> [B.ByteString] -> Maybe Node
nodeForCommand (ShardMap shards) (_:key:_) = do
(Shard master _) <- IntMap.lookup (fromEnum $ keyToSlot key) shards
Just master
nodeForCommand _ _ = Nothing
nodeConnectionForCommand :: Connection -> ShardMap -> [B.ByteString] -> IO NodeConnection
nodeConnectionForCommand (Connection nodeConns _ _ infoMap) (ShardMap shardMap) request = do
keys <- case CMD.keysForRequest infoMap request of
Nothing -> throwIO $ UnsupportedClusterCommandException request
Just k -> return k
let shards = nub $ mapMaybe ((flip IntMap.lookup shardMap) . fromEnum . keyToSlot) keys
node <- case shards of
[] -> throwIO $ MissingNodeException request
[Shard master _] -> return master
_ -> throwIO $ CrossSlotException request
maybe (throwIO $ MissingNodeException request) return (HM.lookup (nodeId node) nodeConns)



requestNode :: NodeConnection -> [[B.ByteString]] -> IO [Reply]
Expand Down
131 changes: 131 additions & 0 deletions src/Database/Redis/Cluster/Command.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
{-# LANGUAGE OverloadedStrings, RecordWildCards #-}
module Database.Redis.Cluster.Command where

import Data.Char(toLower)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as Char8
import qualified Data.HashMap.Strict as HM
import Database.Redis.Types(RedisResult(decode))
import Database.Redis.Protocol(Reply(..))

data Flag
= Write
| ReadOnly
| DenyOOM
| Admin
| PubSub
| NoScript
| Random
| SortForScript
| Loading
| Stale
| SkipMonitor
| Asking
| Fast
| MovableKeys
| Other BS.ByteString deriving (Show, Eq)


data AritySpec = Required Integer | MinimumRequired Integer deriving (Show)

data LastKeyPositionSpec = LastKeyPosition Integer | UnlimitedKeys deriving (Show)

newtype InfoMap = InfoMap (HM.HashMap String CommandInfo)

-- Represents the result of the COMMAND command, which returns information
-- about the position of keys in a request
data CommandInfo = CommandInfo
{ name :: BS.ByteString
, arity :: AritySpec
, flags :: [Flag]
, firstKeyPosition :: Integer
, lastKeyPosition :: LastKeyPositionSpec
, stepCount :: Integer
} deriving (Show)

instance RedisResult CommandInfo where
decode (MultiBulk (Just
[ Bulk (Just commandName)
, Integer aritySpec
, MultiBulk (Just replyFlags)
, Integer firstKeyPos
, Integer lastKeyPos
, Integer replyStepCount])) = do
parsedFlags <- mapM parseFlag replyFlags
lastKey <- parseLastKeyPos
return $ CommandInfo
{ name = commandName
, arity = parseArity aritySpec
, flags = parsedFlags
, firstKeyPosition = firstKeyPos
, lastKeyPosition = lastKey
, stepCount = replyStepCount
} where
parseArity int = case int of
i | i >= 0 -> Required i
i -> MinimumRequired $ abs i
parseFlag :: Reply -> Either Reply Flag
parseFlag (SingleLine flag) = return $ case flag of
"write" -> Write
"readonly" -> ReadOnly
"denyoom" -> DenyOOM
"admin" -> Admin
"pubsub" -> PubSub
"noscript" -> NoScript
"random" -> Random
"sort_for_script" -> SortForScript
"loading" -> Loading
"stale" -> Stale
"skip_monitor" -> SkipMonitor
"asking" -> Asking
"fast" -> Fast
"movablekeys" -> MovableKeys
other -> Other other
parseFlag bad = Left bad
parseLastKeyPos :: Either Reply LastKeyPositionSpec
parseLastKeyPos = return $ case lastKeyPos of
i | i == -1 -> UnlimitedKeys
i -> LastKeyPosition i

decode e = Left e

newInfoMap :: [CommandInfo] -> InfoMap
newInfoMap = InfoMap . HM.fromList . map (\c -> (Char8.unpack $ name c, c))

keysForRequest :: InfoMap -> [BS.ByteString] -> Maybe [BS.ByteString]
keysForRequest (InfoMap infoMap) request@(command:_) = do
info <- HM.lookup (map toLower $ Char8.unpack command) infoMap
if isMovable info then return $ parseMovable request else do
let possibleKeys = case lastKeyPosition info of
LastKeyPosition end -> take (fromEnum $ 1 + end - firstKeyPosition info) $ drop (fromEnum $ firstKeyPosition info) request
UnlimitedKeys -> drop (fromEnum $ firstKeyPosition info) request
return $ takeEvery (fromEnum $ stepCount info) possibleKeys
keysForRequest _ [] = Nothing

isMovable :: CommandInfo -> Bool
isMovable CommandInfo{..} = MovableKeys `elem` flags

parseMovable :: [BS.ByteString] -> [BS.ByteString]
parseMovable ("SORT":key:_) = [key]
parseMovable ("EVAL":_:rest) = readNumKeys rest
parseMovable ("EVALSH":_:rest) = readNumKeys rest
parseMovable ("ZUNIONSTORE":_:rest) = readNumKeys rest
parseMovable ("ZINTERSTORE":_:rest) = readNumKeys rest
parseMovable _ = []


readNumKeys :: [BS.ByteString] -> [BS.ByteString]
readNumKeys (rawNumKeys:rest) = case readMaybe (Char8.unpack rawNumKeys) of
Just numKeys -> take numKeys rest
Nothing -> []
readNumKeys _ = []

takeEvery :: Int -> [a] -> [a]
takeEvery n xs = case drop (n-1) xs of
(y:ys) -> y : takeEvery n ys
[] -> []

readMaybe :: Read a => String -> Maybe a
readMaybe s = case reads s of
[(val, "")] -> Just val
_ -> Nothing
3 changes: 2 additions & 1 deletion src/Database/Redis/Commands.hs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ clusterSetSlotNode,
clusterSetSlotStable,
clusterSetSlotImporting,
clusterSetSlotMigrating,
clusterGetKeysInSlot
clusterGetKeysInSlot,
command
-- * Unimplemented Commands
-- |These commands are not implemented, as of now. Library
-- users can implement these or other commands from
Expand Down
13 changes: 9 additions & 4 deletions src/Database/Redis/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import Database.Redis.Commands
, select
, auth
, clusterSlots
, command
, ClusterSlotsResponse(..)
, ClusterSlotsResponseEntry(..)
, ClusterSlotsNode(..))
Expand Down Expand Up @@ -191,12 +192,16 @@ connectCluster :: ConnectInfo -> IO Connection
connectCluster bootstrapConnInfo = do
conn <- createConnection bootstrapConnInfo
slotsResponse <- runRedisInternal conn clusterSlots
case slotsResponse of
shardMapVar <- case slotsResponse of
Left e -> throwIO $ ClusterConnectError e
Right slots -> do
shardMap <- shardMapFromClusterSlotsResponse slots
shardMapVar <- newMVar shardMap
pool <- createPool (Cluster.connect shardMapVar Nothing) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
newMVar shardMap
commandInfos <- runRedisInternal conn command
case commandInfos of
Left e -> throwIO $ ClusterConnectError e
Right infos -> do
pool <- createPool (Cluster.connect infos shardMapVar Nothing) Cluster.disconnect 1 (connectMaxIdleTime bootstrapConnInfo) (connectMaxConnections bootstrapConnInfo)
return $ ClusteredConnection shardMapVar pool

shardMapFromClusterSlotsResponse :: ClusterSlotsResponse -> IO ShardMap
Expand All @@ -217,7 +222,7 @@ shardMapFromClusterSlotsResponse ClusterSlotsResponse{..} = ShardMap <$> foldr m
Cluster.Node clusterSlotsNodeID role hostname (toEnum clusterSlotsNodePort)

refreshShardMap :: Cluster.Connection -> IO ShardMap
refreshShardMap (Cluster.Connection nodeConns _ _) = do
refreshShardMap (Cluster.Connection nodeConns _ _ _) = do
let (Cluster.NodeConnection ctx _ _) = head $ HM.elems nodeConns
pipelineConn <- PP.fromCtx ctx
_ <- PP.beginReceiving pipelineConn
Expand Down
4 changes: 4 additions & 0 deletions src/Database/Redis/ManualCommands.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Data.Maybe (maybeToList, catMaybes)
import Database.Redis.Core
import Database.Redis.Protocol
import Database.Redis.Types
import qualified Database.Redis.Cluster.Command as CMD


objectRefcount
Expand Down Expand Up @@ -1374,3 +1375,6 @@ clusterGetKeysInSlot
-> Integer
-> m (f [ByteString])
clusterGetKeysInSlot slot count = sendRequest ["CLUSTER", "GETKEYSINSLOT", (encode slot), (encode count)]

command :: (RedisCtx m f) => m (f [CMD.CommandInfo])
command = sendRequest ["COMMAND"]

0 comments on commit 92dc3df

Please sign in to comment.