Skip to content

Commit

Permalink
Fixed dyn-stats unused-metric-ttl bug which was was expected to garba…
Browse files Browse the repository at this point in the history
…ge-collect accumulators that were unused for time-period >= ttl, but used to wipe all accumulators. This worked from memory-footprint and metric-life point-of-view, but used to reset accumulators even when they were configured to not be reset. Now it maintains survivor(shadow) table which keeps the accumulator value around from ttl up-to (2 * ttl) and reclaims it only if no dyn_inc call resurrects it by the end of this period. This allows us to resurrect surviving counters if they are used within ttl duration. This keeps a counter alive up-to (2 * ttl) in the worst case(late eviction) and ttl(timely eviction) in the best case.
  • Loading branch information
janmejay committed Apr 14, 2016
1 parent 1613249 commit 5f071af
Show file tree
Hide file tree
Showing 12 changed files with 305 additions and 131 deletions.
216 changes: 133 additions & 83 deletions runtime/dynstats.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,30 +70,35 @@ dynstatsClassInit(void) {
}

static inline void
dynstats_destroyCtr(dynstats_bucket_t *b, dynstats_ctr_t *ctr, uint8_t destructStatsCtr) {
if (destructStatsCtr) {
dynstats_destroyCtr(dynstats_bucket_t *b, dynstats_ctr_t *ctr, uint8_t linked) {
if (linked) {
statsobj.DestructCounter(b->stats, ctr->pCtr);
} else {
statsobj.DestructUnlinkedCounter(ctr->pCtr);
}
free(ctr->metric);
free(ctr);
}

static inline void /* assumes exclusive access to bucket */
dynstats_destroyCounters(dynstats_bucket_t *b) {
dynstats_destroyCountersIn(dynstats_bucket_t *b, htable *table, dynstats_ctr_t *ctrs) {
dynstats_ctr_t *ctr;

hashtable_destroy(b->table, 0);
statsobj.DestructAllCounters(b->stats);
while(1) {
ctr = b->ctrs;
if (ctr == NULL) {
break;
} else {
b->ctrs = ctr->next;
dynstats_destroyCtr(b, ctr, 0);
}
int ctrs_purged = 0;
hashtable_destroy(table, 0);
while (ctrs != NULL) {
ctr = ctrs;
ctrs = ctrs->next;
dynstats_destroyCtr(b, ctr, 0);
ctrs_purged++;
}
STATSCOUNTER_BUMP(b->ctrMetricsPurged, b->mutCtrMetricsPurged, b->metricCount);
STATSCOUNTER_ADD(b->ctrMetricsPurged, b->mutCtrMetricsPurged, ctrs_purged);
ATOMIC_SUB(&b->metricCount, ctrs_purged, &b->mutMetricCount);
}

static inline void /* assumes exclusive access to bucket */
dynstats_destroyCounters(dynstats_bucket_t *b) {
statsobj.UnlinkAllCounters(b->stats);
dynstats_destroyCountersIn(b, b->table, b->ctrs);
}

void
Expand All @@ -104,6 +109,7 @@ dynstats_destroyBucket(dynstats_bucket_t* b) {

pthread_rwlock_wrlock(&b->lock);
dynstats_destroyCounters(b);
dynstats_destroyCountersIn(b, b->survivor_table, b->survivor_ctrs);
statsobj.Destruct(&b->stats);
free(b->name);
pthread_rwlock_unlock(&b->lock);
Expand Down Expand Up @@ -137,37 +143,37 @@ dynstats_addBucketMetrics(dynstats_buckets_t *bkts, dynstats_bucket_t *b, const
ustrncpy(metric_suffix, suffix_litteral, DYNSTATS_MAX_BUCKET_NS_METRIC_LENGTH);
STATSCOUNTER_INIT(b->ctrOpsOverflow, b->mutCtrOpsOverflow);
CHKiRet(statsobj.AddManagedCounter(bkts->global_stats, metric_name_buff, ctrType_IntCtr,
CTR_FLAG_RESETTABLE, &(b->ctrOpsOverflow), &b->pOpsOverflowCtr));
CTR_FLAG_RESETTABLE, &(b->ctrOpsOverflow), &b->pOpsOverflowCtr, 1));

suffix_litteral = UCHAR_CONSTANT("new_metric_add");
ustrncpy(metric_suffix, suffix_litteral, DYNSTATS_MAX_BUCKET_NS_METRIC_LENGTH);
STATSCOUNTER_INIT(b->ctrNewMetricAdd, b->mutCtrNewMetricAdd);
CHKiRet(statsobj.AddManagedCounter(bkts->global_stats, metric_name_buff, ctrType_IntCtr,
CTR_FLAG_RESETTABLE, &(b->ctrNewMetricAdd), &b->pNewMetricAddCtr));
CTR_FLAG_RESETTABLE, &(b->ctrNewMetricAdd), &b->pNewMetricAddCtr, 1));

suffix_litteral = UCHAR_CONSTANT("no_metric");
ustrncpy(metric_suffix, suffix_litteral, DYNSTATS_MAX_BUCKET_NS_METRIC_LENGTH);
STATSCOUNTER_INIT(b->ctrNoMetric, b->mutCtrNoMetric);
CHKiRet(statsobj.AddManagedCounter(bkts->global_stats, metric_name_buff, ctrType_IntCtr,
CTR_FLAG_RESETTABLE, &(b->ctrNoMetric), &b->pNoMetricCtr));
CTR_FLAG_RESETTABLE, &(b->ctrNoMetric), &b->pNoMetricCtr, 1));

suffix_litteral = UCHAR_CONSTANT("metrics_purged");
ustrncpy(metric_suffix, suffix_litteral, DYNSTATS_MAX_BUCKET_NS_METRIC_LENGTH);
STATSCOUNTER_INIT(b->ctrMetricsPurged, b->mutCtrMetricsPurged);
CHKiRet(statsobj.AddManagedCounter(bkts->global_stats, metric_name_buff, ctrType_IntCtr,
CTR_FLAG_RESETTABLE, &(b->ctrMetricsPurged), &b->pMetricsPurgedCtr));
CTR_FLAG_RESETTABLE, &(b->ctrMetricsPurged), &b->pMetricsPurgedCtr, 1));

suffix_litteral = UCHAR_CONSTANT("ops_ignored");
ustrncpy(metric_suffix, suffix_litteral, DYNSTATS_MAX_BUCKET_NS_METRIC_LENGTH);
STATSCOUNTER_INIT(b->ctrOpsIgnored, b->mutCtrOpsIgnored);
CHKiRet(statsobj.AddManagedCounter(bkts->global_stats, metric_name_buff, ctrType_IntCtr,
CTR_FLAG_RESETTABLE, &(b->ctrOpsIgnored), &b->pOpsIgnoredCtr));
CTR_FLAG_RESETTABLE, &(b->ctrOpsIgnored), &b->pOpsIgnoredCtr, 1));

suffix_litteral = UCHAR_CONSTANT("purge_triggered");
ustrncpy(metric_suffix, suffix_litteral, DYNSTATS_MAX_BUCKET_NS_METRIC_LENGTH);
STATSCOUNTER_INIT(b->ctrPurgeTriggered, b->mutCtrPurgeTriggered);
CHKiRet(statsobj.AddManagedCounter(bkts->global_stats, metric_name_buff, ctrType_IntCtr,
CTR_FLAG_RESETTABLE, &(b->ctrPurgeTriggered), &b->pPurgeTriggeredCtr));
CTR_FLAG_RESETTABLE, &(b->ctrPurgeTriggered), &b->pPurgeTriggeredCtr, 1));

finalize_it:
free(metric_name_buff);
Expand All @@ -187,7 +193,7 @@ dynstats_addBucketMetrics(dynstats_buckets_t *bkts, dynstats_bucket_t *b, const
if (b->pOpsIgnoredCtr != NULL) {
statsobj.DestructCounter(bkts->global_stats, b->pOpsIgnoredCtr);
}
if (b->pPurgeTriggeredCtr != NULL) {
if (b->pPurgeTriggeredCtr != NULL) {
statsobj.DestructCounter(bkts->global_stats, b->pPurgeTriggeredCtr);
}
}
Expand All @@ -197,32 +203,60 @@ dynstats_addBucketMetrics(dynstats_buckets_t *bkts, dynstats_bucket_t *b, const
static void
no_op_free(void __attribute__((unused)) *ignore) {}

static rsRetVal
dynstats_resetBucket(dynstats_bucket_t *b, uint8_t do_purge) {
static rsRetVal /* assumes exclusive access to bucket */
dynstats_rebuildSurvivorTable(dynstats_bucket_t *b) {
htable *survivor_table = NULL;
htable *new_table = NULL;
dynstats_ctr_t ctr;
uchar *metric = NULL;
size_t htab_sz;
DEFiRet;

htab_sz = (size_t) (DYNSTATS_HASHTABLE_SIZE_OVERPROVISIONING * b->maxCardinality + 1);
pthread_rwlock_wrlock(&b->lock);
if (do_purge) {
dynstats_destroyCounters(b);
if (b->table == NULL) {
CHKmalloc(survivor_table = create_hashtable(htab_sz, hash_from_string, key_equals_string, no_op_free));
}
ATOMIC_STORE_0_TO_INT(&b->metricCount, &b->mutMetricCount);
STATSCOUNTER_INC(b->ctrPurgeTriggered, b->mutCtrPurgeTriggered);
b->ctrs = NULL;
if ((b->table = create_hashtable(htab_sz, hash_from_string, key_equals_string, no_op_free)) == NULL) {
errmsg.LogError(errno, RS_RET_INTERNAL_ERROR, "error trying to initialize hash-table for dyn-stats bucket named: %s", b->name);
ABORT_FINALIZE(RS_RET_INTERNAL_ERROR);
CHKmalloc(new_table = create_hashtable(htab_sz, hash_from_string, key_equals_string, no_op_free));
statsobj.UnlinkAllCounters(b->stats);
if (b->survivor_table != NULL) {
dynstats_destroyCountersIn(b, b->survivor_table, b->survivor_ctrs);
}

timeoutComp(&b->metricCleanupTimeout, b->unusedMetricLife);
b->survivor_table = (b->table == NULL) ? survivor_table : b->table;
b->survivor_ctrs = b->ctrs;
b->table = new_table;
b->ctrs = NULL;
finalize_it:
pthread_rwlock_unlock(&b->lock);
if (iRet != RS_RET_OK) {
statsobj.Destruct(&b->stats);
errmsg.LogError(errno, RS_RET_INTERNAL_ERROR, "error trying to evict TTL-expired metrics of dyn-stats bucket named: %s", b->name);
if (new_table == NULL) {
errmsg.LogError(errno, RS_RET_INTERNAL_ERROR, "error trying to initialize hash-table for dyn-stats bucket named: %s", b->name);
} else {
hashtable_destroy(new_table, 0);
}
if (b->table == NULL) {
if (survivor_table == NULL) {
errmsg.LogError(errno, RS_RET_INTERNAL_ERROR, "error trying to initialize ttl-survivor hash-table for dyn-stats bucket named: %s", b->name);
} else {
hashtable_destroy(survivor_table, 0);
}
}
}
RETiRet;
}

static rsRetVal
dynstats_resetBucket(dynstats_bucket_t *b) {
htable *new_table = NULL;
DEFiRet;
pthread_rwlock_wrlock(&b->lock);
CHKiRet(dynstats_rebuildSurvivorTable(b));
STATSCOUNTER_INC(b->ctrPurgeTriggered, b->mutCtrPurgeTriggered);
timeoutComp(&b->metricCleanupTimeout, b->unusedMetricLife);
finalize_it:
pthread_rwlock_unlock(&b->lock);
RETiRet;
}

static inline void
dynstats_resetIfExpired(dynstats_bucket_t *b) {
long timeout;
Expand All @@ -231,7 +265,7 @@ dynstats_resetIfExpired(dynstats_bucket_t *b) {
pthread_rwlock_unlock(&b->lock);
if (timeout == 0) {
errmsg.LogMsg(0, RS_RET_TIMED_OUT, LOG_INFO, "dynstats: bucket '%s' is being reset", b->name);
dynstats_resetBucket(b, 1);
dynstats_resetBucket(b);
}
}

Expand Down Expand Up @@ -292,17 +326,17 @@ dynstats_newBucket(const uchar* name, uint8_t resettable, uint32_t maxCardinalit

CHKiRet(dynstats_initNewBucketStats(b));

CHKiRet(dynstats_resetBucket(b, 0));
CHKiRet(dynstats_resetBucket(b));

CHKiRet(dynstats_addBucketMetrics(bkts, b, name));

pthread_rwlock_wrlock(&bkts->lock);
if (bkts->list == NULL) {
bkts->list = b;
} else {
b->next = bkts->list;
bkts->list = b;
}
if (bkts->list == NULL) {
bkts->list = b;
} else {
b->next = bkts->list;
bkts->list = b;
}
pthread_rwlock_unlock(&bkts->lock);
} else {
errmsg.LogError(0, RS_RET_INTERNAL_ERROR, "dynstats: bucket creation failed, as global-initialization of buckets was unsuccessful");
Expand All @@ -317,8 +351,7 @@ dynstats_newBucket(const uchar* name, uint8_t resettable, uint32_t maxCardinalit
pthread_rwlock_destroy(&b->lock);
}
if (b != NULL) {
free(b->name);
free(b);
dynstats_destroyBucket(b);
}
}
RETiRet;
Expand Down Expand Up @@ -355,9 +388,9 @@ dynstats_processCnf(struct cnfobj *o) {
"param '%s'\n", modpblk.descr[i].name);
}
}
if (name != NULL) {
CHKiRet(dynstats_newBucket(name, resettable, maxCardinality, unusedMetricLife));
}
if (name != NULL) {
CHKiRet(dynstats_newBucket(name, resettable, maxCardinality, unusedMetricLife));
}

finalize_it:
free(name);
Expand Down Expand Up @@ -400,7 +433,7 @@ dynstats_destroyAllBuckets() {
if (b == NULL) {
break;
} else {
bkts->list = b->next;
bkts->list = b->next;
dynstats_destroyBucket(b);
}
}
Expand All @@ -416,13 +449,13 @@ dynstats_findBucket(const uchar* name) {
bkts = &loadConf->dynstats_buckets;
if (bkts->initialized) {
pthread_rwlock_rdlock(&bkts->lock);
b = bkts->list;
while(b != NULL) {
if (! ustrcmp(name, b->name)) {
b = bkts->list;
while(b != NULL) {
if (! ustrcmp(name, b->name)) {
break;
}
b = b->next;
}
b = b->next;
}
pthread_rwlock_unlock(&bkts->lock);
} else {
b = NULL;
Expand All @@ -441,22 +474,22 @@ dynstats_createCtr(dynstats_bucket_t *b, const uchar* metric, dynstats_ctr_t **c
STATSCOUNTER_INIT((*ctr)->ctr, (*ctr)->mutCtr);
CHKiRet(statsobj.AddManagedCounter(b->stats, metric, ctrType_IntCtr,
b->resettable ? CTR_FLAG_MUST_RESET : CTR_FLAG_NONE,
&(*ctr)->ctr, &(*ctr)->pCtr));
&(*ctr)->ctr, &(*ctr)->pCtr, 0));
finalize_it:
if (iRet != RS_RET_OK) {
if ((*ctr) != NULL) {
free((*ctr)->metric);
free(*ctr);
*ctr = NULL;
}
if ((*ctr) != NULL) {
free((*ctr)->metric);
free(*ctr);
*ctr = NULL;
}
}
RETiRet;
}

static rsRetVal
dynstats_addNewCtr(dynstats_bucket_t *b, const uchar* metric, uint8_t doInitialIncrement) {
dynstats_ctr_t *ctr;
dynstats_ctr_t *found_ctr;
dynstats_ctr_t *found_ctr, *survivor_ctr, *effective_ctr;
int created;
uchar *copy_of_key = NULL;
DEFiRet;
Expand All @@ -471,45 +504,62 @@ dynstats_addNewCtr(dynstats_bucket_t *b, const uchar* metric, uint8_t doInitialI
CHKiRet(dynstats_createCtr(b, metric, &ctr));

pthread_rwlock_wrlock(&b->lock);
found_ctr = (dynstats_ctr_t*) hashtable_search(b->table, ctr->metric);
found_ctr = (dynstats_ctr_t*) hashtable_search(b->table, ctr->metric);
if (found_ctr != NULL) {
if (doInitialIncrement) {
STATSCOUNTER_INC(found_ctr->ctr, found_ctr->mutCtr);
}
} else {
copy_of_key = ustrdup(ctr->metric);
if (copy_of_key != NULL) {
created = hashtable_insert(b->table, copy_of_key, ctr);
}
copy_of_key = ustrdup(ctr->metric);
if (copy_of_key != NULL) {
survivor_ctr = (dynstats_ctr_t*) hashtable_search(b->survivor_table, ctr->metric);
if (survivor_ctr == NULL) {
effective_ctr = ctr;
} else {
effective_ctr = survivor_ctr;
if (survivor_ctr->prev != NULL) {
survivor_ctr->prev->next = survivor_ctr->next;
}
if (survivor_ctr->next != NULL) {
survivor_ctr->next->prev = survivor_ctr->prev;
}
if (survivor_ctr == b->survivor_ctrs) {
b->survivor_ctrs = survivor_ctr->next;
}
}
if (created = hashtable_insert(b->table, copy_of_key, effective_ctr)) {
statsobj.AddPreCreatedCtr(b->stats, effective_ctr->pCtr);
}
}
if (created) {
if (b->ctrs == NULL) {
b->ctrs = ctr;
} else {
ctr->next = b->ctrs;
b->ctrs = ctr;
}
if (b->ctrs != NULL) {
b->ctrs->prev = effective_ctr;
}
effective_ctr->prev = NULL;
effective_ctr->next = b->ctrs;
b->ctrs = effective_ctr;
if (doInitialIncrement) {
STATSCOUNTER_INC(ctr->ctr, ctr->mutCtr);
STATSCOUNTER_INC(effective_ctr->ctr, effective_ctr->mutCtr);
}
}
}
pthread_rwlock_unlock(&b->lock);

if (found_ctr != NULL) {
//ignore
} else if (created) {
} else if (created && (effective_ctr != survivor_ctr)) {
ATOMIC_INC(&b->metricCount, &b->mutMetricCount);
STATSCOUNTER_INC(b->ctrNewMetricAdd, b->mutCtrNewMetricAdd);
} else {
if (copy_of_key != NULL) {
free(copy_of_key);
}
} else if (! created) {
if (copy_of_key != NULL) {
free(copy_of_key);
}
ABORT_FINALIZE(RS_RET_OUT_OF_MEMORY);
}

finalize_it:
if ((! created) && (ctr != NULL)) {
dynstats_destroyCtr(b, ctr, 1);
if (((! created) || (effective_ctr != ctr)) && (ctr != NULL)) {
dynstats_destroyCtr(b, ctr, 0);
}
RETiRet;
}
Expand All @@ -529,7 +579,7 @@ dynstats_inc(dynstats_bucket_t *b, uchar* metric) {
}

if (pthread_rwlock_tryrdlock(&b->lock) == 0) {
ctr = (dynstats_ctr_t *) hashtable_search(b->table, metric);
ctr = (dynstats_ctr_t *) hashtable_search(b->table, metric);
if (ctr != NULL) {
STATSCOUNTER_INC(ctr->ctr, ctr->mutCtr);
}
Expand Down
Loading

0 comments on commit 5f071af

Please sign in to comment.