Skip to content

Commit

Permalink
connectd: rely on the master to tell us to reconnect.
Browse files Browse the repository at this point in the history
connectd tells master about every disconnection, and master knows
whether it's important to reconnect.  Just get the master to invoke a new
connect command if it considers the peer important!

The only twist is timeouts: we don't want to reconnect immediately if
we've just failed to connect.  To solve this, connectd passes a 'delaytime'
(the 'seconds_to_delay' field of connect_failed) to the master when a
connection fails, and the master passes it back (as 'seconds_waited' in
connect_to_peer) when it next asks for a connection.

Signed-off-by: Rusty Russell <[email protected]>
  • Loading branch information
rustyrussell authored and cdecker committed Aug 9, 2018
1 parent 30f08cc commit 8939a50
Show file tree
Hide file tree
Showing 14 changed files with 116 additions and 190 deletions.
160 changes: 25 additions & 135 deletions connectd/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,43 +67,6 @@
#define INITIAL_WAIT_SECONDS 1
#define MAX_WAIT_SECONDS 300

/* We put everything in this struct (redundantly) to pass it to timer cb */
struct important_peerid {
struct daemon *daemon;

struct pubkey id;

/* How long to wait after failed connect */
unsigned int wait_seconds;

/* The timer we're using to reconnect */
struct oneshot *reconnect_timer;
};

/* We keep a set of peer ids we're always trying to reach. */
static const struct pubkey *
important_peerid_keyof(const struct important_peerid *imp)
{
return &imp->id;
}

static bool important_peerid_eq(const struct important_peerid *imp,
const struct pubkey *key)
{
return pubkey_eq(&imp->id, key);
}

static size_t important_peerid_hash(const struct pubkey *id)
{
return siphash24(siphash_seed(), id, sizeof(*id));
}

HTABLE_DEFINE_TYPE(struct important_peerid,
important_peerid_keyof,
important_peerid_hash,
important_peerid_eq,
important_peerid_map);

struct listen_fd {
int fd;
/* If we bind() IPv6 then IPv4 to same port, we *may* fail to listen()
Expand Down Expand Up @@ -151,15 +114,9 @@ struct daemon {

struct timers timers;

/* Important peers */
struct important_peerid_map important_peerids;

/* Local and global features to offer to peers. */
u8 *localfeatures, *globalfeatures;

/* Automatically reconnect. */
bool reconnect;

/* Allow localhost to be considered "public" */
bool dev_allow_localhost;

Expand Down Expand Up @@ -192,6 +149,9 @@ struct reaching {

/* How far did we get? */
const char *connstate;

/* How many seconds did we wait this time? */
u32 seconds_waited;
};

struct peer {
Expand Down Expand Up @@ -225,9 +185,6 @@ struct addrhint {
struct wireaddr_internal addr;
};

/* FIXME: Reorder */
static void retry_important(struct important_peerid *imp);

static struct peer *find_reconnecting_peer(struct daemon *daemon,
const struct pubkey *id)
{
Expand Down Expand Up @@ -865,7 +822,7 @@ static struct io_plan *connect_init(struct daemon_conn *master,
daemon, msg,
&daemon->id, &daemon->globalfeatures,
&daemon->localfeatures, &proposed_wireaddr,
&proposed_listen_announce, &daemon->reconnect,
&proposed_listen_announce,
&proxyaddr, &daemon->use_proxy_always,
&daemon->dev_allow_localhost, &daemon->use_dns,
&tor_password)) {
Expand Down Expand Up @@ -955,48 +912,40 @@ struct io_plan *connection_out(struct io_conn *conn, struct reaching *reach)
handshake_out_success, reach);
}

static void PRINTF_FMT(3,4)
static void PRINTF_FMT(4,5)
connect_failed(struct daemon *daemon,
const struct pubkey *id,
u32 seconds_waited,
const char *errfmt, ...)
{
u8 *msg;
struct important_peerid *imp;
va_list ap;
char *err;
u32 wait_seconds;

va_start(ap, errfmt);
err = tal_vfmt(tmpctx, errfmt, ap);
va_end(ap);

/* Wait twice as long to reconnect, between min and max. */
wait_seconds = seconds_waited * 2;
if (wait_seconds > MAX_WAIT_SECONDS)
wait_seconds = MAX_WAIT_SECONDS;
if (wait_seconds < INITIAL_WAIT_SECONDS)
wait_seconds = INITIAL_WAIT_SECONDS;

/* Tell any connect command what happened. */
msg = towire_connectctl_connect_failed(NULL, id, err);
msg = towire_connectctl_connect_failed(NULL, id, err, wait_seconds);
daemon_conn_send(&daemon->master, take(msg));

status_trace("Failed connected out for %s: %s",
type_to_string(tmpctx, struct pubkey, id),
err);

/* If we want to keep trying, do so. */
imp = important_peerid_map_get(&daemon->important_peerids, id);
if (imp) {
imp->wait_seconds *= 2;
if (imp->wait_seconds > MAX_WAIT_SECONDS)
imp->wait_seconds = MAX_WAIT_SECONDS;

status_trace("...will try again in %u seconds",
imp->wait_seconds);
/* If important_id freed, this will be removed too */
imp->reconnect_timer
= new_reltimer(&daemon->timers, imp,
time_from_sec(imp->wait_seconds),
retry_important, imp);
}
}

static void destroy_io_conn(struct io_conn *conn, struct reaching *reach)
{
connect_failed(reach->daemon, &reach->id,
connect_failed(reach->daemon, &reach->id, reach->seconds_waited,
"%s: %s", reach->connstate, strerror(errno));
tal_free(reach);
}
Expand Down Expand Up @@ -1128,7 +1077,9 @@ gossip_resolve_addr(const tal_t *ctx, const struct pubkey *id)
return addr;
}

static void try_reach_peer(struct daemon *daemon, const struct pubkey *id)
static void try_reach_peer(struct daemon *daemon,
const struct pubkey *id,
u32 seconds_waited)
{
struct wireaddr_internal *a;
struct addrhint *hint;
Expand Down Expand Up @@ -1166,7 +1117,7 @@ static void try_reach_peer(struct daemon *daemon, const struct pubkey *id)
}

if (!a) {
connect_failed(daemon, id, "No address known");
connect_failed(daemon, id, seconds_waited, "No address known");
return;
}

Expand Down Expand Up @@ -1221,7 +1172,7 @@ static void try_reach_peer(struct daemon *daemon, const struct pubkey *id)
fd = socket(af, SOCK_STREAM, 0);

if (fd < 0) {
connect_failed(daemon, id,
connect_failed(daemon, id, seconds_waited,
"Can't open %i socket for %s (%s)",
af,
type_to_string(tmpctx, struct pubkey, id),
Expand All @@ -1235,6 +1186,7 @@ static void try_reach_peer(struct daemon *daemon, const struct pubkey *id)
reach->id = *id;
reach->addr = *a;
reach->connstate = "Connection establishment";
reach->seconds_waited = seconds_waited;
list_add_tail(&daemon->reaching, &reach->list);
tal_add_destructor(reach, destroy_reaching);

Expand All @@ -1244,34 +1196,16 @@ static void try_reach_peer(struct daemon *daemon, const struct pubkey *id)
io_new_conn(reach, fd, conn_init, reach);
}

/* Called from timer, so needs single-arg declaration */
static void retry_important(struct important_peerid *imp)
{
/* In case we've come off a timer, don't leave dangling pointer */
imp->reconnect_timer = NULL;

/* With --dev-no-reconnect or --offline, we only want explicit
* connects */
if (!imp->daemon->reconnect)
return;

try_reach_peer(imp->daemon, &imp->id);
}

static struct io_plan *connect_to_peer(struct io_conn *conn,
struct daemon *daemon, const u8 *msg)
{
struct pubkey id;
struct important_peerid *imp;
u32 seconds_waited;

if (!fromwire_connectctl_connect_to_peer(msg, &id))
if (!fromwire_connectctl_connect_to_peer(msg, &id, &seconds_waited))
master_badmsg(WIRE_CONNECTCTL_CONNECT_TO_PEER, msg);

/* If this is an important peer, free any outstanding timer */
imp = important_peerid_map_get(&daemon->important_peerids, &id);
if (imp)
imp->reconnect_timer = tal_free(imp->reconnect_timer);
try_reach_peer(daemon, &id);
try_reach_peer(daemon, &id, seconds_waited);
return daemon_conn_read_next(conn, &daemon->master);
}

Expand All @@ -1292,46 +1226,11 @@ static struct io_plan *addr_hint(struct io_conn *conn,
return daemon_conn_read_next(conn, &daemon->master);
}

static struct io_plan *peer_important(struct io_conn *conn,
struct daemon *daemon, const u8 *msg)
{
struct pubkey id;
bool important;
struct important_peerid *imp;

if (!fromwire_connectctl_peer_important(msg, &id, &important))
master_badmsg(WIRE_CONNECTCTL_PEER_IMPORTANT, msg);

imp = important_peerid_map_get(&daemon->important_peerids, &id);
if (important) {
if (!imp) {
imp = tal(daemon, struct important_peerid);
imp->id = id;
imp->daemon = daemon;
imp->wait_seconds = INITIAL_WAIT_SECONDS;
important_peerid_map_add(&daemon->important_peerids,
imp);
/* Start trying to reaching it now. */
retry_important(imp);
}
} else {
if (imp) {
important_peerid_map_del(&daemon->important_peerids,
imp);
/* Stop trying to reach it (if we are) */
tal_free(find_reaching(daemon, &imp->id));
}
}

return daemon_conn_read_next(conn, &daemon->master);
}

static struct io_plan *peer_disconnected(struct io_conn *conn,
struct daemon *daemon, const u8 *msg)
{
struct pubkey id, *key;
struct peer *peer;
struct important_peerid *imp;

if (!fromwire_connectctl_peer_disconnected(msg, &id))
master_badmsg(WIRE_CONNECTCTL_PEER_DISCONNECTED, msg);
Expand All @@ -1352,11 +1251,6 @@ static struct io_plan *peer_disconnected(struct io_conn *conn,
if (peer)
io_wake(peer);

imp = important_peerid_map_get(&daemon->important_peerids, &id);
if (imp) {
imp->wait_seconds = INITIAL_WAIT_SECONDS;
retry_important(imp);
}
return daemon_conn_read_next(conn, &daemon->master);
}

Expand All @@ -1378,9 +1272,6 @@ static struct io_plan *recv_req(struct io_conn *conn, struct daemon_conn *master
case WIRE_CONNECTCTL_PEER_ADDRHINT:
return addr_hint(conn, daemon, master->msg_in);

case WIRE_CONNECTCTL_PEER_IMPORTANT:
return peer_important(conn, daemon, master->msg_in);

case WIRE_CONNECTCTL_PEER_DISCONNECTED:
return peer_disconnected(conn, daemon, master->msg_in);

Expand Down Expand Up @@ -1418,7 +1309,6 @@ int main(int argc, char *argv[])
list_head_init(&daemon->reconnecting);
list_head_init(&daemon->reaching);
list_head_init(&daemon->addrhints);
important_peerid_map_init(&daemon->important_peerids);
timers_init(&daemon->timers, time_mono());
daemon->broken_resolver_response = NULL;
daemon->listen_fds = tal_arr(daemon, struct listen_fd, 0);
Expand Down
8 changes: 2 additions & 6 deletions connectd/connect_wire.csv
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ connectctl_init,,lfeatures,lflen*u8
connectctl_init,,num_wireaddrs,u16
connectctl_init,,wireaddrs,num_wireaddrs*struct wireaddr_internal
connectctl_init,,listen_announce,num_wireaddrs*enum addr_listen_announce
connectctl_init,,reconnect,bool
connectctl_init,,tor_proxyaddr,?struct wireaddr
connectctl_init,,use_tor_proxy_always,bool
connectctl_init,,dev_allow_localhost,bool
Expand Down Expand Up @@ -45,16 +44,13 @@ connectctl_peer_addrhint,,addr,struct wireaddr_internal
# Master -> connectd: connect to a peer.
connectctl_connect_to_peer,2001
connectctl_connect_to_peer,,id,struct pubkey
connectctl_connect_to_peer,,seconds_waited,u32

# Connectd->master: connect failed.
connectctl_connect_failed,2020
connectctl_connect_failed,,id,struct pubkey
connectctl_connect_failed,,failreason,wirestring

# Master -> connectd: try to always maintain connection to this peer (or not)
connectctl_peer_important,2010
connectctl_peer_important,,id,struct pubkey
connectctl_peer_important,,important,bool
connectctl_connect_failed,,seconds_to_delay,u32

# Connectd -> master: we got a peer. Two fds: peer and gossip
connect_peer_connected,2002
Expand Down
17 changes: 12 additions & 5 deletions lightningd/channel.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <hsmd/gen_hsm_client_wire.h>
#include <inttypes.h>
#include <lightningd/channel.h>
#include <lightningd/connect_control.h>
#include <lightningd/gen_channel_state_names.h>
#include <lightningd/hsm_control.h>
#include <lightningd/jsonrpc.h>
Expand All @@ -22,7 +23,8 @@ static bool connects_to_peer(struct subd *owner)
return owner && owner->talks_to_peer;
}

void channel_set_owner(struct channel *channel, struct subd *owner)
void channel_set_owner(struct channel *channel, struct subd *owner,
bool reconnect)
{
struct subd *old_owner = channel->owner;
channel->owner = owner;
Expand All @@ -33,7 +35,12 @@ void channel_set_owner(struct channel *channel, struct subd *owner)
u8 *msg = towire_connectctl_peer_disconnected(NULL,
&channel->peer->id);
subd_send_msg(channel->peer->ld->connectd, take(msg));
channel->connected = false;
}

if (reconnect) {
/* Reconnect after 1 second: prevents some spurious
* reconnects during tests. */
delay_then_reconnect(channel, 1);
}
}
channel->connected = connects_to_peer(owner);
Expand Down Expand Up @@ -88,7 +95,7 @@ static void destroy_channel(struct channel *channel)
htlc_state_name(hin->hstate));

/* Free any old owner still hanging around. */
channel_set_owner(channel, NULL);
channel_set_owner(channel, NULL, false);

list_del_from(&channel->peer->channels, &channel->list);
}
Expand Down Expand Up @@ -342,7 +349,7 @@ void channel_fail_permanent(struct channel *channel, const char *fmt, ...)
channel->error = towire_errorfmt(channel, &cid, "%s", why);
}

channel_set_owner(channel, NULL);
channel_set_owner(channel, NULL, false);
/* Drop non-cooperatively (unilateral) to chain. */
drop_to_chain(ld, channel, false);
tal_free(why);
Expand Down Expand Up @@ -405,5 +412,5 @@ void channel_fail_transient(struct channel *channel, const char *fmt, ...)
}
#endif

channel_set_owner(channel, NULL);
channel_set_owner(channel, NULL, true);
}
3 changes: 2 additions & 1 deletion lightningd/channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ void delete_channel(struct channel *channel);
const char *channel_state_name(const struct channel *channel);
const char *channel_state_str(enum channel_state state);

void channel_set_owner(struct channel *channel, struct subd *owner);
void channel_set_owner(struct channel *channel, struct subd *owner,
bool reconnect);

/* Channel has failed, but can try again. */
PRINTF_FMT(2,3) void channel_fail_transient(struct channel *channel,
Expand Down
Loading

0 comments on commit 8939a50

Please sign in to comment.