Skip to content

Commit

Permalink
lightningd: don't explicitly tell connectd to disconnect, have it do …
Browse files Browse the repository at this point in the history
…it on sending error/warning.

Connectd already does this when we *receive* an error or warning, but
now do it on send.  This causes some slight behavior change: we don't
disconnect when we close a channel, for example (our behaviour here
has been inconsistent across versions, depending on the code).

When connectd is told to disconnect, it now does so immediately, and
doesn't wait for subds to drain etc.  That simplifies the manual
disconnect case, which now cleans up as it would from any other
disconnection when connectd says it's disconnected.

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell authored and niftynei committed Jul 19, 2022
1 parent 2962b93 commit a3c4908
Show file tree
Hide file tree
Showing 16 changed files with 75 additions and 146 deletions.
2 changes: 1 addition & 1 deletion connectd/connectd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1868,7 +1868,7 @@ static void peer_discard(struct daemon *daemon, const u8 *msg)
if (peer->counter != counter)
return;
status_peer_debug(&id, "discard_peer");
drain_peer(peer);
tal_free(peer);
}

/* lightningd tells us to send a msg and disconnect. */
Expand Down
23 changes: 22 additions & 1 deletion connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static void close_subd_timeout(struct subd *subd)
io_close(subd->conn);
}

void drain_peer(struct peer *peer)
static void drain_peer(struct peer *peer)
{
status_debug("drain_peer");
assert(!peer->draining);
Expand Down Expand Up @@ -403,6 +403,12 @@ static bool is_urgent(enum peer_wire type)
return false;
}

/* io_sock_shutdown, but in format suitable for an io_plan callback */
static struct io_plan *io_sock_shutdown_cb(struct io_conn *conn, struct peer *unused)
{
return io_sock_shutdown(conn);
}

static struct io_plan *encrypt_and_send(struct peer *peer,
const u8 *msg TAKES,
struct io_plan *(*next)
Expand Down Expand Up @@ -438,6 +444,21 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
#endif
set_urgent_flag(peer, is_urgent(type));

/* BOLT #1:
*
* A sending node:
*...
* - MAY close the connection after sending.
*/
if (type == WIRE_ERROR || type == WIRE_WARNING) {
/* Might already be draining... */
if (!peer->draining)
drain_peer(peer);

/* Close as soon as we've sent this. */
next = io_sock_shutdown_cb;
}

/* We free this and the encrypted version in next write_to_peer */
peer->sent_to_peer = cryptomsg_encrypt_msg(peer, &peer->cs, msg);
return io_write(peer->to_peer,
Expand Down
4 changes: 0 additions & 4 deletions connectd/multiplex.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ void multiplex_final_msg(struct peer *peer,
* this does io logging. */
void inject_peer_msg(struct peer *peer, const u8 *msg TAKES);

/* Start closing the peer: removes itself from hash table, frees itself
* once done. */
void drain_peer(struct peer *peer);

void setup_peer_gossip_store(struct peer *peer,
const struct feature_set *our_features,
const u8 *their_features);
Expand Down
6 changes: 1 addition & 5 deletions lightningd/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,10 @@
void channel_set_owner(struct channel *channel, struct subd *owner)
{
struct subd *old_owner = channel->owner;
bool was_connected = channel_is_connected(channel);
channel->owner = owner;

if (old_owner) {
if (old_owner)
subd_release_channel(old_owner, channel);
if (was_connected && !channel_is_connected(channel))
maybe_disconnect_peer(channel->peer->ld, channel->peer);
}
}

struct htlc_out *channel_has_htlc_out(struct channel *channel)
Expand Down
27 changes: 0 additions & 27 deletions lightningd/connect_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -641,33 +641,6 @@ void connectd_activate(struct lightningd *ld)
assert(ret == ld->connectd);
}

void maybe_disconnect_peer(struct lightningd *ld, struct peer *peer)
{
struct channel *channel;

/* Any channels left which want to talk? */
if (peer->uncommitted_channel)
return;

list_for_each(&peer->channels, channel, list)
if (channel_is_connected(channel))
return;

/* If shutting down, connectd no longer exists.
* FIXME: Call peer_disconnect_done(), but nobody cares. */
if (!ld->connectd) {
peer->connected = PEER_DISCONNECTED;
return;
}

/* If connectd was the one who told us to cleanup peer, don't
* tell it to discard again: it might have reconnected! */
if (peer->connected == PEER_CONNECTED)
subd_send_msg(ld->connectd,
take(towire_connectd_discard_peer(NULL, &peer->id,
peer->connectd_counter)));
}

static struct command_result *json_sendcustommsg(struct command *cmd,
const char *buffer,
const jsmntok_t *obj UNNEEDED,
Expand Down
3 changes: 0 additions & 3 deletions lightningd/connect_control.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,4 @@ void connect_failed_disconnect(struct lightningd *ld,
const struct node_id *id,
const struct wireaddr_internal *addr);

/* Disconnect a peer (if no subds want to talk any more) */
void maybe_disconnect_peer(struct lightningd *ld, struct peer *peer);

#endif /* LIGHTNING_LIGHTNINGD_CONNECT_CONTROL_H */
1 change: 0 additions & 1 deletion lightningd/opening_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ static void destroy_uncommitted_channel(struct uncommitted_channel *uc)

uc->peer->uncommitted_channel = NULL;

maybe_disconnect_peer(uc->peer->ld, uc->peer);
maybe_delete_peer(uc->peer);
}

Expand Down
69 changes: 10 additions & 59 deletions lightningd/peer_control.c
Original file line number Diff line number Diff line change
Expand Up @@ -1539,11 +1539,6 @@ void peer_disconnect_done(struct lightningd *ld, const u8 *msg)
assert(p->connectd_counter == connectd_counter);
log_peer_debug(ld->log, &id, "peer_disconnect_done");
p->connected = PEER_DISCONNECTED;

/* If there are literally no channels, might as well
* free immediately. */
if (!p->uncommitted_channel && list_empty(&p->channels))
p = tal_free(p);
}

/* If you were trying to connect, it failed. */
Expand All @@ -1561,6 +1556,10 @@ void peer_disconnect_done(struct lightningd *ld, const u8 *msg)
was_pending(command_success(i->cmd,
json_stream_success(i->cmd)));
}

/* If connection was only thing keeping it, this will delete it. */
if (p)
maybe_delete_peer(p);
}

static bool check_funding_details(const struct bitcoin_tx *tx,
Expand Down Expand Up @@ -2084,9 +2083,8 @@ static struct command_result *json_disconnect(struct command *cmd,
struct node_id *id;
struct disconnect_command *dc;
struct peer *peer;
struct channel *channel, **channels;
struct channel *channel;
bool *force;
bool disconnected = false;

if (!param(cmd, buffer, params,
p_req("id", param_node_id, &id),
Expand All @@ -2109,58 +2107,11 @@ static struct command_result *json_disconnect(struct command *cmd,
channel_state_name(channel));
}

/* Careful here! Disconnecting can free peer! */
channels = tal_arr(cmd, struct channel *, 0);
list_for_each(&peer->channels, channel, list) {
if (!channel->owner)
continue;
if (!channel->owner->talks_to_peer)
continue;

switch (channel->state) {
case DUALOPEND_OPEN_INIT:
case CHANNELD_AWAITING_LOCKIN:
case CHANNELD_NORMAL:
case CHANNELD_SHUTTING_DOWN:
case DUALOPEND_AWAITING_LOCKIN:
case CLOSINGD_SIGEXCHANGE:
tal_arr_expand(&channels, channel);
continue;
case CLOSINGD_COMPLETE:
case AWAITING_UNILATERAL:
case FUNDING_SPEND_SEEN:
case ONCHAIN:
case CLOSED:
/* We don't expect these to have owners who connect! */
log_broken(channel->log,
"Don't expect owner %s in state %s",
channel->owner->name,
channel_state_name(channel));
continue;
}
abort();
}

/* This can free peer too! */
if (peer->uncommitted_channel) {
kill_uncommitted_channel(peer->uncommitted_channel,
"disconnect command");
disconnected = true;
}

for (size_t i = 0; i < tal_count(channels); i++) {
if (channel_unsaved(channels[i]))
channel_unsaved_close_conn(channels[i],
"disconnect command");
else
channel_fail_reconnect(channels[i],
"disconnect command");
disconnected = true;
}

/* It's just sitting in connectd? */
if (!disconnected)
maybe_disconnect_peer(cmd->ld, peer);
/* If it's not already disconnecting, tell connectd to disconnect */
if (peer->connected == PEER_CONNECTED)
subd_send_msg(peer->ld->connectd,
take(towire_connectd_discard_peer(NULL, &peer->id,
peer->connectd_counter)));

/* Connectd tells us when it's finally disconnected */
dc = tal(cmd, struct disconnect_command);
Expand Down
10 changes: 6 additions & 4 deletions lightningd/test/run-invoice-select-inchan.c
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,9 @@ void json_add_short_channel_id(struct json_stream *response UNNEEDED,
const struct short_channel_id *id UNNEEDED)
{ fprintf(stderr, "json_add_short_channel_id called!\n"); abort(); }
/* Generated stub for json_add_string */
void json_add_string(struct json_stream *result UNNEEDED, const char *fieldname UNNEEDED, const char *value TAKES UNNEEDED)
void json_add_string(struct json_stream *js UNNEEDED,
const char *fieldname UNNEEDED,
const char *str TAKES UNNEEDED)
{ fprintf(stderr, "json_add_string called!\n"); abort(); }
/* Generated stub for json_add_stringn */
void json_add_stringn(struct json_stream *result UNNEEDED, const char *fieldname UNNEEDED,
Expand Down Expand Up @@ -509,9 +511,6 @@ void log_(struct log *log UNNEEDED, enum log_level level UNNEEDED,
const char *fmt UNNEEDED, ...)

{ fprintf(stderr, "log_ called!\n"); abort(); }
/* Generated stub for maybe_disconnect_peer */
void maybe_disconnect_peer(struct lightningd *ld UNNEEDED, struct peer *peer UNNEEDED)
{ fprintf(stderr, "maybe_disconnect_peer called!\n"); abort(); }
/* Generated stub for merkle_tlv */
void merkle_tlv(const struct tlv_field *fields UNNEEDED, struct sha256 *merkle UNNEEDED)
{ fprintf(stderr, "merkle_tlv called!\n"); abort(); }
Expand Down Expand Up @@ -716,6 +715,9 @@ u8 *towire_channeld_dev_memleak(const tal_t *ctx UNNEEDED)
/* Generated stub for towire_channeld_dev_reenable_commit */
u8 *towire_channeld_dev_reenable_commit(const tal_t *ctx UNNEEDED)
{ fprintf(stderr, "towire_channeld_dev_reenable_commit called!\n"); abort(); }
/* Generated stub for towire_connectd_discard_peer */
u8 *towire_connectd_discard_peer(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED)
{ fprintf(stderr, "towire_connectd_discard_peer called!\n"); abort(); }
/* Generated stub for towire_connectd_peer_connect_subd */
u8 *towire_connectd_peer_connect_subd(const tal_t *ctx UNNEEDED, const struct node_id *id UNNEEDED, u64 counter UNNEEDED, const struct channel_id *channel_id UNNEEDED)
{ fprintf(stderr, "towire_connectd_peer_connect_subd called!\n"); abort(); }
Expand Down
4 changes: 3 additions & 1 deletion lightningd/test/run-log-pruning.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ void json_add_str_fmt(struct json_stream *js UNNEEDED,
const char *fmt UNNEEDED, ...)
{ fprintf(stderr, "json_add_str_fmt called!\n"); abort(); }
/* Generated stub for json_add_string */
void json_add_string(struct json_stream *result UNNEEDED, const char *fieldname UNNEEDED, const char *value TAKES UNNEEDED)
void json_add_string(struct json_stream *js UNNEEDED,
const char *fieldname UNNEEDED,
const char *str TAKES UNNEEDED)
{ fprintf(stderr, "json_add_string called!\n"); abort(); }
/* Generated stub for json_add_time */
void json_add_time(struct json_stream *result UNNEEDED, const char *fieldname UNNEEDED,
Expand Down
10 changes: 5 additions & 5 deletions plugins/test/run-route-overlong.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ void json_add_short_channel_id(struct json_stream *response UNNEEDED,
const struct short_channel_id *id UNNEEDED)
{ fprintf(stderr, "json_add_short_channel_id called!\n"); abort(); }
/* Generated stub for json_add_string */
void json_add_string(struct json_stream *result UNNEEDED, const char *fieldname UNNEEDED, const char *value TAKES UNNEEDED)
void json_add_string(struct json_stream *js UNNEEDED,
const char *fieldname UNNEEDED,
const char *str TAKES UNNEEDED)
{ fprintf(stderr, "json_add_string called!\n"); abort(); }
/* Generated stub for json_add_timeabs */
void json_add_timeabs(struct json_stream *result UNNEEDED, const char *fieldname UNNEEDED,
Expand Down Expand Up @@ -155,12 +157,10 @@ bool json_to_u16(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED,
uint16_t *num UNNEEDED)
{ fprintf(stderr, "json_to_u16 called!\n"); abort(); }
/* Generated stub for json_to_u32 */
bool json_to_u32(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED,
uint32_t *num UNNEEDED)
bool json_to_u32(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED, u32 *num UNNEEDED)
{ fprintf(stderr, "json_to_u32 called!\n"); abort(); }
/* Generated stub for json_to_u64 */
bool json_to_u64(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED,
uint64_t *num UNNEEDED)
bool json_to_u64(const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED, u64 *num UNNEEDED)
{ fprintf(stderr, "json_to_u64 called!\n"); abort(); }
/* Generated stub for json_tok_bin_from_hex */
u8 *json_tok_bin_from_hex(const tal_t *ctx UNNEEDED, const char *buffer UNNEEDED, const jsmntok_t *tok UNNEEDED)
Expand Down
13 changes: 5 additions & 8 deletions tests/test_closing.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def test_penalty_inhtlc(node_factory, bitcoind, executor, chainparams):
bitcoind.generate_block(100)

sync_blockheight(bitcoind, [l1, l2])
wait_for(lambda: len(l2.rpc.listpeers()['peers']) == 0)
wait_for(lambda: only_one(l2.rpc.listpeers()['peers'])['channels'] == [])

# Do one last pass over the logs to extract the reactions l2 sent
l2.daemon.logsearch_start = needle
Expand Down Expand Up @@ -679,7 +679,7 @@ def test_penalty_outhtlc(node_factory, bitcoind, executor, chainparams):
bitcoind.generate_block(100)

sync_blockheight(bitcoind, [l1, l2])
wait_for(lambda: len(l2.rpc.listpeers()['peers']) == 0)
wait_for(lambda: only_one(l2.rpc.listpeers()['peers'])['channels'] == [])

# Do one last pass over the logs to extract the reactions l2 sent
l2.daemon.logsearch_start = needle
Expand Down Expand Up @@ -3447,10 +3447,8 @@ def test_you_forgot_closed_channel(node_factory, executor):
wait_for(lambda: only_one(only_one(l2.rpc.listpeers()['peers'])['channels'])['state'] == 'CLOSINGD_COMPLETE')
assert only_one(only_one(l1.rpc.listpeers()['peers'])['channels'])['state'] == 'CLOSINGD_SIGEXCHANGE'

# l2 closes on us.
wait_for(lambda: only_one(l1.rpc.listpeers()['peers'])['connected'] is False)

# l1 reconnects, it should succeed.
# l1 won't send anything else until we reconnect, then it should succeed.
l1.rpc.disconnect(l2.info['id'], force=True)
l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
fut.result(TIMEOUT)

Expand Down Expand Up @@ -3486,8 +3484,7 @@ def no_new_blocks(req):
wait_for(lambda: only_one(only_one(l2.rpc.listpeers()['peers'])['channels'])['state'] == 'ONCHAIN')

# l1 reconnects, it should succeed.
# l1 will disconnect once it sees block
wait_for(lambda: only_one(l1.rpc.listpeers()['peers'])['connected'] is False)
l1.rpc.disconnect(l2.info['id'], force=True)
l1.rpc.connect(l2.info['id'], 'localhost', l2.port)
fut.result(TIMEOUT)

Expand Down
Loading

0 comments on commit a3c4908

Please sign in to comment.