forked from ElementsProject/lightning
-
Notifications
You must be signed in to change notification settings - Fork 0
/
daemon_conn.c
114 lines (98 loc) · 2.65 KB
/
daemon_conn.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#include <ccan/fdpass/fdpass.h>
#include <ccan/io/fdpass/fdpass.h>
#include <ccan/take/take.h>
#include <common/daemon_conn.h>
#include <wire/wire_io.h>
#include <wire/wire_sync.h>
struct io_plan *daemon_conn_read_next(struct io_conn *conn,
struct daemon_conn *dc)
{
dc->msg_in = tal_free(dc->msg_in);
return io_read_wire(conn, dc->ctx, &dc->msg_in, dc->daemon_conn_recv,
dc);
}
struct io_plan *daemon_conn_write_next(struct io_conn *conn,
struct daemon_conn *dc)
{
const u8 *msg;
again:
msg = msg_dequeue(&dc->out);
if (msg) {
int fd = msg_extract_fd(msg);
if (fd >= 0) {
tal_free(msg);
return io_send_fd(conn, fd, true,
daemon_conn_write_next, dc);
}
return io_write_wire(conn, take(msg), daemon_conn_write_next,
dc);
} else if (dc->msg_queue_cleared_cb) {
if (dc->msg_queue_cleared_cb(conn, dc))
goto again;
}
return msg_queue_wait(conn, &dc->out, daemon_conn_write_next, dc);
}
bool daemon_conn_sync_flush(struct daemon_conn *dc)
{
const u8 *msg;
int daemon_fd;
/* Flush any current packet. */
if (!io_flush_sync(dc->conn))
return false;
/* Make fd blocking for the duration */
daemon_fd = io_conn_fd(dc->conn);
if (!io_fd_block(daemon_fd, true))
return false;
/* Flush existing messages. */
while ((msg = msg_dequeue(&dc->out)) != NULL) {
int fd = msg_extract_fd(msg);
if (fd >= 0) {
tal_free(msg);
if (!fdpass_send(daemon_fd, fd))
break;
} else if (!wire_sync_write(daemon_fd, take(msg)))
break;
}
io_fd_block(daemon_fd, false);
/* Success if and only if we flushed them all. */
return msg == NULL;
}
static struct io_plan *daemon_conn_start(struct io_conn *conn,
struct daemon_conn *dc)
{
dc->conn = conn;
return io_duplex(conn, daemon_conn_read_next(conn, dc),
daemon_conn_write_next(conn, dc));
}
void daemon_conn_init(tal_t *ctx, struct daemon_conn *dc, int fd,
struct io_plan *(*daemon_conn_recv)(struct io_conn *,
struct daemon_conn *),
void (*finish)(struct io_conn *, struct daemon_conn *dc))
{
struct io_conn *conn;
dc->daemon_conn_recv = daemon_conn_recv;
dc->ctx = ctx;
dc->msg_in = NULL;
msg_queue_init(&dc->out, dc->ctx);
dc->msg_queue_cleared_cb = NULL;
conn = io_new_conn(ctx, fd, daemon_conn_start, dc);
if (finish)
io_set_finish(conn, finish, dc);
}
void daemon_conn_clear(struct daemon_conn *dc)
{
io_set_finish(dc->conn, NULL, NULL);
io_close(dc->conn);
}
void daemon_conn_send(struct daemon_conn *dc, const u8 *msg)
{
msg_enqueue(&dc->out, msg);
}
void daemon_conn_send_fd(struct daemon_conn *dc, int fd)
{
msg_enqueue_fd(&dc->out, fd);
}
void daemon_conn_wake(struct daemon_conn *dc)
{
msg_wake(&dc->out);
}