Skip to content

Commit

Permalink
Add DESTINATIONS_POOL_REPLICAS
Browse files Browse the repository at this point in the history
This allows to have multiple connections per destinations, this will
pool all the replicas of a single host in the same queue and distribute
points accross these replicas instead of replicating them.
The following example will balance the load between :0 and :1.

   DESTINATIONS = foo:2001:0, foo:2001:1
   RELAY_METHOD = rules
  • Loading branch information
Corentin Chary committed Aug 24, 2018
1 parent 3fc3dca commit 031cca0
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
7 changes: 7 additions & 0 deletions conf/carbon.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ DESTINATIONS = 127.0.0.1:2004
# DESTINATION_TRANSPORT = none
# DESTINATION_SSL_CA=/path/to/private-ca.crt

# This allows to have multiple connections per destinations, this will
# pool all the replicas of a single host in the same queue and distribute
# points accross these replicas instead of replicating them.
# The following example will balance the load between :0 and :1.
## DESTINATIONS = foo:2001:0, foo:2001:1
## RELAY_METHOD = rules
# DESTINATIONS_POOL_REPLICAS = True

# When using consistent hashing it sometime makes sense to make
# the ring dynamic when you don't want to loose points when a
Expand Down
42 changes: 36 additions & 6 deletions lib/carbon/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from collections import deque
from collections import deque, defaultdict
from time import time
from six import with_metaclass

from twisted.application.service import Service
from twisted.internet import reactor
from twisted.internet.base import BlockingResolver
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.protocols.basic import LineOnlyReceiver, Int32StringReceiver

from carbon.conf import settings
from carbon.util import pickle
from carbon import instrumentation, log, pipeline, state
Expand All @@ -28,6 +30,8 @@
log.debug("Couldn't import signal module")


resolver = BlockingResolver()

SEND_QUEUE_LOW_WATERMARK = settings.MAX_QUEUE_SIZE * settings.QUEUE_LOW_WATERMARK_PCT


Expand Down Expand Up @@ -209,6 +213,7 @@ def __init__(self, destination, router):
self.router = router
self.destinationName = ('%s:%d:%s' % destination).replace('.', '_')
self.host, self.port, self.carbon_instance = destination
self.resolved_host = self.host
self.addr = (self.host, self.port)
self.started = False
# This factory maintains protocol state across reconnects
Expand Down Expand Up @@ -265,6 +270,7 @@ def buildProtocol(self, addr):

def startConnecting(self): # calling this startFactory yields recursion problems
self.started = True

if settings['DESTINATION_TRANSPORT'] == "ssl":
if not SSL or not ssl:
print("SSL destination transport request, but no Python OpenSSL available.")
Expand All @@ -283,8 +289,20 @@ def startConnecting(self): # calling this startFactory yields recursion problem
else:
client = CAReplaceClientContextFactory(settings['DESTINATION_SSL_CA'])
self.connector = reactor.connectSSL(self.host, self.port, self, client)
elif settings['DESTINATION_POOL_REPLICAS']:
# If we decide to open multiple TCP connection to a replica, we probably
# want to try to also load-balance accross hosts.
d = resolver.getHostByName(self.host, timeout=1)

def _store_result(result):
log.clients("Resolved %s to %s" % (self.host, result))
self.resolved_host = result

d.addCallback(_store_result)
d.addErrback(log.err)
self.connector = reactor.connectTCP(self.resolved_host, self.port, self)
else:
self.connector = reactor.connectTCP(self.host, self.port, self)
self.connector = reactor.connectTCP(self.host, self.port, self)

def stopConnecting(self):
self.started = False
Expand Down Expand Up @@ -515,6 +533,8 @@ class CarbonClientManager(Service):
def __init__(self, router):
self.router = router
self.client_factories = {} # { destination : CarbonClientFactory() }
# { destination[0:2]: set(CarbonClientFactory()) }
self.pooled_factories = defaultdict(set)

# This fake factory will be used as a buffer when we did not manage
# to connect to any destination.
Expand Down Expand Up @@ -558,6 +578,7 @@ def startClient(self, destination):

factory = self.createFactory(destination)
self.client_factories[destination] = factory
self.pooled_factories[destination[0:2]].add(factory)

connectAttempted = DeferredList(
[factory.connectionMade, factory.connectFailed],
Expand All @@ -580,6 +601,7 @@ def stopClient(self, destination):

def disconnectClient(self, destination):
factory = self.client_factories.pop(destination)
self.pooled_factories[destination[0:2]].remove(factory)
c = factory.connector
if c and c.state == 'connecting' and not factory.hasQueuedDatapoints():
c.stopConnecting()
Expand All @@ -600,13 +622,21 @@ def getDestinations(self, metric):
return [None]
return destinations

def getFactories(self, metric):
destinations = self.getDestinations(metric)
if not settings['DESTINATION_POOL_REPLICAS']:
return [self.client_factories[d] for d in destinations]
else:
return set([min(self.pooled_factories[d[0:2]], key=lambda f: f.queueSize)
for d in destinations])

def sendDatapoint(self, metric, datapoint):
for destination in self.getDestinations(metric):
self.client_factories[destination].sendDatapoint(metric, datapoint)
for factory in self.getFactories(metric):
factory.sendDatapoint(metric, datapoint)

def sendHighPriorityDatapoint(self, metric, datapoint):
for destination in self.getDestinations(metric):
self.client_factories[destination].sendHighPriorityDatapoint(metric, datapoint)
for factory in self.getFactories(metric):
factory.sendHighPriorityDatapoint(metric, datapoint)

def __str__(self):
return "<%s[%x]>" % (self.__class__.__name__, id(self))
Expand Down
1 change: 1 addition & 0 deletions lib/carbon/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
DESTINATION_PROTOCOL="pickle",
DESTINATION_TRANSPORT="none",
DESTINATION_SSL_CA=None,
DESTINATION_POOL_REPLICAS=False,
USE_FLOW_CONTROL=True,
USE_INSECURE_UNPICKLER=False,
USE_WHITELIST=False,
Expand Down

0 comments on commit 031cca0

Please sign in to comment.