forked from ElementsProject/lightning
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjson_stream.c
232 lines (196 loc) · 5.41 KB
/
json_stream.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
#include <ccan/io/io.h>
/* To reach into io_plan: not a public header! */
#include <ccan/io/backend.h>
#include <ccan/json_escape/json_escape.h>
#include <ccan/json_out/json_out.h>
#include <ccan/str/hex/hex.h>
#include <ccan/tal/str/str.h>
#include <common/daemon.h>
#include <common/utils.h>
#include <lightningd/json.h>
#include <lightningd/json_stream.h>
#include <lightningd/log.h>
#include <stdarg.h>
#include <stdio.h>
/* State for one outgoing JSON document: the buffer being filled, the
 * command filling it, and the connection draining it. */
struct json_stream {
/* Output buffer.  Set to NULL by js_oom() after an allocation failure;
 * every user in this file checks for that before touching it. */
struct json_out *jout;
/* Who is writing to this buffer now; NULL if nobody is
 * (json_stream_close() clears it). */
struct command *writer;
/* Connection with an io_write in flight from this buffer: NULL if none.
 * adjust_io_write() patches this connection's plan when the buffer moves. */
struct io_conn *reader;
/* Invoked by json_stream_output_write() once the buffer is empty and
 * the writer has finished. */
struct io_plan *(*reader_cb)(struct io_conn *conn,
struct json_stream *js,
void *arg);
/* Opaque argument handed back to reader_cb. */
void *reader_arg;
/* Length reported by the last json_out_contents() call, i.e. the size of
 * the io_write in flight; consumed from jout on the next output step. */
size_t len_read;
/* Where to log I/O; NULL means output is not logged. */
struct log *log;
};
static void adjust_io_write(struct json_out *jout,
ptrdiff_t delta,
struct json_stream *js)
{
/* If io_write is in progress, we shift it to point to new buffer pos */
if (js->reader)
/* FIXME: This, or something prettier (io_replan?) belong in ccan/io! */
js->reader->plan[IO_OUT].arg.u1.cp += delta;
}
/* Allocate a fresh, empty stream off @ctx.
 *
 * @ctx:    tal parent for the new stream.
 * @writer: the command that will fill the stream (may be NULL).
 * @log:    where I/O gets logged when the stream is written out.
 *
 * Every field is given a defined value: json_stream_dup() copies the whole
 * struct, so nothing may be left indeterminate.  The reader callback fields
 * are filled in later by json_stream_output_().
 *
 * Returns the new stream; its jout is NULL if the buffer could not be
 * allocated, which the rest of this file treats as the out-of-memory state. */
struct json_stream *new_json_stream(const tal_t *ctx,
				    struct command *writer,
				    struct log *log)
{
	struct json_stream *js = tal(ctx, struct json_stream);

	js->writer = writer;
	js->reader = NULL;
	js->reader_cb = NULL;
	js->reader_arg = NULL;
	js->len_read = 0;
	js->log = log;

	/* FIXME: Add magic so tal_resize can fail! */
	js->jout = json_out_new(js);
	/* Only hook the move callback if we actually got a buffer. */
	if (js->jout)
		json_out_call_on_move(js->jout, adjust_io_write, js);
	return js;
}
/* Make an independent copy of @original, allocated off @ctx.
 *
 * @ctx:      tal parent for the copy.
 * @original: stream to copy; its buffer is duplicated unless it is NULL
 *            (out of memory), in which case the copy's buffer is NULL too.
 * @log:      where the copy logs its I/O (replaces the original's log).
 *
 * The copy never inherits the original's reader: any io_write in flight
 * on that connection points into the original's buffer, not the copy's,
 * and json_stream_output_() requires a stream with no reader. */
struct json_stream *json_stream_dup(const tal_t *ctx,
				    struct json_stream *original,
				    struct log *log)
{
	struct json_stream *js = tal_dup(ctx, struct json_stream, original);

	if (original->jout)
		js->jout = json_out_dup(js, original->jout);
	/* Nobody is draining the copy yet. */
	js->reader = NULL;
	js->log = log;
	return js;
}
/* True while a writer is still attached to @js, i.e. until
 * json_stream_close() has cleared it. */
bool json_stream_still_writing(const struct json_stream *js)
{
	if (js->writer == NULL)
		return false;
	return true;
}
/* Turn off I/O logging for @js by dropping its log pointer.
 *
 * @cmd_name must be "getlog"; the assert enforces that this is not used
 * for any other command. */
void json_stream_log_suppress(struct json_stream *js, const char *cmd_name)
{
	/* Really shouldn't be used for anything else */
	assert(streq(cmd_name, "getlog"));

	js->log = NULL;
}
/* Allocation failure: throw away the output buffer and leave jout NULL,
 * which is what the rest of this file checks for. */
static void COLD js_oom(struct json_stream *js)
{
	tal_free(js->jout);
	js->jout = NULL;
}
/* Copy @len raw bytes from @str onto the end of the stream.
 *
 * Does nothing if the stream already hit out-of-memory; if making room
 * for the bytes fails, the stream is switched to that state instead. */
void json_stream_append(struct json_stream *js,
			const char *str, size_t len)
{
	char *space;

	if (js->jout == NULL)
		return;

	space = json_out_direct(js->jout, len);
	if (space != NULL) {
		memcpy(space, str, len);
		return;
	}

	js_oom(js);
}
/* Finish writing to @js: terminate the output, wake the reader and
 * detach the writer.
 *
 * @js:     the stream being closed.
 * @writer: must be the stream's current writer (asserted).
 *
 * If the stream ran out of memory earlier its buffer is NULL; the
 * well-formedness check is skipped in that case, and the trailing
 * append is a no-op, so the reader is simply woken to notice the
 * failure. */
void json_stream_close(struct json_stream *js, struct command *writer)
{
	/* FIXME: We use writer == NULL for malformed: make writer a void *?
	 * I used to assert(writer); here. */
	assert(js->writer == writer);

	/* Should be well-formed at this point! */
	if (js->jout)
		json_out_finished(js->jout);
	json_stream_append(js, "\n\n", strlen("\n\n"));
	json_stream_flush(js);
	js->writer = NULL;
}
/* Nudge whoever is waiting on @js (see the io_out_wait() in
 * json_stream_output_write()) so it looks at the stream again.  If the
 * stream has lost its buffer by then, that reader closes its connection. */
void json_stream_flush(struct json_stream *js)
{
	/* Wake the stream reader. FIXME: Could have a flag here to optimize */
	io_wake(js);
}
/* Reserve @extra bytes for a member named @fieldname whose value the
 * caller fills in directly.
 *
 * Returns the reserved space, or NULL if the stream is (or has just
 * become) out of memory. */
char *json_member_direct(struct json_stream *js,
			 const char *fieldname, size_t extra)
{
	char *space = NULL;

	if (js->jout != NULL) {
		space = json_out_member_direct(js->jout, fieldname, extra);
		if (space == NULL)
			js_oom(js);
	}
	return space;
}
/* Open a JSON array under @fieldname.  A stream that is already out of
 * memory is left alone; a failure here puts it in that state. */
void json_array_start(struct json_stream *js, const char *fieldname)
{
	if (!js->jout)
		return;

	if (!json_out_start(js->jout, fieldname, '['))
		js_oom(js);
}
/* Close the current JSON array.  A stream that is already out of memory
 * is left alone; a failure here puts it in that state. */
void json_array_end(struct json_stream *js)
{
	if (!js->jout)
		return;

	if (!json_out_end(js->jout, ']'))
		js_oom(js);
}
/* Open a JSON object under @fieldname.  A stream that is already out of
 * memory is left alone; a failure here puts it in that state. */
void json_object_start(struct json_stream *js, const char *fieldname)
{
	if (!js->jout)
		return;

	if (!json_out_start(js->jout, fieldname, '{'))
		js_oom(js);
}
/* Close the current JSON object.  A stream that is already out of memory
 * is left alone; a failure here puts it in that state. */
void json_object_end(struct json_stream *js)
{
	if (!js->jout)
		return;

	if (!json_out_end(js->jout, '}'))
		js_oom(js);
}
/* Close the current JSON object.  Builds with COMPAT_V070 defined emit a
 * single space first; otherwise this is just json_object_end(). */
void json_object_compat_end(struct json_stream *js)
{
#ifdef COMPAT_V070
	/* In 0.7.1 we upgraded pylightning to no longer need this. */
	json_stream_append(js, " ", 1);
#endif

	json_object_end(js);
}
/* Add a member named @fieldname whose value is built printf-style from
 * @fmt and the arguments that follow; @quote is passed through to
 * json_out_addv().
 *
 * Nothing is written to a stream that is already out of memory; a
 * failure while adding puts the stream in that state. */
void json_add_member(struct json_stream *js,
		     const char *fieldname,
		     bool quote,
		     const char *fmt, ...)
{
	va_list args;

	if (!js->jout)
		return;

	va_start(args, fmt);
	if (!json_out_addv(js->jout, fieldname, quote, fmt, args))
		js_oom(js);
	va_end(args);
}
/* One step of draining @js onto @conn; it re-arms itself as the callback
 * of every write or wait it starts.
 *
 * Each step first retires the bytes the previous write sent, then either
 * writes whatever is buffered now, or - with an empty buffer - waits for
 * the writer if it is still active, or hands over to the reader callback
 * if it is done.  A stream that has lost its buffer closes the connection. */
static struct io_plan *json_stream_output_write(struct io_conn *conn,
						struct json_stream *js)
{
	const char *pending;

	/* Out of memory? Nothing we can do but close conn */
	if (js->jout == NULL)
		return io_close(conn);

	/* Drop what the last write sent (zero on the first pass and after
	 * a wait), then see what is buffered now. */
	json_out_consume(js->jout, js->len_read);
	pending = json_out_contents(js->jout, &js->len_read);

	if (pending != NULL) {
		/* Record the connection so adjust_io_write() can patch its
		 * plan if the buffer moves during the write. */
		js->reader = conn;
		if (js->log != NULL)
			log_io(js->log, LOG_IO_OUT, "", pending, js->len_read);
		return io_write(conn,
				pending, js->len_read,
				json_stream_output_write, js);
	}

	/* Buffer is empty: no io_write is in flight any more. */
	js->reader = NULL;
	if (json_stream_still_writing(js))
		return io_out_wait(conn, js, json_stream_output_write, js);

	return js->reader_cb(conn, js, js->reader_arg);
}
/* Start draining @js onto @conn.
 *
 * @js:   stream to write out; must not already have a reader (asserted).
 * @conn: connection to write to.
 * @cb:   called with @arg once the stream is empty and its writer is done.
 * @arg:  opaque pointer handed to @cb.
 *
 * Returns the io_plan for the first output step. */
struct io_plan *json_stream_output_(struct json_stream *js,
				    struct io_conn *conn,
				    struct io_plan *(*cb)(struct io_conn *conn,
							  struct json_stream *js,
							  void *arg),
				    void *arg)
{
	assert(!js->reader);

	/* Nothing has been written yet, so there is nothing to consume. */
	js->len_read = 0;
	js->reader_arg = arg;
	js->reader_cb = cb;

	return json_stream_output_write(conn, js);
}