control system

Nothing internal fires it off automatically yet, but there is now an external
command:

lru_crawler crawl [classid]

... which will signal the crawler thread to wake up and immediately reap
through a particular slab class.

Needs some thought/feedback on internal kickoffs (plugins?).
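
For illustration, the request/response pairs this wires up (a hypothetical
session; assumes the crawler thread has been enabled, e.g. via
"lru_crawler enable") look like:

lru_crawler crawl 1
OK

lru_crawler crawl 1        (while a crawl is already in progress)
BUSY currently processing crawler request

lru_crawler crawl 0        (class id outside POWER_SMALLEST..POWER_LARGEST)
BADCLASS invalid class id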
dormando committed Apr 17, 2014
1 parent 649f7f0 commit 6be2b6c
Showing 5 changed files with 118 additions and 31 deletions.
93 changes: 68 additions & 25 deletions items.c
@@ -44,7 +44,11 @@ static crawler crawlers[LARGEST_ID];
static itemstats_t itemstats[LARGEST_ID];
static unsigned int sizes[LARGEST_ID];

static int crawler_count = 0;
static volatile int do_run_lru_crawler_thread = 0;
static int lru_crawler_initialized = 0;
static pthread_mutex_t lru_crawler_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t lru_crawler_cond = PTHREAD_COND_INITIALIZER;

void item_stats_reset(void) {
mutex_lock(&cache_lock);
@@ -744,62 +748,52 @@ static item *crawler_crawl_q(item *it) {
write their own algorithm (removal based on keys, content, etc).
*/
static void *item_crawler_thread(void *arg) {
int crawler_count = 0;
int i;

if (settings.verbose > 2)
fprintf(stderr, "Starting LRU crawler background thread\n");
while (do_run_lru_crawler_thread) {
for (i = 0; i < LARGEST_ID; i++) {
if (tails[i] != NULL) {
if (settings.verbose > 2)
fprintf(stderr, "Kicking LRU crawler off for slab %d\n", i);
crawlers[i].nbytes = 0;
crawlers[i].nkey = 0;
crawlers[i].it_flags = 1; /* For a crawler, this means enabled. */
crawlers[i].next = 0;
crawlers[i].prev = 0;
crawlers[i].slabs_clsid = i;
mutex_lock(&cache_lock);
crawler_link_q((item *)&crawlers[i]);
mutex_unlock(&cache_lock);
crawler_count++;
}
}
pthread_cond_wait(&lru_crawler_cond, &lru_crawler_lock);
/* TODO: Don't need to hold the crawler lock once we've decided to crawl
* some stuff. This isn't very flexible though: Should be able to signal
* and stop crawlers while they're running.
*/
STATS_LOCK();
stats.lru_crawler_running = true;
STATS_UNLOCK();

/* Not right: Should be able to kick off new crawlers anytime */
while (crawler_count) {
item *search = NULL;
void *hold_lock = NULL;
rel_time_t oldest_live = settings.oldest_live;

for (i = 0; i < LARGEST_ID; i++) {
if (crawlers[i].it_flags == 1) {
mutex_lock(&cache_lock);
pthread_mutex_lock(&cache_lock);
search = crawler_crawl_q((item *)&crawlers[i]);
if (search == NULL) {
if (settings.verbose > 2)
fprintf(stderr, "Nothing left to crawl for %d\n", i);
crawlers[i].it_flags = 0;
crawler_count--;
crawler_unlink_q((item *)&crawlers[i]);
mutex_unlock(&cache_lock);
pthread_mutex_unlock(&cache_lock);
continue;
}
uint32_t hv = hash(ITEM_key(search), search->nkey, 0);
/* Attempt to hash item lock the "search" item. If locked, no
* other callers can incr the refcount
*/
if ((hold_lock = item_trylock(hv)) == NULL) {
mutex_unlock(&cache_lock);
pthread_mutex_unlock(&cache_lock);
continue;
}
/* Now see if the item is refcount locked */
if (refcount_incr(&search->refcount) != 2) {
refcount_decr(&search->refcount);
if (hold_lock)
item_trylock_unlock(hold_lock);
mutex_unlock(&cache_lock);
pthread_mutex_unlock(&cache_lock);
continue;
}

@@ -828,14 +822,17 @@ static void *item_crawler_thread(void *arg) {
}
if (hold_lock)
item_trylock_unlock(hold_lock);
mutex_unlock(&cache_lock);
pthread_mutex_unlock(&cache_lock);
}
}
usleep(100);
}
if (settings.verbose > 2)
fprintf(stderr, "LRU crawler thread sleeping\n");
sleep(1);
STATS_LOCK();
stats.lru_crawler_running = false;
STATS_UNLOCK();
pthread_mutex_unlock(&lru_crawler_lock);
}
if (settings.verbose > 2)
fprintf(stderr, "LRU crawler thread stopping\n");
@@ -847,7 +844,10 @@ static pthread_t item_crawler_tid;

int stop_item_crawler_thread(void) {
int ret;
pthread_mutex_lock(&lru_crawler_lock);
do_run_lru_crawler_thread = 0;
pthread_cond_signal(&lru_crawler_cond);
pthread_mutex_unlock(&lru_crawler_lock);
if ((ret = pthread_join(item_crawler_tid, NULL)) != 0) {
fprintf(stderr, "Failed to stop LRU crawler thread: %s\n", strerror(ret));
return -1;
@@ -857,12 +857,55 @@ int stop_item_crawler_thread(void) {

int start_item_crawler_thread(void) {
int ret;

pthread_mutex_lock(&lru_crawler_lock);
do_run_lru_crawler_thread = 1;
if ((ret = pthread_create(&item_crawler_tid, NULL,
item_crawler_thread, NULL)) != 0) {
fprintf(stderr, "Can't create LRU crawler thread: %s\n",
strerror(ret));
strerror(ret));
pthread_mutex_unlock(&lru_crawler_lock);
return -1;
}
pthread_mutex_unlock(&lru_crawler_lock);

return 0;
}

enum crawler_result_type lru_crawler_crawl(int sid) {
if (pthread_mutex_trylock(&lru_crawler_lock) != 0) {
return CRAWLER_RUNNING;
}
if (sid < POWER_SMALLEST || sid > POWER_LARGEST) {
pthread_mutex_unlock(&lru_crawler_lock);
return CRAWLER_BADCLASS;
}
pthread_mutex_lock(&cache_lock);
if (tails[sid] != NULL) {
if (settings.verbose > 2)
fprintf(stderr, "Kicking LRU crawler off for slab %d\n", sid);
crawlers[sid].nbytes = 0;
crawlers[sid].nkey = 0;
crawlers[sid].it_flags = 1; /* For a crawler, this means enabled. */
crawlers[sid].next = 0;
crawlers[sid].prev = 0;
crawlers[sid].slabs_clsid = sid;
crawler_link_q((item *)&crawlers[sid]);
crawler_count++;
}
pthread_mutex_unlock(&cache_lock);
pthread_cond_signal(&lru_crawler_cond);
pthread_mutex_unlock(&lru_crawler_lock);
return CRAWLER_OK;
}

int init_lru_crawler(void) {
if (lru_crawler_initialized == 0) {
if (pthread_cond_init(&lru_crawler_cond, NULL) != 0) {
fprintf(stderr, "Can't initialize lru crawler condition\n");
return -1;
}
pthread_mutex_init(&lru_crawler_lock, NULL);
lru_crawler_initialized = 1;
}
return 0;
}
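
The wake-up path above is a plain condition-variable handoff: the crawler
thread parks in pthread_cond_wait() and keeps lru_crawler_lock held for the
whole crawl, which is why a simple pthread_mutex_trylock() in
lru_crawler_crawl() doubles as the "already running" check. Below is a
stripped-down sketch of that pattern in isolation; the names (work_lock,
work_cond, worker_thread, kick_worker) are hypothetical and this is not
memcached code.

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t work_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  work_cond = PTHREAD_COND_INITIALIZER;
static volatile int do_run_worker = 1;   /* plays the role of do_run_lru_crawler_thread */
static int work_pending = 0;

static void *worker_thread(void *arg) {
    (void)arg;
    pthread_mutex_lock(&work_lock);
    while (do_run_worker || work_pending) {
        if (work_pending == 0) {
            /* Releases work_lock while asleep, reacquires it on wakeup. */
            pthread_cond_wait(&work_cond, &work_lock);
            continue;
        }
        work_pending = 0;
        /* The crawl itself would run here; work_lock stays held for its
         * duration, like lru_crawler_lock in item_crawler_thread(). */
        fprintf(stderr, "worker: crawling\n");
    }
    pthread_mutex_unlock(&work_lock);
    return NULL;
}

/* Ask the worker to run once. Returns -1 if the worker currently holds the
 * lock (a crawl is in progress) -- the trylock trick behind CRAWLER_RUNNING. */
static int kick_worker(void) {
    if (pthread_mutex_trylock(&work_lock) != 0)
        return -1;
    work_pending = 1;
    pthread_cond_signal(&work_cond);
    pthread_mutex_unlock(&work_lock);
    return 0;
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, worker_thread, NULL);

    while (kick_worker() != 0)   /* retry if the worker is busy or not yet waiting */
        usleep(1000);

    /* Shut down: clear the run flag under the lock and wake the worker,
     * the same shape as stop_item_crawler_thread() above. */
    pthread_mutex_lock(&work_lock);
    do_run_worker = 0;
    pthread_cond_signal(&work_cond);
    pthread_mutex_unlock(&work_lock);
    pthread_join(tid, NULL);
    return 0;
}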
6 changes: 6 additions & 0 deletions items.h
@@ -27,5 +27,11 @@ void item_stats_reset(void);
extern pthread_mutex_t cache_lock;
void item_stats_evictions(uint64_t *evicted);

enum crawler_result_type {
CRAWLER_OK=0, CRAWLER_RUNNING, CRAWLER_BADCLASS
};

int start_item_crawler_thread(void);
int stop_item_crawler_thread(void);
int init_lru_crawler(void);
enum crawler_result_type lru_crawler_crawl(int sid);
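
Taken together, these declarations are the whole control surface. As a rough
sketch (hypothetical wrapper names, not part of the commit), a caller drives
it the same way memcached.c does below: init_lru_crawler() unconditionally at
startup, start_item_crawler_thread() when the crawler is enabled, and
lru_crawler_crawl() for a manual kick, with the enum mapped onto the protocol
responses.

#include <stdio.h>
#include "memcached.h"   /* pulls in items.h and the declarations above */

/* Hypothetical wrapper: bring the crawler machinery up. */
static int crawler_setup(void) {
    if (init_lru_crawler() != 0)         /* one-time init; main() calls this unconditionally */
        return -1;
    return start_item_crawler_thread();  /* what "lru_crawler enable" triggers */
}

/* Hypothetical wrapper: map a manual crawl request onto the protocol strings. */
static const char *crawler_kick(int classid) {
    switch (lru_crawler_crawl(classid)) {
    case CRAWLER_OK:       return "OK";
    case CRAWLER_RUNNING:  return "BUSY currently processing crawler request";
    case CRAWLER_BADCLASS: return "BADCLASS invalid class id";
    }
    return "ERROR";
}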
37 changes: 34 additions & 3 deletions memcached.c
@@ -182,6 +182,7 @@ static void stats_init(void) {
stats.slabs_moved = 0;
stats.accepting_conns = true; /* assuming we start in this state. */
stats.slab_reassign_running = false;
stats.lru_crawler_running = false;

/* make the time we started always be 2 seconds before we really
did, so time(0) - time.started is never zero. if so, things
@@ -2614,6 +2615,9 @@ static void server_stats(ADD_STAT add_stats, conn *c) {
APPEND_STAT("slab_reassign_running", "%u", stats.slab_reassign_running);
APPEND_STAT("slabs_moved", "%llu", stats.slabs_moved);
}
if (settings.lru_crawler) {
APPEND_STAT("lru_crawler_running", "%u", stats.lru_crawler_running);
}
APPEND_STAT("malloc_fails", "%llu",
(unsigned long long)stats.malloc_fails);
STATS_UNLOCK();
@@ -2648,9 +2652,9 @@ static void process_stat_settings(ADD_STAT add_stats, void *c) {
APPEND_STAT("item_size_max", "%d", settings.item_size_max);
APPEND_STAT("maxconns_fast", "%s", settings.maxconns_fast ? "yes" : "no");
APPEND_STAT("hashpower_init", "%d", settings.hashpower_init);
APPEND_STAT("lru_crawler", "%s", settings.lru_crawler ? "yes" : "no");
APPEND_STAT("slab_reassign", "%s", settings.slab_reassign ? "yes" : "no");
APPEND_STAT("slab_automove", "%d", settings.slab_automove);
APPEND_STAT("lru_crawler", "%s", settings.lru_crawler ? "yes" : "no");
APPEND_STAT("tail_repair_time", "%d", settings.tail_repair_time);
APPEND_STAT("flush_enabled", "%s", settings.flush_enabled ? "yes" : "no");
APPEND_STAT("hash_algorithm", "%s", settings.hash_algorithm);
@@ -3558,8 +3562,32 @@ static void process_command(conn *c, char *command) {
out_string(c, "ERROR");
}
} else if (ntokens > 1 && strcmp(tokens[COMMAND_TOKEN].value, "lru_crawler") == 0) {
/* FIXME: Need a lock to serialize item crawler commands */
if (ntokens == 3) {
if (ntokens == 4 && strcmp(tokens[COMMAND_TOKEN + 1].value, "crawl") == 0) {
int sid, rv;
if (settings.lru_crawler == false) {
out_string(c, "CLIENT_ERROR lru crawler disabled");
return;
}
sid = strtol(tokens[2].value, NULL, 10);

if (errno == ERANGE) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
rv = lru_crawler_crawl(sid);
switch(rv) {
case CRAWLER_OK:
out_string(c, "OK");
break;
case CRAWLER_RUNNING:
out_string(c, "BUSY currently processing crawler request");
break;
case CRAWLER_BADCLASS:
out_string(c, "BADCLASS invalid class id");
break;
}
return;
} else if (ntokens == 3) {
if ((strcmp(tokens[COMMAND_TOKEN + 1].value, "enable") == 0)) {
if (start_item_crawler_thread() == 0) {
settings.lru_crawler = true;
@@ -5464,6 +5492,9 @@ int main (int argc, char **argv) {
exit(EXIT_FAILURE);
}

/* Run regardless of initializing it later */
init_lru_crawler();

/* initialise clock event */
clock_handler(0, 0, 0);

1 change: 1 addition & 0 deletions memcached.h
@@ -273,6 +273,7 @@ struct stats {
uint64_t evicted_unfetched; /* items evicted but never touched */
bool slab_reassign_running; /* slab reassign in progress */
uint64_t slabs_moved; /* times slabs were moved around */
bool lru_crawler_running; /* crawl in progress */
};

#define MAX_VERBOSITY_LEVEL 2
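
While a crawl is in progress, the general "stats" output (via server_stats()
above) reports the new flag as, for example:

STAT lru_crawler_running 1

and it drops back to 0 once item_crawler_thread() finishes its pass; the
updated test below polls this instead of sleeping for a fixed interval.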
12 changes: 9 additions & 3 deletions t/lru-crawler.t
@@ -2,7 +2,7 @@

use strict;
use warnings;
use Test::More tests => 188;
use Test::More tests => 189;
use FindBin qw($Bin);
use lib "$Bin/lib";
use MemcachedTest;
@@ -36,15 +36,21 @@ for (1 .. 30) {
is($slabs->{"1:used_chunks"}, 90, "slab1 has 90 used chunks");
}

sleep 3;

print $sock "lru_crawler enable\r\n";
is(scalar <$sock>, "OK\r\n", "enabled lru crawler");
{
my $stats = mem_stats($server->sock, ' settings');
is($stats->{lru_crawler}, "yes");
}

# TODO: counter for how often it's run? then poll that?
sleep 5;
print $sock "lru_crawler crawl 1\r\n";
is(scalar <$sock>, "OK\r\n", "kicked lru crawler");
while (1) {
my $stats = mem_stats($sock);
last unless $stats->{lru_crawler_running};
}

{
my $slabs = mem_stats($sock, "slabs");