Skip to content

Commit

Permalink
added relay instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
cdavis committed Sep 10, 2009
1 parent 8eaa81a commit 7064ae5
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 8 deletions.
4 changes: 2 additions & 2 deletions bin/carbon-cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, CacheQueryHandler, startListener
from carbon.cache import MetricCache
from carbon.writer import startWriter
from carbon.instrumentation import recorder
from carbon.instrumentation import startRecordingCacheMetrics
from carbon.events import metricReceived


Expand Down Expand Up @@ -146,7 +146,7 @@ def shutdown():
startListener(settings.PICKLE_RECEIVER_INTERFACE, settings.PICKLE_RECEIVER_PORT, MetricPickleReceiver)
startListener(settings.CACHE_QUERY_INTERFACE, settings.CACHE_QUERY_PORT, CacheQueryHandler)
startWriter()
recorder.start(60, now=False)
startRecordingCacheMetrics()


# Run the twisted reactor
Expand Down
2 changes: 2 additions & 0 deletions bin/carbon-relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
from carbon.listeners import MetricLineReceiver, MetricPickleReceiver, startListener
from carbon.relay import startRelaying, relay
from carbon.events import metricReceived
from carbon.instrumentation import startRecordingRelayMetrics


# Parse command line options
Expand Down Expand Up @@ -146,6 +147,7 @@ def shutdown():

cacheServers = [ server.strip() for server in settings.CACHE_SERVERS.split(',') ]
startRelaying(cacheServers, options.rules)
startRecordingRelayMetrics()


# Run the twisted reactor
Expand Down
47 changes: 44 additions & 3 deletions lib/carbon/instrumentation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import time, socket
from twisted.internet.task import LoopingCall
from carbon.cache import MetricCache
from carbon.relay import relay, RelayServers


stats = {}
HOSTNAME = socket.gethostname().replace('.','_')
recordTask = None


def increment(stat, increase=1):
Expand All @@ -21,10 +23,19 @@ def append(stat, value):
stats[stat] = [value]


def record():
# Cache metrics
def startRecordingCacheMetrics():
global recordTask
assert not recordTask, "Already recording metrics"
recordTask = LoopingCall(recordCacheMetrics)
recordTask.start(60, now=False)


def recordCacheMetrics():
myStats = stats.copy()
stats.clear()

metricsReceived = myStats.get('metricsReceived', 0)
updateTimes = myStats.get('updateTimes', [])
committedPoints = myStats.get('committedPoints', 0)
creates = myStats.get('creates', 0)
Expand All @@ -38,7 +49,8 @@ def record():
if committedPoints:
pointsPerUpdate = len(updateTimes) / committedPoints
store('pointsPerUpdate', pointsPerUpdate)


store('metricsReceived', metricsReceived)
store('updateOperations', len(updateTimes))
store('committedPoints', committedPoints)
store('creates', creates)
Expand All @@ -55,4 +67,33 @@ def store(metric, value):
MetricCache.store(fullMetric, datapoint)


recorder = LoopingCall(record)
# Relay metrics
def startRecordingRelayMetrics():
global recordTask
assert not recordTask, "Already recording metrics"
recordTask = LoopingCall(recordRelayMetrics)
recordTask.start(60, now=False)


def recordRelayMetrics():
myStats = stats.copy()
stats.clear()

# global metrics
send('metricsReceived', myStats.get('metricsReceived', 0))

# per-destination metrics
for server in RelayServers:
prefix = 'destinations.%s.' % server.destinationName

for metric in ('attemptedRelays', 'sent', 'queuedUntilReady', 'queuedUntilConnected', 'fullQueueDrops'):
metric = prefix + metric
send(metric, myStats.get(metric, 0))

send(prefix + 'queueSize', len(server.queue))


def send(metric, value):
fullMetric = 'carbon.relays.%s.%s' % (HOSTNAME, metric)
datapoint = (time.time(), value)
relay(fullMetric, datapoint)
2 changes: 2 additions & 0 deletions lib/carbon/listeners.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def lineReceived(self, line):
self.transport.loseConnection()
return

increment('metricsReceived')
metricReceived(metric, datapoint)


Expand All @@ -51,6 +52,7 @@ def stringReceived(self, data):
self.transport.loseConnection()
return

increment('metricsReceived')
metricReceived(metric, datapoint)


Expand Down
22 changes: 19 additions & 3 deletions lib/carbon/relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from twisted.internet import reactor
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.protocols.basic import Int32StringReceiver
from carbon.instrumentation import increment #XXX distinguish relay vs. cache instrumentation. then instrument the relay appropriately. what metrics?
from carbon.instrumentation import increment
from carbon.rules import getDestinations, loadRules
from carbon.conf import settings
from carbon import log
Expand All @@ -17,11 +17,10 @@


def relay(metric, datapoint):
log.relay('relay(metric=%s)' % metric)
increment('metricsReceived')
packet = pickle.dumps( (metric,datapoint) )

for server in getServers(metric):
log.relay('-> %s' % server.host)
server.send(packet)


Expand All @@ -38,6 +37,9 @@ def connectionMade(self):
self.paused = False
self.transport.registerProducer(self, streaming=True)
self.flushQueue()
# Define internal metric names
self.queuedUntilReady = 'destinations.%s.queuedUntilReady' % self.factory.destinationName
self.sent = 'destinations.%s.sent' % self.factory.destinationName

def pauseProducing(self):
self.paused = True
Expand All @@ -56,8 +58,11 @@ def flushQueue(self):
def send(self, data):
if self.paused:
self.queue.append(data)
increment(self.queuedUntilReady)

else:
self.sendString(data)
increment(self.sent)


class MetricSenderFactory(ReconnectingClientFactory):
Expand All @@ -69,6 +74,11 @@ def __init__(self, host, port):
self.port = port
self.remoteAddr = "%s:%d" % (host, port)
self.queue = deque()
# Define internal metric names
self.destinationName = host.replace('.','_')
self.attemptedRelays = 'destinations.%s.attemptedRelays' % self.destinationName
self.fullQueueDrops = 'destinations.%s.fullQueueDrops' % self.destinationName
self.queuedUntilConnected = 'destinations.%s.queuedUntilConnected' % self.destinationName

def startedConnecting(self, connector):
log.relay('connecting to %s' % self.remoteAddr)
Expand All @@ -81,12 +91,18 @@ def buildProtocol(self, addr):
return self.connectedProtocol

def send(self, data):
increment(self.attemptedRelays)

if len(self.queue) >= settings.MAX_QUEUE_SIZE:
log.relay('relay queue full for %s, dropping data' % self.remoteAddr)
increment(self.fullQueueDrops)

elif self.connectedProtocol:
self.connectedProtocol.send(data)

else:
self.queue.append(data)
increment(self.queuedUntilConnected)

def clientConnectionLost(self, connector, reason):
ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
Expand Down

0 comments on commit 7064ae5

Please sign in to comment.