forked from memcached/memcached
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathproto_proxy.c
4569 lines (3969 loc) · 155 KB
/
proto_proxy.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Functions for handling the proxy layer. wraps text protocols
*
* NOTE: many lua functions generate pointers via "lua_newuserdatauv" or
* similar. Normal memory checking isn't done as lua will throw a high level
* error if malloc fails. Must keep this in mind while allocating data so any
* manually malloc'ed information gets freed properly.
*/
#include "memcached.h"
#include <string.h>
#include <stdlib.h>
#include <ctype.h>
#include <lua.h>
#include <lualib.h>
#include <lauxlib.h>
#include "config.h"
#if defined(__linux__)
#define USE_EVENTFD 1
#include <sys/eventfd.h>
#endif
#ifdef HAVE_LIBURING
#include <liburing.h>
#include <poll.h> // POLLOUT for liburing.
#define PRING_QUEUE_SQ_ENTRIES 2048
#define PRING_QUEUE_CQ_ENTRIES 16384
#endif
#include "proto_proxy.h"
#include "proto_text.h"
#include "queue.h"
#define XXH_INLINE_ALL // modifier for xxh3's include below
#include "xxhash.h"
#ifdef PROXY_DEBUG
#define P_DEBUG(...) \
do { \
fprintf(stderr, __VA_ARGS__); \
} while (0)
#else
#define P_DEBUG(...)
#endif
#define WSTAT_L(t) pthread_mutex_lock(&t->stats.mutex);
#define WSTAT_UL(t) pthread_mutex_unlock(&t->stats.mutex);
#define WSTAT_INCR(c, stat, amount) { \
pthread_mutex_lock(&c->thread->stats.mutex); \
c->thread->stats.stat += amount; \
pthread_mutex_unlock(&c->thread->stats.mutex); \
}
#define STAT_L(ctx) pthread_mutex_lock(&ctx->stats_lock);
#define STAT_UL(ctx) pthread_mutex_unlock(&ctx->stats_lock);
#define STAT_INCR(ctx, stat, amount) { \
pthread_mutex_lock(&ctx->stats_lock); \
ctx->global_stats.stat += amount; \
pthread_mutex_unlock(&ctx->stats_lock); \
}
#define STAT_DECR(ctx, stat, amount) { \
pthread_mutex_lock(&ctx->stats_lock); \
ctx->global_stats.stat -= amount; \
pthread_mutex_unlock(&ctx->stats_lock); \
}
// FIXME (v2): do include dir properly.
#include "vendor/mcmc/mcmc.h"
// Note: value created from thin air. Could be shorter.
#define MCP_REQUEST_MAXLEN KEY_MAX_LENGTH * 2
#define ENDSTR "END\r\n"
#define ENDLEN sizeof(ENDSTR)-1
#define MCP_THREAD_UPVALUE 1
#define MCP_ATTACH_UPVALUE 2
#define MCP_BACKEND_UPVALUE 3
// all possible commands.
#define CMD_FIELDS \
X(CMD_MG) \
X(CMD_MS) \
X(CMD_MD) \
X(CMD_MN) \
X(CMD_MA) \
X(CMD_ME) \
X(CMD_GET) \
X(CMD_GAT) \
X(CMD_SET) \
X(CMD_ADD) \
X(CMD_CAS) \
X(CMD_GETS) \
X(CMD_GATS) \
X(CMD_INCR) \
X(CMD_DECR) \
X(CMD_TOUCH) \
X(CMD_APPEND) \
X(CMD_DELETE) \
X(CMD_REPLACE) \
X(CMD_PREPEND) \
X(CMD_END_STORAGE) \
X(CMD_QUIT) \
X(CMD_STATS) \
X(CMD_SLABS) \
X(CMD_WATCH) \
X(CMD_LRU) \
X(CMD_VERSION) \
X(CMD_SHUTDOWN) \
X(CMD_EXTSTORE) \
X(CMD_FLUSH_ALL) \
X(CMD_VERBOSITY) \
X(CMD_LRU_CRAWLER) \
X(CMD_REFRESH_CERTS) \
X(CMD_CACHE_MEMLIMIT)
#define X(name) name,
enum proxy_defines {
P_OK = 0,
CMD_FIELDS
CMD_SIZE, // used to define array size for command hooks.
CMD_ANY, // override _all_ commands
CMD_ANY_STORAGE, // override commands specific to key storage.
};
#undef X
// certain classes of ascii commands have similar parsing (ie;
// get/gets/gat/gats). Use types so we don't have to test a ton of them.
enum proxy_cmd_types {
CMD_TYPE_GENERIC = 0,
CMD_TYPE_GET, // get/gets/gat/gats
CMD_TYPE_UPDATE, // add/set/cas/prepend/append/replace
CMD_TYPE_META, // m*'s.
};
typedef struct _io_pending_proxy_t io_pending_proxy_t;
typedef struct proxy_event_thread_s proxy_event_thread_t;
#ifdef HAVE_LIBURING
typedef void (*proxy_event_cb)(void *udata, struct io_uring_cqe *cqe);
typedef struct {
void *udata;
proxy_event_cb cb;
bool set; // NOTE: not sure if necessary if code structured properly
} proxy_event_t;
static struct __kernel_timespec updater_ts = {.tv_sec = 3, .tv_nsec = 0};
static void _proxy_evthr_evset_notifier(proxy_event_thread_t *t);
static void _proxy_evthr_evset_clock(proxy_event_thread_t *t);
static void *proxy_event_thread_ur(void *arg);
static void proxy_event_updater_ur(void *udata, struct io_uring_cqe *cqe);
#endif
struct proxy_user_stats {
size_t num_stats; // number of stats, for sizing various arrays
char **names; // not needed for worker threads
uint64_t *counters; // array of counters.
};
struct proxy_global_stats {
uint64_t config_reloads;
uint64_t config_reload_fails;
uint64_t backend_total;
uint64_t backend_disconn; // backends with no connections
uint64_t backend_requests; // reqs sent to backends
uint64_t backend_responses; // responses received from backends
uint64_t backend_errors; // errors from backends
uint64_t backend_marked_bad; // backend set to autofail
uint64_t backend_failed; // an error caused a backend reset
};
struct proxy_tunables {
struct timeval connect;
struct timeval retry; // wait time before retrying a dead backend
struct timeval read;
#ifdef HAVE_LIBURING
struct __kernel_timespec connect_ur;
struct __kernel_timespec retry_ur;
struct __kernel_timespec read_ur;
#endif // HAVE_LIBURING
int backend_failure_limit;
};
typedef STAILQ_HEAD(pool_head_s, mcp_pool_s) pool_head_t;
typedef struct {
lua_State *proxy_state;
void *proxy_code;
proxy_event_thread_t *proxy_threads;
pthread_mutex_t config_lock;
pthread_cond_t config_cond;
pthread_t config_tid;
pthread_mutex_t worker_lock;
pthread_cond_t worker_cond;
pthread_t manager_tid; // deallocation management thread
pthread_mutex_t manager_lock;
pthread_cond_t manager_cond;
pool_head_t manager_head; // stack for pool deallocation.
bool worker_done; // signal variable for the worker lock/cond system.
bool worker_failed; // covered by worker_lock as well.
bool use_uring; // use IO_URING for backend connections.
struct proxy_global_stats global_stats;
struct proxy_user_stats user_stats;
struct proxy_tunables tunables; // NOTE: updates covered by stats_lock
pthread_mutex_t stats_lock; // used for rare global counters
} proxy_ctx_t;
struct proxy_hook {
int lua_ref;
bool is_lua; // pull the lua reference and call it as a lua function.
};
// TODO (v2): some hash functions (crc?) might require initializers. If we run into
// any the interface might need expanding.
typedef uint64_t (*key_hash_func)(const void *key, size_t len, uint64_t seed);
struct proxy_hash_func {
key_hash_func func;
};
typedef const char *(*key_hash_filter_func)(const char *conf, const char *key, size_t klen, size_t *newlen);
typedef uint32_t (*hash_selector_func)(uint64_t hash, void *ctx);
struct proxy_hash_caller {
hash_selector_func selector_func;
void *ctx;
};
enum mcp_backend_states {
mcp_backend_read = 0, // waiting to read any response
mcp_backend_parse, // have some buffered data to check
mcp_backend_read_end, // looking for an "END" marker for GET
mcp_backend_want_read, // read more data to complete command
mcp_backend_next, // advance to the next IO
};
typedef struct mcp_backend_s mcp_backend_t;
typedef struct mcp_request_s mcp_request_t;
typedef struct mcp_parser_s mcp_parser_t;
#define PARSER_MAX_TOKENS 24
struct mcp_parser_meta_s {
uint64_t flags;
};
// Note that we must use offsets into request for tokens,
// as *request can change between parsing and later accessors.
struct mcp_parser_s {
const char *request;
void *vbuf; // temporary buffer for holding value lengths.
uint8_t command;
uint8_t cmd_type; // command class.
uint8_t ntokens;
uint8_t keytoken; // because GAT. sigh. also cmds without a key.
uint32_t parsed; // how far into the request we parsed already
uint32_t reqlen; // full length of request buffer.
int vlen;
uint32_t klen; // length of key.
uint16_t tokens[PARSER_MAX_TOKENS]; // offsets for start of each token
bool has_space; // a space was found after the last byte parsed.
union {
struct mcp_parser_meta_s meta;
} t;
};
#define MCP_PARSER_KEY(pr) (&pr.request[pr.tokens[pr.keytoken]])
#define MAX_REQ_TOKENS 2
struct mcp_request_s {
mcp_parser_t pr; // non-lua-specific parser handling.
struct timeval start; // time this object was created.
mcp_backend_t *be; // backend handling this request.
bool ascii_multiget; // ascii multiget mode. (hide errors/END)
bool was_modified; // need to rewrite the request
int tokent_ref; // reference to token table if modified.
char request[];
};
typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
#define MAX_IPLEN 45
#define MAX_PORTLEN 6
// TODO (v2): IOV_MAX tends to be 1000+ which would allow for more batching but we
// don't have a good temporary space and don't want to malloc/free on every
// write. transmit() uses the stack but we can't do that for uring's use case.
#if (IOV_MAX > 128)
#define BE_IOV_MAX 128
#else
#define BE_IOV_MAX IOV_MAX
#endif
struct mcp_backend_s {
char ip[MAX_IPLEN+1];
char port[MAX_PORTLEN+1];
double weight;
int depth;
int failed_count; // number of fails (timeouts) in a row
pthread_mutex_t mutex; // covers stack.
proxy_event_thread_t *event_thread; // event thread owning this backend.
void *client; // mcmc client
STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
io_head_t io_head; // stack of requests.
char *rbuf; // static allocated read buffer.
struct event event; // libevent
#ifdef HAVE_LIBURING
proxy_event_t ur_rd_ev; // liburing.
proxy_event_t ur_wr_ev; // need a separate event/cb for writing/polling
proxy_event_t ur_te_ev; // for timeout handling
#endif
enum mcp_backend_states state; // readback state machine
bool connecting; // in the process of an asynch connection.
bool can_write; // recently got a WANT_WRITE or are connecting.
bool stacked; // if backend already queued for syscalls.
bool bad; // timed out, marked as bad.
struct iovec write_iovs[BE_IOV_MAX]; // iovs to stage batched writes
};
typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
struct proxy_event_thread_s {
pthread_t thread_id;
struct event_base *base;
struct event notify_event; // listen event for the notify pipe/eventfd.
struct event clock_event; // timer for updating event thread data.
#ifdef HAVE_LIBURING
struct io_uring ring;
proxy_event_t ur_notify_event; // listen on eventfd.
proxy_event_t ur_clock_event; // timer for updating event thread data.
eventfd_t event_counter;
bool use_uring;
#endif
pthread_mutex_t mutex; // covers stack.
pthread_cond_t cond; // condition to wait on while stack drains.
io_head_t io_head_in; // inbound requests to process.
be_head_t be_head; // stack of backends for processing.
#ifdef USE_EVENTFD
int event_fd;
#else
int notify_receive_fd;
int notify_send_fd;
#endif
proxy_ctx_t *ctx; // main context.
struct proxy_tunables tunables; // periodically copied from main ctx
};
#define RESP_CMD_MAX 8
typedef struct {
mcmc_resp_t resp;
struct timeval start; // start time inherited from paired request
char cmd[RESP_CMD_MAX+1]; // until we can reverse CMD_*'s to strings directly.
int status; // status code from mcmc_read()
char *buf; // response line + potentially value.
size_t blen; // total size of the value to read.
int bread; // amount of bytes read into value so far.
} mcp_resp_t;
// re-cast an io_pending_t into this more descriptive structure.
// the first few items _must_ match the original struct.
struct _io_pending_proxy_t {
int io_queue_type;
LIBEVENT_THREAD *thread;
conn *c;
mc_resp *resp; // original struct ends here
struct _io_pending_proxy_t *next; // stack for IO submission
STAILQ_ENTRY(_io_pending_proxy_t) io_next; // stack for backends
int coro_ref; // lua registry reference to the coroutine
int mcpres_ref; // mcp.res reference used for await()
lua_State *coro; // pointer directly to the coroutine
mcp_backend_t *backend; // backend server to request from
struct iovec iov[2]; // request string + tail buffer
int iovcnt; // 1 or 2...
unsigned int iovbytes; // total bytes in the iovec
int await_ref; // lua reference if we were an await object
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
bool flushed; // whether we've fully written this request to a backend.
bool ascii_multiget; // passed on from mcp_r_t
bool is_await; // are we an await object?
};
// Note: does *be have to be a sub-struct? how stable are userdata pointers?
// https://stackoverflow.com/questions/38718475/lifetime-of-lua-userdata-pointers
// - says no.
typedef struct {
int ref; // luaL_ref reference.
mcp_backend_t *be;
} mcp_pool_be_t;
#define KEY_HASH_FILTER_MAX 5
typedef struct mcp_pool_s mcp_pool_t;
struct mcp_pool_s {
struct proxy_hash_caller phc;
key_hash_filter_func key_filter;
key_hash_func key_hasher;
pthread_mutex_t lock; // protects refcount.
proxy_ctx_t *ctx; // main context.
STAILQ_ENTRY(mcp_pool_s) next; // stack for deallocator.
char key_filter_conf[KEY_HASH_FILTER_MAX+1];
uint64_t hash_seed; // calculated from a string.
int refcount;
int phc_ref;
int self_ref; // TODO (v2): double check that this is needed.
int pool_size;
mcp_pool_be_t pool[];
};
typedef struct {
mcp_pool_t *main; // ptr to original
} mcp_pool_proxy_t;
static int proxy_run_coroutine(lua_State *Lc, mc_resp *resp, io_pending_proxy_t *p, conn *c);
#define PROCESS_MULTIGET true
#define PROCESS_NORMAL false
static void proxy_process_command(conn *c, char *command, size_t cmdlen, bool multiget);
static size_t _process_request_next_key(mcp_parser_t *pr);
static int process_request(mcp_parser_t *pr, const char *command, size_t cmdlen);
static void dump_stack(lua_State *L);
static void mcp_queue_io(conn *c, mc_resp *resp, int coro_ref, lua_State *Lc);
static mcp_request_t *mcp_new_request(lua_State *L, mcp_parser_t *pr, const char *command, size_t cmdlen);
static void mcp_request_attach(lua_State *L, mcp_request_t *rq, io_pending_proxy_t *p);
static int mcplib_await_run(conn *c, lua_State *L, int coro_ref);
static int mcplib_await_return(io_pending_proxy_t *p);
static void proxy_backend_handler(const int fd, const short which, void *arg);
static void proxy_event_handler(evutil_socket_t fd, short which, void *arg);
static void proxy_event_updater(evutil_socket_t fd, short which, void *arg);
static void *proxy_event_thread(void *arg);
static void proxy_out_errstring(mc_resp *resp, const char *str);
static int _flush_pending_write(mcp_backend_t *be);
static int _reset_bad_backend(mcp_backend_t *be);
static void _set_event(mcp_backend_t *be, struct event_base *base, int flags, struct timeval t, event_callback_fn callback);
static int proxy_thread_loadconf(LIBEVENT_THREAD *thr);
static int proxy_backend_drive_machine(mcp_backend_t *be, int bread, char **rbuf, size_t *toread);
static void proxy_lua_error(lua_State *L, const char *s);
static void proxy_lua_ferror(lua_State *L, const char *fmt, ...);
/******** EXTERNAL FUNCTIONS ******/
// functions starting with _ are breakouts for the public functions.
// see also: process_extstore_stats()
// FIXME (v2): get context off of conn? global variables
void proxy_stats(ADD_STAT add_stats, conn *c) {
if (!settings.proxy_enabled) {
return;
}
proxy_ctx_t *ctx = settings.proxy_ctx;
STAT_L(ctx);
APPEND_STAT("proxy_config_reloads", "%llu", (unsigned long long)ctx->global_stats.config_reloads);
APPEND_STAT("proxy_config_reload_fails", "%llu", (unsigned long long)ctx->global_stats.config_reload_fails);
APPEND_STAT("proxy_backend_total", "%llu", (unsigned long long)ctx->global_stats.backend_total);
APPEND_STAT("proxy_backend_marked_bad", "%llu", (unsigned long long)ctx->global_stats.backend_marked_bad);
APPEND_STAT("proxy_backend_failed", "%llu", (unsigned long long)ctx->global_stats.backend_failed);
STAT_UL(ctx);
}
void process_proxy_stats(ADD_STAT add_stats, conn *c) {
char key_str[STAT_KEY_LEN];
if (!settings.proxy_enabled) {
return;
}
proxy_ctx_t *ctx = settings.proxy_ctx;
STAT_L(ctx);
// prepare aggregated counters.
struct proxy_user_stats *us = &ctx->user_stats;
uint64_t counters[us->num_stats];
memset(counters, 0, sizeof(counters));
// aggregate worker thread counters.
for (int x = 0; x < settings.num_threads; x++) {
LIBEVENT_THREAD *t = get_worker_thread(x);
struct proxy_user_stats *tus = t->proxy_stats;
WSTAT_L(t);
for (int i = 0; i < tus->num_stats; i++) {
counters[i] += tus->counters[i];
}
WSTAT_UL(t);
}
// return all of the stats
for (int x = 0; x < us->num_stats; x++) {
snprintf(key_str, STAT_KEY_LEN-1, "user_%s", us->names[x]);
APPEND_STAT(key_str, "%llu", (unsigned long long)counters[x]);
}
STAT_UL(ctx);
}
struct _dumpbuf {
size_t size;
size_t used;
char *buf;
};
static int _dump_helper(lua_State *L, const void *p, size_t sz, void *ud) {
(void)L;
struct _dumpbuf *db = ud;
if (db->used + sz > db->size) {
db->size *= 2;
char *nb = realloc(db->buf, db->size);
if (nb == NULL) {
return -1;
}
db->buf = nb;
}
memcpy(db->buf + db->used, (const char *)p, sz);
db->used += sz;
return 0;
}
static const char * _load_helper(lua_State *L, void *data, size_t *size) {
(void)L;
struct _dumpbuf *db = data;
if (db->used == 0) {
*size = 0;
return NULL;
}
*size = db->used;
db->used = 0;
return db->buf;
}
void proxy_start_reload(void *arg) {
proxy_ctx_t *ctx = arg;
if (pthread_mutex_trylock(&ctx->config_lock) == 0) {
pthread_cond_signal(&ctx->config_cond);
pthread_mutex_unlock(&ctx->config_lock);
}
}
// Manages a queue of inbound objects destined to be deallocated.
static void *_proxy_manager_thread(void *arg) {
proxy_ctx_t *ctx = arg;
pool_head_t head;
pthread_mutex_lock(&ctx->manager_lock);
while (1) {
STAILQ_INIT(&head);
while (STAILQ_EMPTY(&ctx->manager_head)) {
pthread_cond_wait(&ctx->manager_cond, &ctx->manager_lock);
}
// pull dealloc queue into local queue.
STAILQ_CONCAT(&head, &ctx->manager_head);
pthread_mutex_unlock(&ctx->manager_lock);
// Config lock is required for using config VM.
pthread_mutex_lock(&ctx->config_lock);
lua_State *L = ctx->proxy_state;
mcp_pool_t *p;
STAILQ_FOREACH(p, &head, next) {
// we let the pool object _gc() handle backend references.
luaL_unref(L, LUA_REGISTRYINDEX, p->phc_ref);
// need to... unref self.
// NOTE: double check if we really need to self-reference.
// this is a backup here to ensure the external refcounts hit zero
// before lua garbage collects the object. other things hold a
// reference to the object though.
luaL_unref(L, LUA_REGISTRYINDEX, p->self_ref);
}
pthread_mutex_unlock(&ctx->config_lock);
// done.
pthread_mutex_lock(&ctx->manager_lock);
}
return NULL;
}
// Thread handling the configuration reload sequence.
// TODO (v2): get a logger instance.
// TODO (v2): making this "safer" will require a few phases of work.
// 1) JFDI
// 2) "test VM" -> from config thread, test the worker reload portion.
// 3) "unit testing" -> from same temporary worker VM, execute set of
// integration tests that must pass.
// 4) run update on each worker, collecting new mcp.attach() hooks.
// Once every worker has successfully executed and set new hooks, roll
// through a _second_ time to actually swap the hook structures and unref
// the old structures where marked dirty.
static void *_proxy_config_thread(void *arg) {
proxy_ctx_t *ctx = arg;
logger_create();
pthread_mutex_lock(&ctx->config_lock);
while (1) {
pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
STAT_INCR(ctx, config_reloads, 1);
lua_State *L = ctx->proxy_state;
lua_settop(L, 0); // clear off any crud that could have been left on the stack.
// The main stages of config reload are:
// - load and execute the config file
// - run mcp_config_pools()
// - for each worker:
// - copy and execute new lua code
// - copy selector table
// - run mcp_config_routes()
if (proxy_load_config(ctx) != 0) {
// Failed to load. log and wait for a retry.
STAT_INCR(ctx, config_reload_fails, 1);
LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
continue;
}
// TODO (v2): create a temporary VM to test-load the worker code into.
// failing to load partway through the worker VM reloads can be
// critically bad if we're not careful about references.
// IE: the config VM _must_ hold references to selectors and backends
// as long as they exist in any worker for any reason.
for (int x = 0; x < settings.num_threads; x++) {
LIBEVENT_THREAD *thr = get_worker_thread(x);
pthread_mutex_lock(&ctx->worker_lock);
ctx->worker_done = false;
ctx->worker_failed = false;
proxy_reload_notify(thr);
while (!ctx->worker_done) {
// in case of spurious wakeup.
pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock);
}
pthread_mutex_unlock(&ctx->worker_lock);
// Code load bailed.
if (ctx->worker_failed) {
STAT_INCR(ctx, config_reload_fails, 1);
LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
continue;
}
}
LOGGER_LOG(NULL, LOG_SYSEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
}
return NULL;
}
static int _start_proxy_config_threads(proxy_ctx_t *ctx) {
int ret;
pthread_mutex_lock(&ctx->config_lock);
if ((ret = pthread_create(&ctx->config_tid, NULL,
_proxy_config_thread, ctx)) != 0) {
fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
strerror(ret));
pthread_mutex_unlock(&ctx->config_lock);
return -1;
}
pthread_mutex_unlock(&ctx->config_lock);
pthread_mutex_lock(&ctx->manager_lock);
if ((ret = pthread_create(&ctx->manager_tid, NULL,
_proxy_manager_thread, ctx)) != 0) {
fprintf(stderr, "Failed to start proxy configuration thread: %s\n",
strerror(ret));
pthread_mutex_unlock(&ctx->manager_lock);
return -1;
}
pthread_mutex_unlock(&ctx->manager_lock);
return 0;
}
// TODO (v2): IORING_SETUP_ATTACH_WQ port from bench_event once we have multiple
// event threads.
static void _proxy_init_evthread_events(proxy_event_thread_t *t) {
#ifdef HAVE_LIBURING
bool use_uring = t->ctx->use_uring;
struct io_uring_params p = {0};
assert(t->event_fd); // uring only exists where eventfd also does.
// Setup the CQSIZE to be much larger than SQ size, since backpressure
// issues can cause us to block on SQ submissions and as a network server,
// stuff happens.
if (use_uring) {
p.flags = IORING_SETUP_CQSIZE;
p.cq_entries = PRING_QUEUE_CQ_ENTRIES;
int ret = io_uring_queue_init_params(PRING_QUEUE_SQ_ENTRIES, &t->ring, &p);
if (ret) {
perror("io_uring_queue_init_params");
exit(1);
}
if (!(p.features & IORING_FEAT_NODROP)) {
fprintf(stderr, "uring: kernel missing IORING_FEAT_NODROP, using libevent\n");
use_uring = false;
}
if (!(p.features & IORING_FEAT_SINGLE_MMAP)) {
fprintf(stderr, "uring: kernel missing IORING_FEAT_SINGLE_MMAP, using libevent\n");
use_uring = false;
}
if (!(p.features & IORING_FEAT_FAST_POLL)) {
fprintf(stderr, "uring: kernel missing IORING_FEAT_FAST_POLL, using libevent\n");
use_uring = false;
}
if (use_uring) {
// FIXME (v2): Sigh. we need a blocking event_fd for io_uring but we've a
// chicken and egg in here. need a better structure... in meantime
// re-create the event_fd.
close(t->event_fd);
t->event_fd = eventfd(0, 0);
// FIXME (v2): hack for event init.
t->ur_notify_event.set = false;
_proxy_evthr_evset_notifier(t);
// periodic data updater for event thread
t->ur_clock_event.cb = proxy_event_updater_ur;
t->ur_clock_event.udata = t;
t->ur_clock_event.set = false;
_proxy_evthr_evset_clock(t);
t->use_uring = true;
return;
} else {
// Decided to not use io_uring, so don't waste memory.
t->use_uring = false;
io_uring_queue_exit(&t->ring);
}
} else {
t->use_uring = false;
}
#endif
struct event_config *ev_config;
ev_config = event_config_new();
event_config_set_flag(ev_config, EVENT_BASE_FLAG_NOLOCK);
t->base = event_base_new_with_config(ev_config);
event_config_free(ev_config);
if (! t->base) {
fprintf(stderr, "Can't allocate event base\n");
exit(1);
}
// listen for notifications.
// NULL was thread_libevent_process
// FIXME (v2): use modern format? (event_assign)
#ifdef USE_EVENTFD
event_set(&t->notify_event, t->event_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
#else
event_set(&t->notify_event, t->notify_receive_fd,
EV_READ | EV_PERSIST, proxy_event_handler, t);
#endif
evtimer_set(&t->clock_event, proxy_event_updater, t);
event_base_set(t->base, &t->clock_event);
struct timeval rate = {.tv_sec = 3, .tv_usec = 0};
evtimer_add(&t->clock_event, &rate);
event_base_set(t->base, &t->notify_event);
if (event_add(&t->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
}
// start the centralized lua state and config thread.
// TODO (v2): return ctx ptr. avoid global vars.
void proxy_init(bool use_uring) {
proxy_ctx_t *ctx = calloc(1, sizeof(proxy_ctx_t));
settings.proxy_ctx = ctx;
ctx->use_uring = use_uring;
pthread_mutex_init(&ctx->config_lock, NULL);
pthread_cond_init(&ctx->config_cond, NULL);
pthread_mutex_init(&ctx->worker_lock, NULL);
pthread_cond_init(&ctx->worker_cond, NULL);
pthread_mutex_init(&ctx->manager_lock, NULL);
pthread_cond_init(&ctx->manager_cond, NULL);
pthread_mutex_init(&ctx->stats_lock, NULL);
// FIXME (v2): default defines.
ctx->tunables.backend_failure_limit = 3;
ctx->tunables.connect.tv_sec = 5;
ctx->tunables.retry.tv_sec = 3;
ctx->tunables.read.tv_sec = 3;
#ifdef HAVE_LIBURING
ctx->tunables.connect_ur.tv_sec = 5;
ctx->tunables.retry_ur.tv_sec = 3;
ctx->tunables.read_ur.tv_sec = 3;
#endif // HAVE_LIBURING
STAILQ_INIT(&ctx->manager_head);
lua_State *L = luaL_newstate();
ctx->proxy_state = L;
luaL_openlibs(L);
// NOTE: might need to differentiate the libs yes?
proxy_register_libs(NULL, L);
// Create/start the backend threads, which we need before servers
// start getting created.
// Supporting N event threads should be possible, but it will be a
// low number of N to avoid too many wakeup syscalls.
// For now we hardcode to 1.
proxy_event_thread_t *threads = calloc(1, sizeof(proxy_event_thread_t));
ctx->proxy_threads = threads;
for (int i = 0; i < 1; i++) {
proxy_event_thread_t *t = &threads[i];
t->ctx = ctx;
#ifdef USE_EVENTFD
t->event_fd = eventfd(0, EFD_NONBLOCK);
if (t->event_fd == -1) {
perror("failed to create backend notify eventfd");
exit(1);
}
#else
int fds[2];
if (pipe(fds)) {
perror("can't create proxy backend notify pipe");
exit(1);
}
t->notify_receive_fd = fds[0];
t->notify_send_fd = fds[1];
#endif
_proxy_init_evthread_events(t);
// incoming request queue.
STAILQ_INIT(&t->io_head_in);
pthread_mutex_init(&t->mutex, NULL);
pthread_cond_init(&t->cond, NULL);
memcpy(&t->tunables, &ctx->tunables, sizeof(t->tunables));
#ifdef HAVE_LIBURING
if (t->use_uring) {
pthread_create(&t->thread_id, NULL, proxy_event_thread_ur, t);
} else {
pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
}
#else
pthread_create(&t->thread_id, NULL, proxy_event_thread, t);
#endif // HAVE_LIBURING
}
_start_proxy_config_threads(ctx);
}
int proxy_load_config(void *arg) {
proxy_ctx_t *ctx = arg;
lua_State *L = ctx->proxy_state;
int res = luaL_loadfile(L, settings.proxy_startfile);
if (res != LUA_OK) {
fprintf(stderr, "ERROR: Failed to load proxy_startfile: %s\n", lua_tostring(L, -1));
return -1;
}
// LUA_OK, LUA_ERRSYNTAX, LUA_ERRMEM, LUA_ERRFILE
// Now we need to dump the compiled code into bytecode.
// This will then get loaded into worker threads.
struct _dumpbuf *db = malloc(sizeof(struct _dumpbuf));
db->size = 16384;
db->used = 0;
db->buf = malloc(db->size);
lua_dump(L, _dump_helper, db, 0);
// 0 means no error.
ctx->proxy_code = db;
// now we complete the data load by calling the function.
res = lua_pcall(L, 0, LUA_MULTRET, 0);
if (res != LUA_OK) {
fprintf(stderr, "ERROR: Failed to load data into lua config state: %s\n", lua_tostring(L, -1));
exit(EXIT_FAILURE);
}
// call the mcp_config_pools function to get the central backends.
lua_getglobal(L, "mcp_config_pools");
if (lua_isnil(L, -1)) {
fprintf(stderr, "ERROR: Configuration file missing 'mcp_config_pools' function\n");
exit(EXIT_FAILURE);
}
lua_pushnil(L); // no "old" config yet.
if (lua_pcall(L, 1, 1, 0) != LUA_OK) {
fprintf(stderr, "ERROR: Failed to execute mcp_config_pools: %s\n", lua_tostring(L, -1));
exit(EXIT_FAILURE);
}
// result is our main config.
return 0;
}
static int _copy_pool(lua_State *from, lua_State *to) {
// from, -3 should have he userdata.
mcp_pool_t *p = luaL_checkudata(from, -3, "mcp.pool");
size_t size = sizeof(mcp_pool_proxy_t);
mcp_pool_proxy_t *pp = lua_newuserdatauv(to, size, 0);
luaL_setmetatable(to, "mcp.pool_proxy");
pp->main = p;
pthread_mutex_lock(&p->lock);
p->refcount++;
pthread_mutex_unlock(&p->lock);
return 0;
}
static void _copy_config_table(lua_State *from, lua_State *to);
// (from, -1) is the source value
// should end with (to, -1) being the new value.
static void _copy_config_table(lua_State *from, lua_State *to) {
int type = lua_type(from, -1);
bool found = false;
luaL_checkstack(from, 4, "configuration error: table recursion too deep");
luaL_checkstack(to, 4, "configuration error: table recursion too deep");
switch (type) {
case LUA_TNIL:
lua_pushnil(to);
break;
case LUA_TUSERDATA:
// see dump_stack() - check if it's something we handle.
if (lua_getmetatable(from, -1) != 0) {
lua_pushstring(from, "__name");
if (lua_rawget(from, -2) != LUA_TNIL) {
const char *name = lua_tostring(from, -1);
if (strcmp(name, "mcp.pool") == 0) {
_copy_pool(from, to);
found = true;
}
}
lua_pop(from, 2);
}
if (!found) {
proxy_lua_ferror(from, "unhandled userdata type in configuration table\n");
}
break;
case LUA_TNUMBER:
if (lua_isinteger(from, -1)) {
lua_pushinteger(to, lua_tointeger(from, -1));
} else {
lua_pushnumber(to, lua_tonumber(from, -1));
}
break;
case LUA_TSTRING:
lua_pushlstring(to, lua_tostring(from, -1), lua_rawlen(from, -1));
break;
case LUA_TTABLE:
// TODO (v2): copy the metatable first?
// TODO (v2): size narr/nrec from old table and use createtable to
// pre-allocate.
lua_newtable(to); // throw new table on worker
int t = lua_absindex(from, -1); // static index of table to copy.
int nt = lua_absindex(to, -1); // static index of new table.
lua_pushnil(from); // start iterator for main
while (lua_next(from, t) != 0) {
// (key, -2), (val, -1)
int keytype = lua_type(from, -2);
// to intentionally limit complexity and allow for future
// optimizations we restrict what types may be used as keys
// for sub-tables.
switch (keytype) {
case LUA_TSTRING:
// to[l]string converts the actual key in the table
// into a string, so we must not do that unless it
// already is one.
lua_pushlstring(to, lua_tostring(from, -2), lua_rawlen(from, -2));
break;
case LUA_TNUMBER:
if (lua_isinteger(from, -1)) {
lua_pushinteger(to, lua_tointeger(from, -1));
} else {
lua_pushnumber(to, lua_tonumber(from, -1));
}
break;
default:
proxy_lua_error(from, "configuration table keys must be strings or numbers");
}
// lua_settable(to, n) - n being the table
// takes -2 key -1 value, pops both.
// use lua_absindex(L, -1) and so to convert easier?
_copy_config_table(from, to); // push next value.
lua_settable(to, nt);
lua_pop(from, 1); // drop value, keep key.
}
// top of from is now the original table.
// top of to should be the new table.
break;
default:
proxy_lua_error(from, "unhandled data type in configuration table\n");
}
}
// Run from proxy worker to coordinate code reload.
// config_lock must be held first.
void proxy_worker_reload(void *arg, LIBEVENT_THREAD *thr) {
proxy_ctx_t *ctx = arg;
pthread_mutex_lock(&ctx->worker_lock);
if (proxy_thread_loadconf(thr) != 0) {
ctx->worker_failed = true;
}
ctx->worker_done = true;
pthread_cond_signal(&ctx->worker_cond);
pthread_mutex_unlock(&ctx->worker_lock);
}
// FIXME (v2): need to test how to recover from an actual error here. error message
// needs to go somewhere useful, counters added, etc.
static int proxy_thread_loadconf(LIBEVENT_THREAD *thr) {