Skip to content

Commit

Permalink
refactor chunk chaining for memory efficiency
Browse files Browse the repository at this point in the history
Memory chunk chains would simply stitch multiple chunks of the highest slab
class together. If your item was 17k and the chunk limit is 16k, the item
would use 32k of space instead of a bit over 17k.

This refactor simplifies the slab allocation path and pulls the allocation of
chunks into the upload process. A "large" item gets a small chunk assigned as
an object header, rather than attempting to inline a slab chunk into a parent
chunk. It then gets chunks individually allocated and added into the chain
while the object uploads.

This solves a lot of issues:

1) When assembling new, potentially very large items, we don't have to sit and
spin evicting objects all at once. If there are 20 16k chunks in the tail and
we allocate a 1 meg item, the new item will evict one of those chunks
in between each read, rather than trying to guess how many loops to run before
giving up. Very large objects take time to read from the socket anyway.

2) Simplifies code around the initial chunk. Originally embedding data into
the top chunk and embedding data at the same time required a good amount of
fiddling. (Though this might flip back to embedding the initial chunk if I can
clean it up a bit more).

3) Pulling chunks individually means the slabber code can be flattened to not
think about chunks aside from freeing them, which culled a lot of code and
removed branches from a hot path.

4) The size of the final chunk is naturally set to the remaining amount of
bytes that need to be stored, which means chunks from another slab class can
be pulled to "cap off" a large item, reducing memory overhead.
  • Loading branch information
dormando committed Mar 20, 2017
1 parent 2f9f51d commit ae84d77
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 185 deletions.
123 changes: 85 additions & 38 deletions items.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,39 +169,15 @@ static size_t item_make_header(const uint8_t nkey, const unsigned int flags, con
return sizeof(item) + nkey + *nsuffix + nbytes;
}

item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
const rel_time_t exptime, const int nbytes) {
int i;
uint8_t nsuffix;
static item *do_item_alloc_pull(const size_t ntotal, const unsigned int id) {
item *it = NULL;
char suffix[40];
// Avoid potential underflows.
if (nbytes < 2)
return 0;

size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
if (settings.use_cas) {
ntotal += sizeof(uint64_t);
}

unsigned int id = slabs_clsid(ntotal);
if (id == 0)
return 0;

int i;
/* If no memory is available, attempt a direct LRU juggle/eviction */
/* This is a race in order to simplify lru_pull_tail; in cases where
* locked items are on the tail, you want them to fall out and cause
* occasional OOM's, rather than internally work around them.
* This also gives one fewer code path for slab alloc/free
*/
/* TODO: if power_largest, try a lot more times? or a number of times
* based on how many chunks the new object should take up?
* or based on the size of an object lru_pull_tail() says it evicted?
* This is a classical GC problem if "large items" are of too varying of
* sizes. This is actually okay here since the larger the data, the more
* bandwidth it takes, the more time we can loop in comparison to serving
* and replacing small items.
*/
for (i = 0; i < 10; i++) {
uint64_t total_bytes;
/* Try to reclaim memory first */
Expand All @@ -214,13 +190,12 @@ item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
total_bytes -= temp_lru_size(id);

if (it == NULL) {
if (settings.lru_segmented) {
if (lru_pull_tail(id, COLD_LRU, total_bytes, LRU_PULL_EVICT, 0) <= 0) {
if (lru_pull_tail(id, COLD_LRU, total_bytes, LRU_PULL_EVICT, 0) <= 0) {
if (settings.lru_segmented) {
lru_pull_tail(id, HOT_LRU, total_bytes, 0, 0);
}
} else {
if (lru_pull_tail(id, COLD_LRU, 0, LRU_PULL_EVICT, 0) <= 0)
} else {
break;
}
}
} else {
break;
Expand All @@ -233,6 +208,80 @@ item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
pthread_mutex_unlock(&lru_locks[id]);
}

return it;
}

/*
 * Allocate one new chunk and append it to the chain directly after `ch`.
 *
 * The request is bytes_remain plus the item_chunk header, capped at
 * settings.slab_chunk_size_max; the slab class id is looked up from that
 * capped size. Returns the new chunk, or NULL when do_item_alloc_pull()
 * could not supply memory (in which case `ch` is left untouched).
 *
 * slab mover: if it finds a chunk without ITEM_CHUNK flag, and no ITEM_LINKED
 * flag, it counts as busy and skips.
 * I think it might still not be safe to do linking outside of the slab lock
 */
item_chunk *do_item_alloc_chunk(item_chunk *ch, const size_t bytes_remain) {
    // TODO: Should be a cleaner way of finding real size with slabber calls
    const size_t wanted = bytes_remain + sizeof(item_chunk);
    const size_t size = (wanted > settings.slab_chunk_size_max)
        ? settings.slab_chunk_size_max : wanted;
    const unsigned int id = slabs_clsid(size);

    item_chunk *nch = (item_chunk *) do_item_alloc_pull(size, id);
    if (!nch) {
        return NULL;
    }

    // ITEM_CHUNK[ED] bits need to be protected by the slabs lock, so the
    // whole link-in is done while holding it.
    slabs_mlock();

    /* Set up the new chunk's own bookkeeping: empty, with the usable
     * payload being the requested size minus the chunk header. */
    nch->slabs_clsid = id;
    nch->size = size - sizeof(item_chunk);
    nch->used = 0;
    nch->it_flags |= ITEM_CHUNK;

    /* Splice it onto the tail: it shares the parent's head and ends the chain. */
    nch->head = ch->head;
    nch->prev = ch;
    nch->next = 0;
    ch->next = nch;

    slabs_munlock();
    return nch;
}

item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
const rel_time_t exptime, const int nbytes) {
uint8_t nsuffix;
item *it = NULL;
char suffix[40];
// Avoid potential underflows.
if (nbytes < 2)
return 0;

size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
if (settings.use_cas) {
ntotal += sizeof(uint64_t);
}

unsigned int id = slabs_clsid(ntotal);
unsigned int hdr_id = 0;
if (id == 0)
return 0;

/* This is a large item. Allocate a header object now, lazily allocate
* chunks while reading the upload.
*/
if (ntotal > settings.slab_chunk_size_max) {
/* We still link this item into the LRU for the larger slab class, but
* we're pulling a header from an entirely different slab class. The
* free routines handle large items specifically.
*/
int htotal = nkey + 1 + nsuffix + sizeof(item) + sizeof(item_chunk);
if (settings.use_cas) {
htotal += sizeof(uint64_t);
}
hdr_id = slabs_clsid(htotal);
it = do_item_alloc_pull(htotal, hdr_id);
/* setting ITEM_CHUNKED is fine here because we aren't LINKED yet. */
if (it != NULL)
it->it_flags |= ITEM_CHUNKED;
} else {
it = do_item_alloc_pull(ntotal, id);
}

if (it == NULL) {
pthread_mutex_lock(&lru_locks[id]);
itemstats[id].outofmemory++;
Expand Down Expand Up @@ -273,18 +322,16 @@ item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags,
}
it->nsuffix = nsuffix;

/* Need to shuffle the pointer stored in h_next into it->data. */
/* Initialize internal chunk. */
if (it->it_flags & ITEM_CHUNKED) {
item_chunk *chunk = (item_chunk *) ITEM_data(it);

chunk->next = (item_chunk *) it->h_next;
chunk->next = 0;
chunk->prev = 0;
chunk->head = it;
/* Need to chain back into the head's chunk */
chunk->next->prev = chunk;
chunk->size = chunk->next->size - ((char *)chunk - (char *)it);
chunk->used = 0;
assert(chunk->size > 0);
chunk->size = 0;
chunk->head = it;
chunk->orig_clsid = hdr_id;
}
it->h_next = 0;

Expand Down
1 change: 1 addition & 0 deletions items.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ uint64_t get_cas_id(void);

/*@null@*/
item *do_item_alloc(char *key, const size_t nkey, const unsigned int flags, const rel_time_t exptime, const int nbytes);
item_chunk *do_item_alloc_chunk(item_chunk *ch, const size_t bytes_remain);
void item_free(item *it);
bool item_size_ok(const size_t nkey, const int flags, const int nbytes);

Expand Down
115 changes: 81 additions & 34 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -1398,17 +1398,10 @@ static void complete_update_bin(conn *c) {
item_chunk *ch = (item_chunk *) c->ritem;
if (ch->size == ch->used)
ch = ch->next;
if (ch->size - ch->used > 1) {
ch->data[ch->used + 1] = '\r';
ch->data[ch->used + 2] = '\n';
ch->used += 2;
} else {
ch->data[ch->used + 1] = '\r';
ch->next->data[0] = '\n';
ch->used++;
ch->next->used++;
assert(ch->size == ch->used);
}
assert(ch->size - ch->used >= 2);
ch->data[ch->used + 1] = '\r';
ch->data[ch->used + 2] = '\n';
ch->used += 2;
}

ret = store_item(it, c->cmd, c);
Expand Down Expand Up @@ -2534,11 +2527,15 @@ static void complete_nread(conn *c) {

/* Destination must always be chunked */
/* This should be part of item.c */
static void _store_item_copy_chunks(item *d_it, item *s_it, const int len) {
static int _store_item_copy_chunks(item *d_it, item *s_it, const int len) {
item_chunk *dch = (item_chunk *) ITEM_data(d_it);
/* Advance dch until we find free space */
while (dch->size == dch->used) {
dch = dch->next;
if (dch->next) {
dch = dch->next;
} else {
break;
}
}

if (s_it->it_flags & ITEM_CHUNKED) {
Expand All @@ -2560,7 +2557,12 @@ static void _store_item_copy_chunks(item *d_it, item *s_it, const int len) {
remain -= todo;
assert(dch->used <= dch->size);
if (dch->size == dch->used) {
dch = dch->next;
item_chunk *tch = do_item_alloc_chunk(dch, remain);
if (tch) {
dch = tch;
} else {
return -1;
}
}
assert(copied <= sch->used);
if (copied == sch->used) {
Expand All @@ -2576,37 +2578,49 @@ static void _store_item_copy_chunks(item *d_it, item *s_it, const int len) {
while (len > done && dch) {
int todo = (dch->size - dch->used < len - done)
? dch->size - dch->used : len - done;
assert(dch->size - dch->used != 0);
//assert(dch->size - dch->used != 0);
memcpy(dch->data + dch->used, ITEM_data(s_it) + done, todo);
done += todo;
dch->used += todo;
assert(dch->used <= dch->size);
if (dch->size == dch->used)
dch = dch->next;
if (dch->size == dch->used) {
item_chunk *tch = do_item_alloc_chunk(dch, len - done);
if (tch) {
dch = tch;
} else {
return -1;
}
}
}
assert(len == done);
}
return 0;
}

static void _store_item_copy_data(int comm, item *old_it, item *new_it, item *add_it) {
static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add_it) {
if (comm == NREAD_APPEND) {
if (new_it->it_flags & ITEM_CHUNKED) {
_store_item_copy_chunks(new_it, old_it, old_it->nbytes - 2);
_store_item_copy_chunks(new_it, add_it, add_it->nbytes);
if (_store_item_copy_chunks(new_it, old_it, old_it->nbytes - 2) == -1 ||
_store_item_copy_chunks(new_it, add_it, add_it->nbytes) == -1) {
return -1;
}
} else {
memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(add_it), add_it->nbytes);
}
} else {
/* NREAD_PREPEND */
if (new_it->it_flags & ITEM_CHUNKED) {
_store_item_copy_chunks(new_it, add_it, add_it->nbytes - 2);
_store_item_copy_chunks(new_it, old_it, old_it->nbytes);
if (_store_item_copy_chunks(new_it, add_it, add_it->nbytes - 2) == -1 ||
_store_item_copy_chunks(new_it, old_it, old_it->nbytes) == -1) {
return -1;
}
} else {
memcpy(ITEM_data(new_it), ITEM_data(add_it), add_it->nbytes);
memcpy(ITEM_data(new_it) + add_it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
}
}
return 0;
}

/*
Expand Down Expand Up @@ -2690,13 +2704,14 @@ enum store_item_type do_store_item(item *it, int comm, conn *c, const uint32_t h

new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);

if (new_it == NULL) {
/* copy data from it and old_it to new_it */
if (new_it == NULL || _store_item_copy_data(comm, old_it, new_it, it) == -1) {
failed_alloc = 1;
stored = NOT_STORED;
// failed data copy, free up.
if (new_it != NULL)
item_remove(new_it);
} else {
/* copy data from it and old_it to new_it */
_store_item_copy_data(comm, old_it, new_it, it);

it = new_it;
}
}
Expand Down Expand Up @@ -4608,6 +4623,26 @@ static int read_into_chunked_item(conn *c) {

while (c->rlbytes > 0) {
item_chunk *ch = (item_chunk *)c->ritem;
assert(ch->used <= ch->size);
if (ch->size == ch->used) {
// FIXME: ch->next is currently always 0. remove this?
if (ch->next) {
c->ritem = (char *) ch->next;
} else {
/* Allocate next chunk. Binary protocol needs 2b for \r\n */
c->ritem = (char *) do_item_alloc_chunk(ch, c->rlbytes +
((c->protocol == binary_prot) ? 2 : 0));
if (!c->ritem) {
// We failed an allocation. Let caller handle cleanup.
total = -2;
break;
}
// ritem has new chunk, restart the loop.
continue;
//assert(c->rlbytes == 0);
}
}

int unused = ch->size - ch->used;
/* first check if we have leftovers in the conn_read buffer */
if (c->rbytes > 0) {
Expand Down Expand Up @@ -4642,15 +4677,19 @@ static int read_into_chunked_item(conn *c) {
break;
}
}
}

assert(ch->used <= ch->size);
if (ch->size == ch->used) {
if (ch->next) {
c->ritem = (char *) ch->next;
} else {
/* No space left. */
assert(c->rlbytes == 0);
break;
/* At some point I will be able to ditch the \r\n from item storage and
remove all of these kludges.
The above binprot check ensures inline space for \r\n, but if we do
exactly enough allocs there will be no additional chunk for \r\n.
*/
if (c->rlbytes == 0 && c->protocol == binary_prot && total >= 0) {
item_chunk *ch = (item_chunk *)c->ritem;
if (ch->size - ch->used < 2) {
c->ritem = (char *) do_item_alloc_chunk(ch, 2);
if (!c->ritem) {
total = -2;
}
}
}
Expand Down Expand Up @@ -4864,6 +4903,14 @@ static void drive_machine(conn *c) {
stop = true;
break;
}

/* Memory allocation failure */
if (res == -2) {
out_of_memory(c, "SERVER_ERROR Out of memory during read");
c->sbytes = c->rlbytes;
c->write_and_go = conn_swallow;
break;
}
/* otherwise we have a real error, on which we close the connection */
if (settings.verbose > 0) {
fprintf(stderr, "Failed to read, and not due to blocking:\n"
Expand Down
2 changes: 1 addition & 1 deletion memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ typedef struct _strchunk {
int used; /* chunk space used */
int nbytes; /* used. */
unsigned short refcount; /* used? */
uint8_t nsuffix; /* unused */
uint8_t orig_clsid; /* For obj hdr chunks slabs_clsid is fake. */
uint8_t it_flags; /* ITEM_* above. */
uint8_t slabs_clsid; /* Same as above. */
char data[];
Expand Down
Loading

0 comments on commit ae84d77

Please sign in to comment.