Skip to content

Commit

Permalink
gossipd: don't keep channel_updates in memory.
Browse files Browse the repository at this point in the history
This requires some trickiness when we want to re-add unannounced channels
to the store after compaction, so we extract a common "copy_message" to
transfer from old store to new.

MCP results from 5 runs, min-max(mean +/- stddev):
	store_load_msec:36034-37853(37109.8+/-5.9e+02)
	vsz_kb:577456
	store_rewrite_sec:12.490000-13.250000(12.862+/-0.27)
	listnodes_sec:1.250000-1.480000(1.364+/-0.09)
	listchannels_sec:30.820000-31.480000(31.068+/-0.24)
	routing_sec:26.940000-27.990000(27.616+/-0.39)
	peer_write_all_sec:65.690000-68.600000(66.698+/-0.99)

MCP notable changes from previous patch (>1 stddev):
	-vsz_kb:1202316
	+vsz_kb:577456

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell authored and niftynei committed Apr 12, 2019
1 parent 0370ed2 commit fdb42c3
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 53 deletions.
66 changes: 57 additions & 9 deletions gossipd/gossip_store.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,56 @@ static bool gossip_store_append(int fd,
write(fd, msg, msglen) == msglen);
}

/* Copy a whole message from one gossip_store to another. Returns
* total msg length including header, or 0 on error. */
static size_t copy_message(int in_fd, int out_fd, unsigned offset)
{
beint32_t belen, becsum;
u32 msglen;
u8 *msg;

/* FIXME: optimize both read and allocation */
if (lseek(in_fd, offset, SEEK_SET) < 0
|| read(in_fd, &belen, sizeof(belen)) != sizeof(belen)
|| read(in_fd, &becsum, sizeof(becsum)) != sizeof(becsum)) {
status_broken("Failed reading header from to gossip store @%u"
": %s",
offset, strerror(errno));
return 0;
}

msglen = be32_to_cpu(belen);
msg = tal_arr(NULL, u8, sizeof(belen) + sizeof(becsum) + msglen);
memcpy(msg, &belen, sizeof(belen));
memcpy(msg + sizeof(belen), &becsum, sizeof(becsum));
if (read(in_fd, msg + sizeof(belen) + sizeof(becsum), msglen)
!= msglen) {
status_broken("Failed reading %u from to gossip store @%u"
": %s",
msglen, offset, strerror(errno));
tal_free(msg);
return 0;
}

if (write(out_fd, msg, msglen + sizeof(belen) + sizeof(becsum))
!= msglen + sizeof(belen) + sizeof(becsum)) {
status_broken("Failed writing to gossip store: %s",
strerror(errno));
tal_free(msg);
return 0;
}

tal_free(msg);
return msglen + sizeof(belen) + sizeof(becsum);
}

/* Local unannounced channels don't appear in broadcast map, but we need to
* remember them anyway, so we manually append to the store.
*
* Note these do *not* add to gs->count, since that's compared with
* the broadcast map count.
*/
static bool add_local_unnannounced(int fd,
static bool add_local_unnannounced(int in_fd, int out_fd,
struct routing_state *rstate,
struct node *self,
u64 *len)
Expand All @@ -182,18 +225,23 @@ static bool add_local_unnannounced(int fd,

msg = towire_gossipd_local_add_channel(tmpctx, &c->scid,
&peer->id, c->sat);
if (!gossip_store_append(fd, rstate, msg, len))
if (!gossip_store_append(out_fd, rstate, msg, len))
return false;

for (size_t i = 0; i < 2; i++) {
u32 idx;
msg = c->half[i].channel_update;
if (!msg)
size_t len_with_header;

if (!is_halfchan_defined(&c->half[i]))
continue;
idx = *len;
if (!gossip_store_append(fd, rstate, msg, len))

len_with_header = copy_message(in_fd, out_fd,
c->half[i].bcast.index);
if (!len_with_header)
return false;
c->half[i].bcast.index = idx;

c->half[i].bcast.index = *len;

*len += len_with_header;
}
}

Expand Down Expand Up @@ -283,7 +331,7 @@ bool gossip_store_compact(struct gossip_store *gs,

/* Local unannounced channels are not in the store! */
self = get_node(gs->rstate, &gs->rstate->local_id);
if (self && !add_local_unnannounced(fd, gs->rstate, self,
if (self && !add_local_unnannounced(gs->fd, fd, gs->rstate, self,
&len)) {
status_broken("Failed writing unannounced to gossip store: %s",
strerror(errno));
Expand Down
18 changes: 11 additions & 7 deletions gossipd/gossipd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1113,10 +1113,10 @@ static void maybe_create_next_scid_reply(struct peer *peer)
continue;

queue_peer_from_store(peer, &chan->bcast);
if (chan->half[0].channel_update)
queue_peer_msg(peer, chan->half[0].channel_update);
if (chan->half[1].channel_update)
queue_peer_msg(peer, chan->half[1].channel_update);
if (is_halfchan_defined(&chan->half[0]))
queue_peer_from_store(peer, &chan->half[0].bcast);
if (is_halfchan_defined(&chan->half[1]))
queue_peer_from_store(peer, &chan->half[1].bcast);

/* Record node ids for later transmission of node_announcement */
tal_arr_expand(&peer->scid_query_nodes, chan->nodes[0]->id);
Expand Down Expand Up @@ -1374,7 +1374,7 @@ static void maybe_update_local_channel(struct daemon *daemon,
bool local_disabled;

/* Don't generate a channel_update for an uninitialized channel. */
if (!hc->channel_update)
if (!is_halfchan_defined(hc))
return;

/* Nothing to update? */
Expand Down Expand Up @@ -1455,9 +1455,13 @@ static bool handle_get_update(struct peer *peer, const u8 *msg)
/* Since we're going to send it out, make sure it's up-to-date. */
maybe_update_local_channel(peer->daemon, chan, direction);

/* It's possible this is NULL, if we've never sent a channel_update
/* It's possible this is zero, if we've never sent a channel_update
* for that channel. */
update = chan->half[direction].channel_update;
if (!is_halfchan_defined(&chan->half[direction]))
update = NULL;
else
update = gossip_store_get(tmpctx, rstate->broadcasts->gs,
chan->half[direction].bcast.index);
out:
status_trace("peer %s schanid %s: %s update",
type_to_string(tmpctx, struct node_id, &peer->id),
Expand Down
44 changes: 17 additions & 27 deletions gossipd/routing.c
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,6 @@ static void init_half_chan(struct routing_state *rstate,
{
struct half_chan *c = &chan->half[channel_idx];

c->channel_update = NULL;

/* Set the channel direction */
c->channel_flags = channel_idx;
// TODO: wireup message_flags
Expand Down Expand Up @@ -971,11 +969,17 @@ bool routing_add_channel_announcement(struct routing_state *rstate,
* add fresh ones. But if we're loading off disk right now, we can't
* do that. */
if (chan && index == 0) {
/* Steal any private updates */
private_updates[0]
= tal_steal(NULL, chan->half[0].channel_update);
private_updates[1]
= tal_steal(NULL, chan->half[1].channel_update);
/* Reload any private updates */
if (chan->half[0].bcast.index)
private_updates[0]
= gossip_store_get(NULL,
rstate->broadcasts->gs,
chan->half[0].bcast.index);
if (chan->half[1].bcast.index)
private_updates[1]
= gossip_store_get(NULL,
rstate->broadcasts->gs,
chan->half[1].bcast.index);
}

/* Pretend it didn't exist, for the moment. */
Expand Down Expand Up @@ -1330,6 +1334,10 @@ bool routing_add_channel_update(struct routing_state *rstate,
if (taken(update))
tal_steal(tmpctx, update);

/* In case it's free in a failure path */
if (taken(update))
tal_steal(tmpctx, update);

if (!fromwire_channel_update(update, &signature, &chain_hash,
&short_channel_id, &timestamp,
&message_flags, &channel_flags,
Expand Down Expand Up @@ -1388,18 +1396,6 @@ bool routing_add_channel_update(struct routing_state *rstate,
/* Discard older updates */
hc = &chan->half[direction];
if (is_halfchan_defined(hc) && timestamp <= hc->bcast.timestamp) {
/* They're not supposed to do this! */
if (timestamp == hc->bcast.timestamp
&& !memeq(hc->channel_update, tal_count(hc->channel_update),
update, tal_count(update))) {
status_debug("Bad gossip repeated timestamp for %s(%u): %s then %s",
type_to_string(tmpctx,
struct short_channel_id,
&short_channel_id),
channel_flags,
tal_hex(tmpctx, hc->channel_update),
tal_hex(tmpctx, update));
}
SUPERVERBOSE("Ignoring outdated update.");
/* Ignoring != failing */
return true;
Expand All @@ -1416,15 +1412,9 @@ bool routing_add_channel_update(struct routing_state *rstate,
message_flags, channel_flags,
timestamp, htlc_minimum, htlc_maximum);

/* Replace any old one. */
tal_free(chan->half[direction].channel_update);
/* Safe even if was never added */
broadcast_del(rstate->broadcasts, &chan->half[direction].bcast);

chan->half[direction].channel_update
= tal_dup_arr(chan, u8, update, tal_count(update), 0);


/* BOLT #7:
* - MUST consider the `timestamp` of the `channel_announcement` to be
* the `timestamp` of a corresponding `channel_update`.
Expand All @@ -1444,7 +1434,7 @@ bool routing_add_channel_update(struct routing_state *rstate,
assert(is_local_channel(rstate, chan));
if (!index) {
hc->bcast.index = gossip_store_add(rstate->broadcasts->gs,
hc->channel_update);
update);
} else
hc->bcast.index = index;
return true;
Expand All @@ -1454,7 +1444,7 @@ bool routing_add_channel_update(struct routing_state *rstate,
chan->half[direction].bcast.index = index;

insert_broadcast(&rstate->broadcasts,
chan->half[direction].channel_update,
update,
&chan->half[direction].bcast);

if (uc) {
Expand Down
5 changes: 1 addition & 4 deletions gossipd/routing.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@
#include <wire/wire.h>

struct half_chan {
/* Cached `channel_update` which initialized below (or NULL) */
const u8 *channel_update;

/* millisatoshi. */
u32 base_fee;
/* millionths */
Expand Down Expand Up @@ -66,7 +63,7 @@ static inline bool is_chan_public(const struct chan *chan)

static inline bool is_halfchan_defined(const struct half_chan *hc)
{
return hc->channel_update != NULL;
return hc->bcast.index != 0;
}

static inline bool is_halfchan_enabled(const struct half_chan *hc)
Expand Down
4 changes: 2 additions & 2 deletions gossipd/test/run-bench-find_route.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ static void add_connection(struct routing_state *rstate,
c->proportional_fee = proportional_fee;
c->delay = delay;
c->channel_flags = node_id_idx(&nodes[from], &nodes[to]);
/* This must be non-NULL, otherwise we consider it disabled! */
c->channel_update = tal(chan, u8);
/* This must be non-zero, otherwise we consider it disabled! */
c->bcast.index = 1;
c->htlc_maximum = AMOUNT_MSAT(-1ULL);
c->htlc_minimum = AMOUNT_MSAT(0);
}
Expand Down
4 changes: 2 additions & 2 deletions gossipd/test/run-find_route-specific.c
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ get_or_make_connection(struct routing_state *rstate,
if (!chan)
chan = new_chan(rstate, &scid, from_id, to_id, satoshis);

/* Make sure it's seen as initialized (update non-NULL). */
chan->half[idx].channel_update = (void *)chan;
/* Make sure it's seen as initialized (index non-zero). */
chan->half[idx].bcast.index = 1;
chan->half[idx].htlc_minimum = AMOUNT_MSAT(0);
if (!amount_sat_to_msat(&chan->half[idx].htlc_maximum, satoshis))
abort();
Expand Down
4 changes: 2 additions & 2 deletions gossipd/test/run-find_route.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ static void add_connection(struct routing_state *rstate,
chan = new_chan(rstate, &scid, from, to, satoshis);

c = &chan->half[node_id_idx(from, to)];
/* Make sure it's seen as initialized (update non-NULL). */
c->channel_update = (void *)c;
/* Make sure it's seen as initialized (index non-zero). */
c->bcast.index = 1;
c->base_fee = base_fee;
c->proportional_fee = proportional_fee;
c->delay = delay;
Expand Down

0 comments on commit fdb42c3

Please sign in to comment.