Skip to content

Commit

Permalink
gossipd/gossip_store: fix compacting, don't use broadcast ordering.
Browse files Browse the repository at this point in the history
We have a problem: if we get halfway through writing the compacted store
and run out of disk space, we've already changed half the indexes.

This changes it so we do nothing until writing is finished: then we
iterate through and update indexes.  It also weans us off broadcast
ordering, which we can now eliminated.

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell committed Jun 4, 2019
1 parent 5161b79 commit dd83453
Showing 1 changed file with 113 additions and 77 deletions.
190 changes: 113 additions & 77 deletions gossipd/gossip_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,52 +160,54 @@ static size_t transfer_store_msg(int from_fd, size_t from_off, int to_fd,
return sizeof(hdr) + msglen;
}

/* Local unannounced channels don't appear in broadcast map, but we need to
* remember them anyway, so we manually append to the store.
*/
static bool add_local_unnannounced(int in_fd, int out_fd,
struct node *self,
u64 *len, size_t *count)
{
struct chan_map_iter i;
struct chan *c;

for (c = first_chan(self, &i); c; c = next_chan(self, &i)) {
struct node *peer = other_node(self, c);
const u8 *msg;

/* Ignore already announced. */
if (is_chan_public(c))
continue;

msg = towire_gossipd_local_add_channel(tmpctx, &c->scid,
&peer->id, c->sat);
if (!append_msg(out_fd, msg, 0, len))
return false;
(*count)++;
/* We keep a htable map of old gossip_store offsets to new ones. */
struct offset_map {
size_t from, to;
};

for (size_t i = 0; i < 2; i++) {
size_t len_with_header;
int type;
static size_t offset_map_key(const struct offset_map *omap)
{
return omap->from;
}

if (!is_halfchan_defined(&c->half[i]))
continue;
static size_t hash_offset(size_t from)
{
/* Crappy fast hash is "good enough" */
return (from >> 5) ^ from;
}

len_with_header = transfer_store_msg(in_fd,
c->half[i].bcast.index,
out_fd,
&type);
if (!len_with_header)
return false;
static bool offset_map_eq(const struct offset_map *omap, const size_t from)
{
return omap->from == from;
}
HTABLE_DEFINE_TYPE(struct offset_map,
offset_map_key, hash_offset, offset_map_eq, offmap);

static void move_broadcast(struct offmap *offmap,
struct broadcast_state *oldb,
struct broadcast_state *newb,
struct broadcastable *bcast,
const char *what)
{
struct offset_map *omap;

c->half[i].bcast.index = *len;
if (!bcast->index)
return;

*len += len_with_header;
(*count)++;
}
}
omap = offmap_get(offmap, bcast->index);
if (!omap)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Could not relocate %s at offset %u",
what, bcast->index);
broadcast_del(oldb, bcast);
bcast->index = omap->to;
insert_broadcast_nostore(newb, bcast);
offmap_del(offmap, omap);
}

return true;
static void destroy_offmap(struct offmap *offmap)
{
offmap_clear(offmap);
}

/**
Expand All @@ -221,14 +223,16 @@ bool gossip_store_compact(struct gossip_store *gs,
struct broadcast_state **bs,
u32 *offset)
{
size_t count = 0;
size_t count = 0, deleted = 0;
int fd;
struct node *self;
u64 len = sizeof(gs->version);
struct broadcastable *bcast;
u64 off, len = sizeof(gs->version), idx;
struct broadcast_state *oldb = *bs;
struct broadcast_state *newb;
u32 idx = 0;
struct offmap *offmap;
struct gossip_hdr hdr;
struct offmap_iter oit;
struct node_map_iter nit;
struct offset_map *omap;

if (gs->disable_compaction)
return false;
Expand All @@ -238,7 +242,6 @@ bool gossip_store_compact(struct gossip_store *gs,
"Compacting gossip_store with %zu entries, %zu of which are stale",
gs->count, gs->deleted);

newb = new_broadcast_state(gs->rstate, gs, oldb->peers);
fd = open(GOSSIP_STORE_TEMP_FILENAME, O_RDWR|O_APPEND|O_CREAT, 0600);

if (fd < 0) {
Expand All @@ -253,52 +256,85 @@ bool gossip_store_compact(struct gossip_store *gs,
goto unlink_disable;
}

/* Copy entries one at a time. */
while ((bcast = next_broadcast_raw(oldb, &idx)) != NULL) {
u64 old_index = bcast->index;
/* Walk old file, copy everything and remember new offsets. */
offmap = tal(tmpctx, struct offmap);
offmap_init_sized(offmap, gs->count);
tal_add_destructor(offmap, destroy_offmap);

/* Start by writing all channel announcements and updates. */
off = 1;
while (pread(gs->fd, &hdr, sizeof(hdr), off) == sizeof(hdr)) {
u32 msglen, wlen;
int msgtype;
size_t msg_len;

msg_len = transfer_store_msg(gs->fd, bcast->index, fd, &msgtype);
if (msg_len == 0)
goto unlink_disable;
msglen = (be32_to_cpu(hdr.len) & ~GOSSIP_STORE_LEN_DELETED_BIT);
if (be32_to_cpu(hdr.len) & GOSSIP_STORE_LEN_DELETED_BIT) {
off += sizeof(hdr) + msglen;
deleted++;
continue;
}

broadcast_del(oldb, bcast);
bcast->index = len;
insert_broadcast_nostore(newb, bcast);
len += msg_len;
count++;
wlen = transfer_store_msg(gs->fd, off, fd, &msgtype);
if (wlen == 0)
goto unlink_disable;

/* channel_announcement always followed by amount: copy too */
if (msgtype == WIRE_CHANNEL_ANNOUNCEMENT) {
msg_len = transfer_store_msg(gs->fd, old_index + msg_len,
fd, &msgtype);
if (msg_len == 0)
goto unlink_disable;
if (msgtype != WIRE_GOSSIP_STORE_CHANNEL_AMOUNT) {
status_broken("gossip_store: unexpected type %u",
msgtype);
goto unlink_disable;
}
len += msg_len;
count++;
/* We track location of all these message types. */
if (msgtype == WIRE_GOSSIPD_LOCAL_ADD_CHANNEL
|| msgtype == WIRE_GOSSIP_STORE_PRIVATE_UPDATE
|| msgtype == WIRE_CHANNEL_ANNOUNCEMENT
|| msgtype == WIRE_CHANNEL_UPDATE
|| msgtype == WIRE_NODE_ANNOUNCEMENT) {
omap = tal(offmap, struct offset_map);
omap->from = off;
omap->to = len;
offmap_add(offmap, omap);
}
len += wlen;
off += wlen;
}

/* Local unannounced channels are not in the store! */
self = get_node(gs->rstate, &gs->rstate->local_id);
if (self && !add_local_unnannounced(gs->fd, fd, self, &len, &count)) {
status_broken("Failed writing unannounced to gossip store: %s",
strerror(errno));
goto unlink_disable;
/* OK, now we've written file successfully, we can remap broadcast. */
newb = new_broadcast_state(gs->rstate, gs, oldb->peers);

/* Remap node announcements. */
for (struct node *n = node_map_first(gs->rstate->nodes, &nit);
n;
n = node_map_next(gs->rstate->nodes, &nit)) {
move_broadcast(offmap, oldb, newb, &n->bcast, "node_announce");
}

/* Remap channel announcements and updates */
for (struct chan *c = uintmap_first(&gs->rstate->chanmap, &idx);
c;
c = uintmap_after(&gs->rstate->chanmap, &idx)) {
move_broadcast(offmap, oldb, newb, &c->bcast,
"channel_announce");
move_broadcast(offmap, oldb, newb, &c->half[0].bcast,
"channel_update");
move_broadcast(offmap, oldb, newb, &c->half[1].bcast,
"channel_update");
}

/* That should be everything. */
omap = offmap_first(offmap, &oit);
if (omap)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"gossip_store: Entry at %zu->%zu not updated?",
omap->from, omap->to);

if (count != gs->count - gs->deleted) {
status_broken("Expected %zu msgs in new gossip store, got %zu",
gs->count - gs->deleted, count);
goto unlink_disable;
}

if (deleted != gs->deleted) {
status_broken("Expected %zu deleted msgs in old gossip store, got %zu",
gs->deleted, deleted);
goto unlink_disable;
}

if (rename(GOSSIP_STORE_TEMP_FILENAME, GOSSIP_STORE_FILENAME) == -1) {
status_broken(
"Error swapping compacted gossip_store into place: %s",
Expand All @@ -308,7 +344,7 @@ bool gossip_store_compact(struct gossip_store *gs,

status_trace(
"Compaction completed: dropped %zu messages, new count %zu, len %"PRIu64,
gs->deleted, count, len);
deleted, count, len);
gs->count = count;
gs->deleted = 0;
*offset = gs->len - len;
Expand Down

0 comments on commit dd83453

Please sign in to comment.