Skip to content

Commit

Permalink
connectd: use gossmap streaming interface.
Browse files Browse the repository at this point in the history
This is more efficient in a few ways:
1. It's trivial to get to the end of the gossip_store, we don't have
   to iterate.
2. It tends to be mmaped so we don't have to call pread().

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell committed Jul 10, 2024
1 parent ba2bb55 commit d60977f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 67 deletions.
2 changes: 2 additions & 0 deletions connectd/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ CONNECTD_COMMON_OBJS := \
common/dev_disconnect.o \
common/ecdh_hsmd.o \
common/features.o \
common/fp16.o \
common/hmac.o \
common/status_wiregen.o \
common/gossip_store.o \
common/gossmap.o \
common/key_derive.o \
common/memleak.o \
common/msg_queue.o \
Expand Down
48 changes: 46 additions & 2 deletions connectd/connectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <common/dev_disconnect.h>
#include <common/ecdh_hsmd.h>
#include <common/gossip_store.h>
#include <common/gossmap.h>
#include <common/jsonrpc_errors.h>
#include <common/memleak.h>
#include <common/status.h>
Expand Down Expand Up @@ -2011,7 +2012,7 @@ static void dev_report_fds(struct daemon *daemon, const u8 *msg)
status_info("dev_report_fds: %i -> dev_disconnect_fd", fd);
continue;
}
if (fd == daemon->gossip_store_fd) {
if (daemon->gossmap_raw && fd == gossmap_fd(daemon->gossmap_raw)) {
status_info("dev_report_fds: %i -> gossip_store", fd);
continue;
}
Expand Down Expand Up @@ -2048,6 +2049,49 @@ static void dev_report_fds(struct daemon *daemon, const u8 *msg)
}
}

/* It's so common to ask for "recent" gossip (we ask for 10 minutes
* ago, LND and Eclair ask for now, LDK asks for 1 hour ago) that it's
* worth keeping track of where that starts, so we can skip most of
* the store. */
void update_recent_timestamp(struct daemon *daemon, struct gossmap *gossmap)
{
/* 2 hours allows for some clock drift, not too much gossip */
u32 recent = time_now().ts.tv_sec - 7200;

/* Only update every minute */
if (daemon->gossip_recent_time + 60 > recent)
return;

daemon->gossip_recent_time = recent;
gossmap_iter_fast_forward(gossmap,
daemon->gossmap_iter_recent,
recent);
}

/* This is called once we need it: otherwise, the gossip_store may not exist,
* since we start at the same time as gossipd itself. */
static void setup_gossip_store(struct daemon *daemon)
{
daemon->gossmap_raw = gossmap_load(daemon, GOSSIP_STORE_FILENAME, NULL);
if (!daemon->gossmap_raw)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Loading gossip_store %s: %s",
GOSSIP_STORE_FILENAME, strerror(errno));

daemon->gossip_recent_time = 0;
daemon->gossmap_iter_recent = gossmap_iter_new(daemon, daemon->gossmap_raw);
update_recent_timestamp(daemon, daemon->gossmap_raw);
}

struct gossmap *get_gossmap(struct daemon *daemon)
{
if (!daemon->gossmap_raw)
setup_gossip_store(daemon);
else
gossmap_refresh(daemon->gossmap_raw, NULL);
return daemon->gossmap_raw;
}

static void dev_exhaust_fds(struct daemon *daemon, const u8 *msg)
{
int fd;
Expand Down Expand Up @@ -2238,7 +2282,7 @@ int main(int argc, char *argv[])
daemon->connecting = tal(daemon, struct connecting_htable);
connecting_htable_init(daemon->connecting);
timers_init(&daemon->timers, time_mono());
daemon->gossip_store_fd = -1;
daemon->gossmap_raw = NULL;
daemon->shutting_down = false;
daemon->dev_suppress_gossip = false;
daemon->custom_msgs = NULL;
Expand Down
16 changes: 11 additions & 5 deletions connectd/connectd.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct gossip_state {
/* I think this is called "echo cancellation" */
struct gossip_rcvd_filter *grf;
/* Offset within the gossip_store file */
size_t off;
struct gossmap_iter *iter;
/* Bytes sent in the last second. */
size_t bytes_this_second;
/* When that second starts */
Expand Down Expand Up @@ -248,11 +248,11 @@ struct daemon {
/* If non-zero, port to listen for websocket connections. */
u16 websocket_port;

/* The gossip_store */
int gossip_store_fd;
size_t gossip_store_end;
/* The gossip store (access via get_gossmap!) */
struct gossmap *gossmap_raw;
/* Iterator which we keep at "recent" time */
u32 gossip_recent_time;
size_t gossip_store_recent_off;
struct gossmap_iter *gossmap_iter_recent;

/* We only announce websocket addresses if !deprecated_apis */
bool announce_websocket;
Expand Down Expand Up @@ -280,6 +280,12 @@ struct daemon {
/* Called by io_tor_connect once it has a connection out. */
struct io_plan *connection_out(struct io_conn *conn, struct connecting *connect);

/* Get and refresh gossmap */
struct gossmap *get_gossmap(struct daemon *daemon);

/* Catch up with recent changes */
void update_recent_timestamp(struct daemon *daemon, struct gossmap *gossmap);

/* add erros to error list */
void add_errors_to_error_list(struct connecting *connect, const char *error);

Expand Down
89 changes: 29 additions & 60 deletions connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <common/dev_disconnect.h>
#include <common/features.h>
#include <common/gossip_constants.h>
#include <common/gossmap.h>
#include <common/memleak.h>
#include <common/per_peer_state.h>
#include <common/ping.h>
Expand Down Expand Up @@ -208,56 +209,14 @@ static struct oneshot *gossip_stream_timer(struct peer *peer)
wake_gossip, peer);
}

/* It's so common to ask for "recent" gossip (we ask for 10 minutes
* ago, LND and Eclair ask for now, LDK asks for 1 hour ago) that it's
* worth keeping track of where that starts, so we can skip most of
* the store. */
static void update_recent_timestamp(struct daemon *daemon)
{
/* 2 hours allows for some clock drift, not too much gossip */
u32 recent = time_now().ts.tv_sec - 7200;

/* Only update every minute */
if (daemon->gossip_recent_time + 60 > recent)
return;

daemon->gossip_recent_time = recent;
daemon->gossip_store_recent_off
= find_gossip_store_by_timestamp(daemon->gossip_store_fd,
daemon->gossip_store_recent_off,
daemon->gossip_recent_time);
}

/* This is called once we need it: otherwise, the gossip_store may not exist,
* since we start at the same time as gossipd itself. */
static void setup_gossip_store(struct daemon *daemon)
{
daemon->gossip_store_fd = open(GOSSIP_STORE_FILENAME, O_RDONLY);
if (daemon->gossip_store_fd < 0)
status_failed(STATUS_FAIL_INTERNAL_ERROR,
"Opening gossip_store %s: %s",
GOSSIP_STORE_FILENAME, strerror(errno));

daemon->gossip_recent_time = 0;
daemon->gossip_store_recent_off = 1;
update_recent_timestamp(daemon);

/* gossipd will be writing to this, and it's not atomic! Safest
* way to find the "end" is to walk through. */
daemon->gossip_store_end
= find_gossip_store_end(daemon->gossip_store_fd,
daemon->gossip_store_recent_off);
}

void setup_peer_gossip_store(struct peer *peer,
const struct feature_set *our_features,
const u8 *their_features)
{
/* Lazy setup */
if (peer->daemon->gossip_store_fd == -1)
setup_gossip_store(peer->daemon);
struct gossmap *gossmap = get_gossmap(peer->daemon);

peer->gs.grf = new_gossip_rcvd_filter(peer);
peer->gs.iter = gossmap_iter_new(peer, gossmap);
peer->gs.bytes_this_second = 0;
peer->gs.bytes_start_time = time_mono();

Expand All @@ -269,7 +228,6 @@ void setup_peer_gossip_store(struct peer *peer,
*/
peer->gs.gossip_timer = NULL;
peer->gs.active = false;
peer->gs.off = 1;
return;
}

Expand Down Expand Up @@ -462,10 +420,12 @@ static void wake_gossip(struct peer *peer)
}

/* If we are streaming gossip, get something from gossip store */
static u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
static const u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
{
u8 *msg;
const u8 *msg;
struct timemono now;
struct gossmap *gossmap;
u32 timestamp;

/* dev-mode can suppress all gossip */
if (peer->daemon->dev_suppress_gossip)
Expand Down Expand Up @@ -499,18 +459,23 @@ static u8 *maybe_from_gossip_store(const tal_t *ctx, struct peer *peer)
return NULL;
}

gossmap = get_gossmap(peer->daemon);

again:
msg = gossip_store_next(ctx, &peer->daemon->gossip_store_fd,
peer->gs.timestamp_min,
peer->gs.timestamp_max,
&peer->gs.off,
&peer->daemon->gossip_store_end);
/* Don't send back gossip they sent to us! */
msg = gossmap_stream_next(ctx, gossmap, peer->gs.iter, &timestamp);
if (msg) {
/* Don't send back gossip they sent to us! */
if (gossip_rcvd_filter_del(peer->gs.grf, msg)) {
msg = tal_free(msg);
goto again;
}
/* Check timestamp (zero for channel_announcement with
* no update yet!): FIXME: we could ignore this! */
if (timestamp
&& (timestamp < peer->gs.timestamp_min || timestamp > peer->gs.timestamp_max)) {
msg = tal_free(msg);
goto again;
}
peer->gs.bytes_this_second += tal_bytelen(msg);
status_peer_io(LOG_IO_OUT, &peer->id, msg);
return msg;
Expand Down Expand Up @@ -646,6 +611,7 @@ static void handle_gossip_timestamp_filter_in(struct peer *peer, const u8 *msg)
{
struct bitcoin_blkid chain_hash;
u32 first_timestamp, timestamp_range;
struct gossmap *gossmap = get_gossmap(peer->daemon);

if (!fromwire_gossip_timestamp_filter(msg, &chain_hash,
&first_timestamp,
Expand Down Expand Up @@ -679,15 +645,18 @@ static void handle_gossip_timestamp_filter_in(struct peer *peer, const u8 *msg)
*/
/* For us, this means we only sweep the gossip store for messages
* if the first_timestamp is 0 */
if (first_timestamp == 0)
peer->gs.off = 1;
else if (first_timestamp == 0xFFFFFFFF)
peer->gs.off = peer->daemon->gossip_store_end;
else {
tal_free(peer->gs.iter);
if (first_timestamp == 0) {
peer->gs.iter = gossmap_iter_new(peer, gossmap);
} else if (first_timestamp == 0xFFFFFFFF) {
peer->gs.iter = gossmap_iter_new(peer, gossmap);
gossmap_iter_end(gossmap, peer->gs.iter);
} else {
/* We are actually a bit nicer than the spec, and we include
* "recent" gossip here. */
update_recent_timestamp(peer->daemon);
peer->gs.off = peer->daemon->gossip_store_recent_off;
update_recent_timestamp(peer->daemon, gossmap);
peer->gs.iter = gossmap_iter_dup(peer,
peer->daemon->gossmap_iter_recent);
}

/* BOLT #7:
Expand Down

0 comments on commit d60977f

Please sign in to comment.