Skip to content

Commit

Permalink
Refactor loadbalancer code to make it easily extensible
Browse files Browse the repository at this point in the history
Reduce code duplication between various loadbalancers
  • Loading branch information
thesiddharth committed Jul 8, 2015
1 parent 78c3652 commit 15d996e
Show file tree
Hide file tree
Showing 10 changed files with 346 additions and 390 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,6 @@ object SystemClock extends Clock {
def getCurrentTimeMilliseconds = System.currentTimeMillis
}

object SystemClockComponent extends ClockComponent {
trait SystemClockComponent extends ClockComponent {
val clock = SystemClock
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import client.NetworkClientConfig
import common._
import norbertutils._
import network.client.ResponseHandler
import norbertutils.{Clock, SystemClock, SystemClockComponent}
import norbertutils.{Clock, SystemClock}
import java.util.{Map => JMap}
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import util.ProtoUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import netty.NettyPartitionedNetworkClient
import client.NetworkClientConfig
import cluster.{Node, ClusterDisconnectedException, InvalidClusterException, ClusterClientComponent}
import scala.util.Random
import java.util

object RoutingConfigs {
val defaultRoutingConfigs = new RoutingConfigs(false, false)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class PartitionedNetworkClientFactory[PartitionedId](clientName: String,
if(retryStrategy == null)
throw new IllegalArgumentException("Retry strategy needs to be provided if you enable selective retry")
else
config.retryStrategy = Some(retryStrategy)
config.retryStrategy = Some(retryStrategy)
}
val partitionedNetworkClient = PartitionedNetworkClient(config, partitionedLoadBalancerFactory)

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,12 @@ package loadbalancer
import cluster.{InvalidClusterException, Node}
import common.Endpoint
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import annotation.tailrec
import client.loadbalancer.LoadBalancerHelpers
import logging.Logging

/**
* A mixin trait that provides functionality to help implement a hash based <code>Router</code>.
*/
trait DefaultLoadBalancerHelper extends LoadBalancerHelpers with Logging {
/**
* A mapping from partition id to the <code>Node</code>s which can service that partition.
*/
protected val partitionToNodeMap: Map[Int, (IndexedSeq[Endpoint], AtomicInteger, Array[AtomicBoolean])]
trait DefaultLoadBalancerHelper extends PartitionedLoadBalancerHelpers with Logging {

/**
* Given the currently available <code>Node</code>s and the total number of partitions in the cluster, this method
Expand All @@ -45,7 +39,7 @@ trait DefaultLoadBalancerHelper extends LoadBalancerHelpers with Logging {
* @throws InvalidClusterException thrown if every partition doesn't have at least one available <code>Node</code>
* assigned to it
*/
protected def generatePartitionToNodeMap(nodes: Set[Endpoint], numPartitions: Int, serveRequestsIfPartitionMissing: Boolean): Map[Int, (IndexedSeq[Endpoint], AtomicInteger, Array[AtomicBoolean])] = {
def generatePartitionToNodeMap(nodes: Set[Endpoint], numPartitions: Int, serveRequestsIfPartitionMissing: Boolean): Map[Int, (IndexedSeq[Endpoint], AtomicInteger, Array[AtomicBoolean])] = {
val partitionToNodeMap = (for (n <- nodes; p <- n.node.partitionIds) yield(p, n)).foldLeft(Map.empty[Int, IndexedSeq[Endpoint]]) {
case (map, (partitionId, node)) => map + (partitionId -> (node +: map.get(partitionId).getOrElse(Vector.empty[Endpoint])))
}
Expand Down Expand Up @@ -78,21 +72,20 @@ trait DefaultLoadBalancerHelper extends LoadBalancerHelpers with Logging {
* @return <code>Some</code> with the <code>Node</code> which can service the partition id, <code>None</code>
* if there are no available <code>Node</code>s for the partition requested
*/
protected def nodeForPartition(partitionId: Int, capability: Option[Long] = None, persistentCapability: Option[Long] = None): Option[Node] = {
def nodeForPartition(partitionId: Int, capability: Option[Long] = None, persistentCapability: Option[Long] = None): Option[Node] = {
partitionToNodeMap.get(partitionId) match {
case None =>
return None
None
case Some((endpoints, counter, states)) =>
import math._
val es = endpoints.size
counter.compareAndSet(java.lang.Integer.MAX_VALUE, 0)
val idx = counter.getAndIncrement
var i = idx
var loopCount = 0
do {
val endpoint = endpoints(i % es)
if(endpoint.canServeRequests && endpoint.node.isCapableOf(capability, persistentCapability)) {
compensateCounter(idx, loopCount, counter);
if(isEndpointViable(capability, persistentCapability, endpoint)) {
compensateCounter(idx, loopCount, counter)
return Some(endpoint.node)
}

Expand All @@ -101,16 +94,12 @@ trait DefaultLoadBalancerHelper extends LoadBalancerHelpers with Logging {
loopCount = loopCount + 1
} while (loopCount <= es)

compensateCounter(idx, loopCount, counter);
return Some(endpoints(idx % es).node)
compensateCounter(idx, loopCount, counter)
Some(endpoints(idx % es).node)
}
}

def compensateCounter(idx: Int, count:Int, counter:AtomicInteger) {
if (idx + 1 + count <= 0) {
// Integer overflow
counter.set(idx + 1 - java.lang.Integer.MAX_VALUE + count)
}
counter.set(idx + 1 + count)
def isEndpointViable(capability: Option[Long], persistentCapability: Option[Long], endpoint: Endpoint): Boolean = {
endpoint.canServeRequests && endpoint.node.isCapableOf(capability, persistentCapability)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,138 +22,15 @@ import logging.Logging
import cluster.{Node, InvalidClusterException}
import common.Endpoint
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.util.Random
import scala.util.control.Breaks._

/**
* This class is intended for applications where there is a mapping from partitions -> servers able to respond to those requests. Requests are round-robined
* between the partitions
*/
abstract class DefaultPartitionedLoadBalancerFactory[PartitionedId](numPartitions: Int, serveRequestsIfPartitionMissing: Boolean = true) extends PartitionedLoadBalancerFactory[PartitionedId] with Logging {
def newLoadBalancer(endpoints: Set[Endpoint]): PartitionedLoadBalancer[PartitionedId] = new PartitionedLoadBalancer[PartitionedId] with DefaultLoadBalancerHelper {
val partitionToNodeMap = generatePartitionToNodeMap(endpoints, numPartitions, serveRequestsIfPartitionMissing)

val setCoverCounter = new AtomicInteger(0)

def nextNode(id: PartitionedId, capability: Option[Long] = None, persistentCapability: Option[Long] = None) = nodeForPartition(partitionForId(id), capability, persistentCapability)


def nodesForPartitionedId(id: PartitionedId, capability: Option[Long] = None, persistentCapability: Option[Long] = None) = {
partitionToNodeMap.getOrElse(partitionForId(id), (Vector.empty[Endpoint], new AtomicInteger(0), new Array[AtomicBoolean](0)))._1.filter(_.node.isCapableOf(capability, persistentCapability)).toSet.map
{ (endpoint: Endpoint) => endpoint.node }
}

def nodesForOneReplica(id: PartitionedId, capability: Option[Long] = None, persistentCapability: Option[Long] = None) = {
nodesForPartitions(id, partitionToNodeMap, capability, persistentCapability)
}

def nodesForPartitions(id: PartitionedId, partitions: Set[Int], capability: Option[Long] = None, persistentCapability: Option[Long] = None) = {
nodesForPartitions(id, partitionToNodeMap.filterKeys(partitions contains _), capability, persistentCapability)
}

def nodesForPartitions(id: PartitionedId, partitionToNodeMap: Map[Int, (IndexedSeq[Endpoint], AtomicInteger, Array[AtomicBoolean])], capability: Option[Long], persistentCapability: Option[Long]) = {
partitionToNodeMap.keys.foldLeft(Map.empty[Node, Set[Int]]) { (map, partition) =>
val nodeOption = nodeForPartition(partition, capability, persistentCapability)
if(nodeOption.isDefined) {
val n = nodeOption.get
map + (n -> (map.getOrElse(n, Set.empty[Int]) + partition))
} else if(serveRequestsIfPartitionMissing) {
log.warn("Partition %s is unavailable, attempting to continue serving requests to other partitions.".format(partition))
map
} else
throw new InvalidClusterException("Partition %s is unavailable, cannot serve requests.".format(partition))
}
}

/**
* Use greedy set cover to minimize the nodes that serve the requested partitioned Ids
*
* @param partitionedIds
* @param capability
* @param persistentCapability
* @return
*/
override def nodesForPartitionedIds(partitionedIds: Set[PartitionedId], capability: Option[Long], persistentCapability: Option[Long] = None) = {

// calculates partition Ids from the set of partitioned Ids
val partitionsMap = partitionedIds.foldLeft(Map.empty[Int, Set[PartitionedId]])
{ case (map, id) =>
val partition = partitionForId(id)
map + (partition -> (map.getOrElse(partition, Set.empty[PartitionedId]) + id))
}

// set to be covered
var partitionIds = partitionsMap.keys.toSet[Int]
val res = collection.mutable.Map.empty[Node, collection.Set[Int]]

breakable {
while (!partitionIds.isEmpty) {
var intersect = Set.empty[Int]
var endpoint : Endpoint = null

// take one element in the set, locate only nodes that serving this partition
partitionToNodeMap.get(partitionIds.head) match {
case None =>
break
case Some((endpoints, counter, states)) =>
import math._
val es = endpoints.size
counter.compareAndSet(java.lang.Integer.MAX_VALUE, 0)
val idx = counter.getAndIncrement % es
var i = idx

// This is a modified version of greedy set cover algorithm, instead of finding the node that covers most of
// the partitionIds set, we only check it across nodes that serving the selected partition. This guarantees
// we will pick a node at least cover 1 more partition, but also in case of multiple replicas of partitions,
// this helps to locate nodes long to the same replica.
breakable {
do {
val ep = endpoints(i)

// perform intersection between the set to be covered and the set the node is covering
val s = ep.node.partitionIds intersect partitionIds

// record the largest intersect
if (s.size > intersect.size && ep.canServeRequests && ep.node.isCapableOf(capability, persistentCapability))
{
intersect = s
endpoint = ep
if (partitionIds.size == s.size)
break
}
i = (i+1 ) % es
} while(i != idx)
}
}

if (endpoint == null)
{
if (serveRequestsIfPartitionMissing)
{
intersect = intersect + partitionIds.head
}
else
throw new NoNodesAvailableException("Unable to satisfy request, no node available for partition Id %s".format(partitionIds.head))
} else
res += (endpoint.node -> intersect)

// remove covered set; remove the node providing that coverage
partitionIds = ( partitionIds -- intersect)
}
}

res.foldLeft(Map.empty[Node, Set[PartitionedId]]) {
case (map, (n, pIds)) =>
{
map + (n -> pIds.foldLeft(Set.empty[PartitionedId]) {
case (s, pId) =>
s ++ partitionsMap.getOrElse(pId, Set.empty[PartitionedId])
})
}
}
}
}

override def newLoadBalancer(endpoints: Set[Endpoint]): PartitionedLoadBalancer[PartitionedId] = DefaultPartitionedLoadBalancer(endpoints, partitionForId, numPartitions, serveRequestsIfPartitionMissing)
/**
* Calculates the id of the partition on which the specified <code>Id</code> resides.
*
Expand All @@ -178,3 +55,139 @@ abstract class DefaultPartitionedLoadBalancerFactory[PartitionedId](numPartition
def getNumPartitions(endpoints: Set[Endpoint]): Int

}


object DefaultPartitionedLoadBalancer {

def apply[PartitionedId](endpoints: Set[Endpoint], partitionForId: PartitionedId => Int, numPartitions: Int, serveRequestsIfPartitionMissing: Boolean) = {

//generatePartitionToNodeMap requires the Logging constructor to be called, and so must be evaluated lazily.
new DefaultPartitionedLoadBalancer[PartitionedId](endpoints, partitionForId, numPartitions, serveRequestsIfPartitionMissing) with DefaultLoadBalancerHelper {
val partitionToNodeMap = generatePartitionToNodeMap(endpoints, numPartitions, serveRequestsIfPartitionMissing)
}
}
}


abstract class DefaultPartitionedLoadBalancer[PartitionedId](endpoints: Set[Endpoint], partitionForId: PartitionedId => Int, numPartitions: Int, serveRequestsIfPartitionMissing: Boolean) extends PartitionedLoadBalancer[PartitionedId] {

this: PartitionedLoadBalancerHelpers with Logging =>

def nextNode(id: PartitionedId, capability: Option[Long] = None, persistentCapability: Option[Long] = None) = nodeForPartition(partitionForId(id), capability, persistentCapability)

def nodesForPartitionedId(id: PartitionedId, capability: Option[Long] = None, persistentCapability: Option[Long] = None) = {
partitionToNodeMap.getOrElse(partitionForId(id), (Vector.empty[Endpoint], new AtomicInteger(0), new Array[AtomicBoolean](0)))._1.filter(_.node.isCapableOf(capability, persistentCapability)).toSet.map
{ (endpoint: Endpoint) => endpoint.node }
}

def nodesForOneReplica(id: PartitionedId, capability: Option[Long] = None, persistentCapability: Option[Long] = None) = {
nodesForPartitions(id, partitionToNodeMap, capability, persistentCapability)
}

def nodesForPartitions(id: PartitionedId, partitions: Set[Int], capability: Option[Long] = None, persistentCapability: Option[Long] = None) = {
nodesForPartitions(id, partitionToNodeMap.filterKeys(partitions contains _), capability, persistentCapability)
}

def nodesForPartitions(id: PartitionedId, partitionToNodeMap: Map[Int, (IndexedSeq[Endpoint], AtomicInteger, Array[AtomicBoolean])], capability: Option[Long], persistentCapability: Option[Long]) = {
partitionToNodeMap.keys.foldLeft(Map.empty[Node, Set[Int]]) { (map, partition) =>
val nodeOption = nodeForPartition(partition, capability, persistentCapability)
if(nodeOption.isDefined) {
val n = nodeOption.get
map + (n -> (map.getOrElse(n, Set.empty[Int]) + partition))
} else if(serveRequestsIfPartitionMissing) {
log.warn("Partition %s is unavailable, attempting to continue serving requests to other partitions.".format(partition))
map
} else
throw new InvalidClusterException("Partition %s is unavailable, cannot serve requests.".format(partition))
}
}

/**
* Use greedy set cover to minimize the nodes that serve the requested partitioned Ids
*
* @param partitionedIds
* @param capability
* @param persistentCapability
* @return
*/
override def nodesForPartitionedIds(partitionedIds: Set[PartitionedId], capability: Option[Long], persistentCapability: Option[Long] = None) = {

// calculates partition Ids from the set of partitioned Ids
val partitionsMap = partitionedIds.foldLeft(Map.empty[Int, Set[PartitionedId]])
{ case (map, id) =>
val partition = partitionForId(id)
map + (partition -> (map.getOrElse(partition, Set.empty[PartitionedId]) + id))
}

// set to be covered
var partitionIds = partitionsMap.keys.toSet[Int]
val res = collection.mutable.Map.empty[Node, collection.Set[Int]]

breakable {
while (!partitionIds.isEmpty) {
var intersect = Set.empty[Int]
var endpoint : Endpoint = null

// take one element in the set, locate only nodes that serving this partition
partitionToNodeMap.get(partitionIds.head) match {
case None =>
break
case Some((endpoints, counter, states)) =>
import math._
val es = endpoints.size
counter.compareAndSet(java.lang.Integer.MAX_VALUE, 0)
val idx = counter.getAndIncrement % es
var i = idx

// This is a modified version of greedy set cover algorithm, instead of finding the node that covers most of
// the partitionIds set, we only check it across nodes that serving the selected partition. This guarantees
// we will pick a node at least cover 1 more partition, but also in case of multiple replicas of partitions,
// this helps to locate nodes long to the same replica.
breakable {
do {
val ep = endpoints(i)

// perform intersection between the set to be covered and the set the node is covering
val s = ep.node.partitionIds intersect partitionIds

// record the largest intersect
if (s.size > intersect.size && isEndpointViable(capability, persistentCapability, ep))
{
intersect = s
endpoint = ep
if (partitionIds.size == s.size)
break
}
i = (i+1 ) % es
} while(i != idx)
}
}

if (endpoint == null)
{
if (serveRequestsIfPartitionMissing)
{
intersect = intersect + partitionIds.head
}
else
throw new NoNodesAvailableException("Unable to satisfy request, no node available for partition Id %s".format(partitionIds.head))
} else
res += (endpoint.node -> intersect)

// remove covered set; remove the node providing that coverage
partitionIds = (partitionIds -- intersect)
}
}

res.foldLeft(Map.empty[Node, Set[PartitionedId]]) {
case (map, (n, pIds)) =>
{
map + (n -> pIds.foldLeft(Set.empty[PartitionedId]) {
case (s, pId) =>
s ++ partitionsMap.getOrElse(pId, Set.empty[PartitionedId])
})
}
}
}
}

Loading

0 comments on commit 15d996e

Please sign in to comment.