Merge pull request graphite-project#628 from iksaif/pr-strategy
Fix CACHE_WRITE_STRATEGY and add TimeSortedStrategy
iksaif authored Feb 19, 2017
2 parents e489d6a + 0e6b9e3 commit abff76a
Showing 6 changed files with 109 additions and 20 deletions.
6 changes: 6 additions & 0 deletions conf/carbon.conf.example
@@ -152,6 +152,12 @@ LOG_CACHE_QUEUE_SORTS = False
# moment of the list's creation. Metrics will then be flushed from the cache to
# disk in that order.
#
# timesorted - All metrics in the list will be looked at and sorted according
# to the timestamp of their datapoints. The metrics that were least recently
# written will be written first. This is a hybrid strategy between max and
# sorted that is particularly well suited to sets of metrics with non-uniform
# resolutions.
#
# max - The writer thread will always pop and flush the metric from cache
# that has the most datapoints. This will give a strong flush preference to
# frequently updated metrics and will also reduce random file-io. Infrequently
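To make the ordering concrete, here is a minimal illustrative sketch (not part of this change) using the (metric, oldest, newest) watermark tuples introduced in lib/carbon/cache.py below:

# Illustrative only: reproduce the "timesorted" drain order outside carbon.
watermarks = [
    ('foo', 123456, 123458),  # oldest cached datapoint, written least recently
    ('bar', 123459, 123460),
    ('baz', 123461, 123461),
]
# The metric whose oldest datapoint lags furthest behind is flushed first.
order = [metric for (metric, oldest, newest) in sorted(watermarks, key=lambda w: w[1])]
print(order)  # ['foo', 'bar', 'baz']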
71 changes: 59 additions & 12 deletions lib/carbon/cache.py
@@ -30,8 +30,12 @@ def by_timestamp((timestamp, value)): # useful sort key function
class CacheFeedingProcessor(Processor):
plugin_name = 'write'

def __init__(self, *args, **kwargs):
super(Processor, self).__init__(*args, **kwargs)
self.cache = MetricCache()

def process(self, metric, datapoint):
MetricCache.store(metric, datapoint)
self.cache.store(metric, datapoint)
return Processor.NO_OUTPUT


@@ -94,6 +98,30 @@ def _generate_queue():
log.msg("Sorted %d cache queues in %.6f seconds" % (len(metric_counts), time.time() - t))
while metric_counts:
yield itemgetter(0)(metric_counts.pop())
log.msg("Queue consumed in %.6f seconds" % (time.time() - t))

self.queue = _generate_queue()

def choose_item(self):
return self.queue.next()


class TimeSortedStrategy(DrainStrategy):
""" This strategy prefers metrics wich are lagging behind
guarantees every point gets written exactly once during
a loop of the cache """
def __init__(self, cache):
super(TimeSortedStrategy, self).__init__(cache)

def _generate_queue():
while True:
t = time.time()
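# Sort descending on each metric's oldest timestamp; popping from the
# end of the list below then yields the most-lagging metric first.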
metric_lw = sorted(self.cache.watermarks, key=lambda x: x[1], reverse=True)
if settings.LOG_CACHE_QUEUE_SORTS:
log.msg("Sorted %d cache queues in %.6f seconds" % (len(metric_lw), time.time() - t))
while metric_lw:
yield itemgetter(0)(metric_lw.pop())
log.msg("Queue consumed in %.6f seconds" % (time.time() - t))

self.queue = _generate_queue()

@@ -115,6 +143,12 @@ def __init__(self, strategy=None):
def counts(self):
return [(metric, len(datapoints)) for (metric, datapoints) in self.items()]

@property
def watermarks(self):
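# One (metric, oldest_timestamp, newest_timestamp) tuple per metric that
# currently holds datapoints.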
return [(metric, min(datapoints.keys()), max(datapoints.keys()))
for (metric, datapoints) in self.items()
if datapoints]

@property
def is_full(self):
if settings.MAX_CACHE_SIZE == float('inf'):
@@ -167,18 +201,31 @@ def store(self, metric, datapoint):
self[metric][timestamp] = value


# Initialize a singleton cache instance
write_strategy = None
if settings.CACHE_WRITE_STRATEGY == 'naive':
write_strategy = NaiveStrategy
if settings.CACHE_WRITE_STRATEGY == 'max':
write_strategy = MaxStrategy
if settings.CACHE_WRITE_STRATEGY == 'sorted':
write_strategy = SortedStrategy
if settings.CACHE_WRITE_STRATEGY == 'random':
write_strategy = RandomStrategy
_Cache = None

def MetricCache():
global _Cache
if _Cache is not None:
return _Cache

# Initialize a singleton cache instance
# TODO: use plugins.
write_strategy = None
if settings.CACHE_WRITE_STRATEGY == 'naive':
write_strategy = NaiveStrategy
if settings.CACHE_WRITE_STRATEGY == 'max':
write_strategy = MaxStrategy
if settings.CACHE_WRITE_STRATEGY == 'sorted':
write_strategy = SortedStrategy
if settings.CACHE_WRITE_STRATEGY == 'timesorted':
write_strategy = TimeSortedStrategy
if settings.CACHE_WRITE_STRATEGY == 'random':
write_strategy = RandomStrategy

_Cache = _MetricCache(write_strategy)
return _Cache


MetricCache = _MetricCache(write_strategy)

# Avoid import circularities
from carbon import state
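A hedged usage sketch of the new MetricCache() accessor (illustrative only, assuming carbon is importable with its default settings): the first call builds a single _MetricCache configured with the selected write strategy, and every later call returns that same instance, which is why writer.py and instrumentation.py below now simply call MetricCache().

from carbon.cache import MetricCache

cache = MetricCache()          # first call constructs the singleton
assert MetricCache() is cache  # subsequent calls return the same object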
2 changes: 1 addition & 1 deletion lib/carbon/conf.py
@@ -247,7 +247,7 @@ def postOptions(self):
print "Error: missing required config %s" % storage_schemas
sys.exit(1)

if settings.CACHE_WRITE_STRATEGY not in ('sorted', 'max', 'naive'):
if settings.CACHE_WRITE_STRATEGY not in ('timesorted', 'sorted', 'max', 'naive'):
log.err("%s is not a valid value for CACHE_WRITE_STRATEGY, defaulting to %s" %
(settings.CACHE_WRITE_STRATEGY, defaults['CACHE_WRITE_STRATEGY']))
else:
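With 'timesorted' accepted by the validation above, the new strategy can be enabled in carbon.conf; a minimal example (the [cache] section name is assumed from the standard carbon.conf layout and is not part of this diff):

[cache]
CACHE_WRITE_STRATEGY = timesorted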
6 changes: 3 additions & 3 deletions lib/carbon/instrumentation.py
@@ -92,8 +92,8 @@ def recordMetrics():

# Calculate cache-data-structure-derived metrics prior to storing anything
# in the cache itself -- which would otherwise affect said metrics.
cache_size = cache.MetricCache.size
cache_queues = len(cache.MetricCache)
cache_size = cache.MetricCache().size
cache_queues = len(cache.MetricCache())
record('cache.size', cache_size)
record('cache.queues', cache_queues)

@@ -167,7 +167,7 @@ def cache_record(metric, value):
else:
fullMetric = '%s.agents.%s-%s.%s' % (prefix, HOSTNAME, settings.instance, metric)
datapoint = (time.time(), value)
cache.MetricCache.store(fullMetric, datapoint)
cache.MetricCache().store(fullMetric, datapoint)


def relay_record(metric, value):
36 changes: 35 additions & 1 deletion lib/carbon/tests/test_cache.py
@@ -1,6 +1,6 @@
from unittest import TestCase
from mock import Mock, PropertyMock, patch
from carbon.cache import _MetricCache, DrainStrategy, MaxStrategy, RandomStrategy, SortedStrategy
from carbon.cache import MetricCache, _MetricCache, DrainStrategy, MaxStrategy, RandomStrategy, SortedStrategy, TimeSortedStrategy


class MetricCacheTest(TestCase):
@@ -17,6 +17,16 @@ def setUp(self):
def tearDown(self):
self._settings_patch.stop()

def test_constructor(self):
settings = {
'CACHE_WRITE_STRATEGY': 'max',
}
settings_patch = patch.dict('carbon.conf.settings', settings)
settings_patch.start()
cache = MetricCache()
self.assertNotEqual(cache, None)
self.assertTrue(isinstance(cache.strategy, MaxStrategy))

def test_cache_is_a_dict(self):
self.assertTrue(issubclass(_MetricCache, dict))

@@ -224,6 +234,30 @@ def test_sorted_strategy_changing_sizes(self):
self.assertEqual('bar', sorted_strategy.choose_item())
self.assertEqual('baz', sorted_strategy.choose_item())

def test_time_sorted_strategy(self):
self.metric_cache.store('foo', (123456, 1.0))
self.metric_cache.store('foo', (123457, 2.0))
self.metric_cache.store('foo', (123458, 3.0))
self.metric_cache.store('bar', (123459, 4.0))
self.metric_cache.store('bar', (123460, 5.0))
self.metric_cache.store('baz', (123461, 6.0))

time_sorted_strategy = TimeSortedStrategy(self.metric_cache)
# In order: foo, bar, baz
self.assertEqual('foo', time_sorted_strategy.choose_item())

# 'baz' gets older points.
self.metric_cache.store('baz', (123450, 6.0))
self.metric_cache.store('baz', (123451, 6.0))
# But 'bar' is popped anyway, because sort has already happened
self.assertEqual('bar', time_sorted_strategy.choose_item())
self.assertEqual('baz', time_sorted_strategy.choose_item())

# Sort happens again
self.assertEqual('baz', time_sorted_strategy.choose_item())
self.assertEqual('foo', time_sorted_strategy.choose_item())
self.assertEqual('bar', time_sorted_strategy.choose_item())


class RandomStrategyTest(TestCase):
def setUp(self):
8 changes: 5 additions & 3 deletions lib/carbon/writer.py
@@ -53,8 +53,9 @@
def optimalWriteOrder():
"""Generates metrics with the most cached values first and applies a soft
rate limit on new metrics"""
while MetricCache:
(metric, datapoints) = MetricCache.drain_metric()
cache = MetricCache()
while cache:
(metric, datapoints) = cache.drain_metric()
dbFileExists = state.database.exists(metric)

if not dbFileExists and CREATE_BUCKET:
@@ -75,7 +76,8 @@ def optimalWriteOrder():
def writeCachedDataPoints():
"Write datapoints until the MetricCache is completely empty"

while MetricCache:
cache = MetricCache()
while cache:
dataWritten = False

for (metric, datapoints, dbFileExists) in optimalWriteOrder():
