Skip to content

Commit

Permalink
connectd: add counters to each peer connection.
Browse files Browse the repository at this point in the history
This allows us to detect when lightningd hasn't seen our latest
disconnect/reconnect; in particular, we would hit the following pattern:

1. lightningd says to connect a subd.
2. connectd disconnects and reconnects.
3. connectd reads message, connects subd.
4. lightningd reads disconnect and reconnect, sends msg to connect to subd again.
5. connectd asserts because subd is alreacy connected.

This way connectd can tell if lightningd is talking about the previous
connection, and ignoere it.

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell authored and niftynei committed Jul 19, 2022
1 parent 41b379e commit d314202
Show file tree
Hide file tree
Showing 12 changed files with 82 additions and 30 deletions.
23 changes: 17 additions & 6 deletions connectd/connectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ void destroy_peer(struct peer *peer)
/* Tell lightningd it's really disconnected */
daemon_conn_send(peer->daemon->master,
take(towire_connectd_peer_disconnect_done(NULL,
&peer->id)));
&peer->id,
peer->counter)));
}

/*~ This is where we create a new peer. */
Expand All @@ -238,6 +239,7 @@ static struct peer *new_peer(struct daemon *daemon,

peer->daemon = daemon;
peer->id = *id;
peer->counter = daemon->connection_counter++;
peer->cs = *cs;
peer->subds = tal_arr(peer, struct subd *, 0);
peer->peer_in = NULL;
Expand Down Expand Up @@ -347,7 +349,8 @@ struct io_plan *peer_connected(struct io_conn *conn,
setup_peer_gossip_store(peer, daemon->our_features, their_features);

/* Create message to tell master peer has connected. */
msg = towire_connectd_peer_connected(NULL, id, addr, remote_addr,
msg = towire_connectd_peer_connected(NULL, id, peer->counter,
addr, remote_addr,
incoming, their_features);

/*~ daemon_conn is a message queue for inter-daemon communication: we
Expand Down Expand Up @@ -1841,16 +1844,20 @@ static void connect_to_peer(struct daemon *daemon, const u8 *msg)
static void peer_discard(struct daemon *daemon, const u8 *msg)
{
struct node_id id;
u64 counter;
struct peer *peer;

if (!fromwire_connectd_discard_peer(msg, &id))
if (!fromwire_connectd_discard_peer(msg, &id, &counter))
master_badmsg(WIRE_CONNECTD_DISCARD_PEER, msg);

/* We should stay in sync with lightningd, but this can happen
* under stress. */
peer = peer_htable_get(&daemon->peers, &id);
if (!peer)
return;
/* If it's reconnected already, it will learn soon. */
if (peer->counter != counter)
return;
status_peer_debug(&id, "disconnect");
tal_free(peer);
}
Expand All @@ -1861,14 +1868,17 @@ static void peer_final_msg(struct io_conn *conn,
{
struct peer *peer;
struct node_id id;
u64 counter;
u8 *finalmsg;

if (!fromwire_connectd_peer_final_msg(tmpctx, msg, &id, &finalmsg))
if (!fromwire_connectd_peer_final_msg(tmpctx, msg, &id, &counter,
&finalmsg))
master_badmsg(WIRE_CONNECTD_PEER_FINAL_MSG, msg);

/* This can happen if peer hung up on us. */
/* This can happen if peer hung up on us (or wrong counter
* if it reconnected). */
peer = peer_htable_get(&daemon->peers, &id);
if (peer) {
if (peer && peer->counter == counter) {
/* Log message for peer. */
status_peer_io(LOG_IO_OUT, &id, finalmsg);
multiplex_final_msg(peer, take(finalmsg));
Expand Down Expand Up @@ -2039,6 +2049,7 @@ int main(int argc, char *argv[])

/* Allocate and set up our simple top-level structure. */
daemon = tal(NULL, struct daemon);
daemon->connection_counter = 1;
peer_htable_init(&daemon->peers);
memleak_add_helper(daemon, memleak_daemon_cb);
list_head_init(&daemon->connecting);
Expand Down
6 changes: 6 additions & 0 deletions connectd/connectd.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ struct peer {
/* Connection to the peer */
struct io_conn *to_peer;

/* Counter to distinguish this connection from the next re-connection */
u64 counter;

/* Is this draining? If so, just keep writing until queue empty */
bool draining;

Expand Down Expand Up @@ -130,6 +133,9 @@ struct daemon {
/* pubkey equivalent. */
struct pubkey mykey;

/* Counter from which we derive connection identifiers. */
u64 connection_counter;

/* Base for timeout timers, and how long to wait for init msg */
struct timers timers;
u32 timeout_secs;
Expand Down
6 changes: 6 additions & 0 deletions connectd/connectd_wire.csv
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ msgdata,connectd_connect_failed,addrhint,?wireaddr_internal,
# Connectd -> master: we got a peer.
msgtype,connectd_peer_connected,2002
msgdata,connectd_peer_connected,id,node_id,
msgdata,connectd_peer_connected,counter,u64,
msgdata,connectd_peer_connected,addr,wireaddr_internal,
msgdata,connectd_peer_connected,remote_addr,?wireaddr,
msgdata,connectd_peer_connected,incoming,bool,
Expand All @@ -72,25 +73,30 @@ msgdata,connectd_peer_connected,features,u8,flen
# connectd -> master: peer disconnected.
msgtype,connectd_peer_disconnect_done,2006
msgdata,connectd_peer_disconnect_done,id,node_id,
msgdata,connectd_peer_disconnect_done,counter,u64,

# Master -> connectd: make peer active immediately (we want to talk) (+ fd to subd).
msgtype,connectd_peer_connect_subd,2004
msgdata,connectd_peer_connect_subd,id,node_id,
msgdata,connectd_peer_connect_subd,counter,u64,
msgdata,connectd_peer_connect_subd,channel_id,channel_id,

# Connectd -> master: peer said something interesting
msgtype,connectd_peer_spoke,2005
msgdata,connectd_peer_spoke,id,node_id,
msgdata,connectd_peer_spoke,counter,u64,
msgdata,connectd_peer_spoke,msgtype,u16,
msgdata,connectd_peer_spoke,channel_id,channel_id,

# master -> connectd: peer no longer wanted, you can disconnect.
msgtype,connectd_discard_peer,2015
msgdata,connectd_discard_peer,id,node_id,
msgdata,connectd_discard_peer,counter,u64,

# master -> connectd: give message to peer and disconnect.
msgtype,connectd_peer_final_msg,2003
msgdata,connectd_peer_final_msg,id,node_id,
msgdata,connectd_peer_final_msg,counter,u64,
msgdata,connectd_peer_final_msg,len,u16,
msgdata,connectd_peer_final_msg,msg,u8,len

Expand Down
8 changes: 5 additions & 3 deletions connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ static struct io_plan *read_body_from_peer_done(struct io_conn *peer_conn,
/* We tell lightningd to fire up a subdaemon to handle this! */
daemon_conn_send(peer->daemon->master,
take(towire_connectd_peer_spoke(NULL, &peer->id,
peer->counter,
t,
&channel_id)));
}
Expand Down Expand Up @@ -1173,16 +1174,17 @@ struct io_plan *multiplex_peer_setup(struct io_conn *peer_conn,
void peer_connect_subd(struct daemon *daemon, const u8 *msg, int fd)
{
struct node_id id;
u64 counter;
struct peer *peer;
struct channel_id channel_id;
struct subd *subd;

if (!fromwire_connectd_peer_connect_subd(msg, &id, &channel_id))
if (!fromwire_connectd_peer_connect_subd(msg, &id, &counter, &channel_id))
master_badmsg(WIRE_CONNECTD_PEER_CONNECT_SUBD, msg);

/* Races can happen: this might be gone by now. */
/* Races can happen: this might be gone by now (or reconnected!). */
peer = peer_htable_get(&daemon->peers, &id);
if (!peer) {
if (!peer || peer->counter != counter) {
close(fd);
return;
}
Expand Down
1 change: 1 addition & 0 deletions lightningd/channel_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ static void peer_got_shutdown(struct channel *channel, const u8 *msg)
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL,
&channel->peer->id,
channel->peer->connectd_counter,
warning)));
channel_fail_reconnect(channel, "Bad shutdown scriptpubkey %s",
tal_hex(tmpctx, scriptpubkey));
Expand Down
3 changes: 2 additions & 1 deletion lightningd/connect_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,8 @@ void maybe_disconnect_peer(struct lightningd *ld, struct peer *peer)
* tell it to discard again: it might have reconnected! */
if (peer->connected == PEER_CONNECTED)
subd_send_msg(ld->connectd,
take(towire_connectd_discard_peer(NULL, &peer->id)));
take(towire_connectd_discard_peer(NULL, &peer->id,
peer->connectd_counter)));
}

static struct command_result *json_sendcustommsg(struct command *cmd,
Expand Down
3 changes: 3 additions & 0 deletions lightningd/dual_open_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,7 @@ static void handle_peer_wants_to_close(struct subd *dualopend,
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL,
&channel->peer->id,
channel->peer->connectd_counter,
warning)));
channel_fail_reconnect(channel, "Bad shutdown scriptpubkey %s",
tal_hex(tmpctx, scriptpubkey));
Expand Down Expand Up @@ -2764,6 +2765,7 @@ static struct command_result *json_openchannel_init(struct command *cmd,
subd_send_msg(peer->ld->connectd,
take(towire_connectd_peer_connect_subd(NULL,
&peer->id,
peer->connectd_counter,
&channel->cid)));
subd_send_fd(peer->ld->connectd, fds[1]);
return command_still_pending(cmd);
Expand Down Expand Up @@ -3250,6 +3252,7 @@ static struct command_result *json_queryrates(struct command *cmd,
subd_send_msg(peer->ld->connectd,
take(towire_connectd_peer_connect_subd(NULL,
&peer->id,
peer->connectd_counter,
&channel->cid)));
subd_send_fd(peer->ld->connectd, fds[1]);
return command_still_pending(cmd);
Expand Down
1 change: 1 addition & 0 deletions lightningd/opening_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ static struct command_result *json_fundchannel_start(struct command *cmd,
subd_send_msg(peer->ld->connectd,
take(towire_connectd_peer_connect_subd(NULL,
&peer->id,
peer->connectd_counter,
&peer->uncommitted_channel->cid)));
subd_send_fd(peer->ld->connectd, fds[1]);
return command_still_pending(cmd);
Expand Down
38 changes: 28 additions & 10 deletions lightningd/peer_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,7 @@ static void connect_activate_subd(struct lightningd *ld, struct channel *channel
subd_send_msg(ld->connectd,
take(towire_connectd_peer_connect_subd(NULL,
&channel->peer->id,
channel->peer->connectd_counter,
&channel->cid)));
subd_send_fd(ld->connectd, fds[1]);
return;
Expand All @@ -1089,6 +1090,7 @@ static void connect_activate_subd(struct lightningd *ld, struct channel *channel
/* Get connectd to send error and close. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &channel->peer->id,
channel->peer->connectd_counter,
error)));
}

Expand Down Expand Up @@ -1134,6 +1136,7 @@ static void peer_connected_hook_final(struct peer_connected_hook_payload *payloa
"dev_disconnect permfail");
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id,
peer->connectd_counter,
channel->error)));
}
return;
Expand All @@ -1157,6 +1160,7 @@ static void peer_connected_hook_final(struct peer_connected_hook_payload *payloa
/* Get connectd to send error and close. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id,
peer->connectd_counter,
error)));
}

Expand Down Expand Up @@ -1279,12 +1283,14 @@ void peer_connected(struct lightningd *ld, const u8 *msg)
u8 *their_features;
struct peer *peer;
struct peer_connected_hook_payload *hook_payload;
u64 connectd_counter;

hook_payload = tal(NULL, struct peer_connected_hook_payload);
hook_payload->ld = ld;
hook_payload->error = NULL;
if (!fromwire_connectd_peer_connected(hook_payload, msg,
&id, &hook_payload->addr,
&id, &connectd_counter,
&hook_payload->addr,
&hook_payload->remote_addr,
&hook_payload->incoming,
&their_features))
Expand All @@ -1298,6 +1304,14 @@ void peer_connected(struct lightningd *ld, const u8 *msg)
peer = new_peer(ld, 0, &id, &hook_payload->addr,
hook_payload->incoming);

/* We track this, because messages can race between connectd and us.
* For example, we could tell it to attach a subd, but it's actually
* already reconnected: we would tell it again when we read the
* "peer_connected" message, and it would get upset (plus, our first
* subd wouldn't die as expected. So we echo this back to connectd
* on peer commands, and it knows to ignore if it wrong. */
peer->connectd_counter = connectd_counter;

/* We mark peer in "connecting" state until hooks have passed. */
assert(peer->connected == PEER_DISCONNECTED);
peer->connected = PEER_CONNECTING;
Expand Down Expand Up @@ -1336,24 +1350,21 @@ void peer_spoke(struct lightningd *ld, const u8 *msg)
{
struct node_id id;
u16 msgtype;
u64 connectd_counter;
struct channel *channel;
struct channel_id channel_id;
struct peer *peer;
bool dual_fund;
u8 *error;
int fds[2];

if (!fromwire_connectd_peer_spoke(msg, &id, &msgtype, &channel_id))
if (!fromwire_connectd_peer_spoke(msg, &id, &connectd_counter, &msgtype, &channel_id))
fatal("Connectd gave bad CONNECTD_PEER_SPOKE message %s",
tal_hex(msg, msg));

/* We must know it, and it must be the right connectd_id */
peer = peer_by_id(ld, &id);
if (!peer) {
/* This race is possible, but I want to see it in CI. */
log_broken(ld->log, "Unknown active peer %s",
type_to_string(tmpctx, struct node_id, &id));
return;
}
assert(peer->connectd_counter == connectd_counter);

/* Do we know what channel they're talking about? */
channel = find_channel_by_id(peer, &channel_id);
Expand Down Expand Up @@ -1470,12 +1481,15 @@ void peer_spoke(struct lightningd *ld, const u8 *msg)
/* Get connectd to send error and close. */
subd_send_msg(ld->connectd,
take(towire_connectd_peer_final_msg(NULL, &peer->id,
peer->connectd_counter,
error)));
return;

tell_connectd:
subd_send_msg(ld->connectd,
take(towire_connectd_peer_connect_subd(NULL, &id, &channel_id)));
take(towire_connectd_peer_connect_subd(NULL, &id,
peer->connectd_counter,
&channel_id)));
subd_send_fd(ld->connectd, fds[1]);
}

Expand All @@ -1495,16 +1509,20 @@ static void destroy_disconnect_command(struct disconnect_command *dc)
void peer_disconnect_done(struct lightningd *ld, const u8 *msg)
{
struct node_id id;
u64 connectd_counter;
struct disconnect_command *i, *next;
struct peer *p;

if (!fromwire_connectd_peer_disconnect_done(msg, &id))
if (!fromwire_connectd_peer_disconnect_done(msg, &id, &connectd_counter))
fatal("Connectd gave bad PEER_DISCONNECT_DONE message %s",
tal_hex(msg, msg));

/* If we still have peer, it's disconnected now */
/* FIXME: We should keep peers until it tells us they're disconnected,
* and not free when no more channels. */
p = peer_by_id(ld, &id);
if (p) {
assert(p->connectd_counter == connectd_counter);
log_peer_debug(ld->log, &id, "peer_disconnect_done");
p->connected = PEER_DISCONNECTED;

Expand Down
3 changes: 3 additions & 0 deletions lightningd/peer_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ struct peer {
/* ID of peer */
struct node_id id;

/* Connection counter from connectd. */
u64 connectd_counter;

/* Our channels */
struct list_head channels;

Expand Down
10 changes: 5 additions & 5 deletions lightningd/test/run-invoice-select-inchan.c
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,13 @@ bool fromwire_channel_id(const u8 **cursor UNNEEDED, size_t *max UNNEEDED,
bool fromwire_channeld_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED)
{ fprintf(stderr, "fromwire_channeld_dev_memleak_reply called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_connected */
bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED)
bool fromwire_connectd_peer_connected(const tal_t *ctx UNNEEDED, const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED, struct wireaddr_internal *addr UNNEEDED, struct wireaddr **remote_addr UNNEEDED, bool *incoming UNNEEDED, u8 **features UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_connected called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_disconnect_done */
bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED)
bool fromwire_connectd_peer_disconnect_done(const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_disconnect_done called!\n"); abort(); }
/* Generated stub for fromwire_connectd_peer_spoke */
bool fromwire_connectd_peer_spoke(const void *p UNNEEDED, struct node_id *id UNNEEDED, u16 *msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED)
bool fromwire_connectd_peer_spoke(const void *p UNNEEDED, struct node_id *id UNNEEDED, u64 *counter UNNEEDED, u16 *msgtype UNNEEDED, struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "fromwire_connectd_peer_spoke called!\n"); abort(); }
/* Generated stub for fromwire_dualopend_dev_memleak_reply */
bool fromwire_dualopend_dev_memleak_reply(const void *p UNNEEDED, bool *leak UNNEEDED)
Expand Down Expand Up @@ -715,10 +715,10 @@ u8 *towire_channeld_dev_memleak(const tal_t *ctx UNNEEDED)
u8 *towire_channeld_dev_reenable_commit(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_channeld_dev_reenable_commit called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_connect_subd */
u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const struct channel_id *channel_id UNNEEDED)
u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_connect_subd called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_final_msg */
u8 *towire_connectd_peer_final_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, const u8 *msg UNNEEDED)
u8 *towire_connectd_peer_final_msg(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const u8 *msg UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_final_msg called!\n"); abort(); }
/* Generated stub for towire_dualopend_dev_memleak */
u8 *towire_dualopend_dev_memleak(const tal_t *ctx UNNEEDED)
Expand Down
Loading

0 comments on commit d314202

Please sign in to comment.