forked from SpComb/evsql
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.c
968 lines (751 loc) · 25.2 KB
/
core.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
#define _GNU_SOURCE
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include "internal.h"
#include "lib/log.h"
#include "lib/error.h"
#include "lib/misc.h"
/*
 * Forward declaration: _evsql_pump is used before its definition below.
 */
static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn);
/*
 * Send the given command for the given query to the backend over the given connection.
 *
 * The backend is expected to be able to accept a query at this point.
 *
 * On success the query becomes the connection's current query (conn->query) and zero is returned. On failure a
 * warning is logged, conn->query is left untouched and the engine's nonzero result is returned; callers should then
 * consider the connection itself as failed too.
 */
static int _evsql_query_exec (struct evsql_conn *conn, struct evsql_query *query, const char *command) {
    int err;

    DEBUG("evsql.%p: exec query=%p on trans=%p on conn=%p:", conn->evsql, query, conn->trans, conn);

    switch (conn->evsql->type) {
        case EVSQL_EVPQ:
            if (!query->params.count) {
                // no parameters, send it as a plain query
                err = evpq_query(conn->engine.evpq, command);

            } else {
                // parameterized query
                err = evpq_query_params(conn->engine.evpq, command,
                    query->params.count,
                    query->params.types,
                    query->params.values,
                    query->params.lengths,
                    query->params.formats,
                    query->params.result_format
                );
            }

            if (err) {
                // the two cases only differ in what gets logged
                if (PQstatus(evpq_pgconn(conn->engine.evpq)) == CONNECTION_OK) {
                    WARNING("query failed, dropping conn as well");
                } else {
                    WARNING("conn failed");
                }
            }

            break;

        default:
            FATAL("evsql->type");
    }

    if (err)
        return err;

    // the query is now running on this connection
    conn->query = query;

    return 0;
}
/*
 * Free a query along with its parameter arrays. Passing NULL does nothing.
 *
 * The query's command buffer must already have been released and reset to NULL; this is asserted.
 */
void _evsql_query_free (struct evsql_query *query) {
    if (query != NULL) {
        assert(!query->command);

        // the parameter arrays, free() ignores the ones that were never allocated
        free(query->params.types);
        free(query->params.values);
        free(query->params.lengths);
        free(query->params.formats);
        free(query->params.item_vals);

        // and finally the query struct
        free(query);
    }
}
/*
* Execute the callback if res is given, and free the query.
*
* The query has been aborted, it will simply be freed
*/
static void _evsql_query_done (struct evsql_query *query, struct evsql_result *res) {
if (res) {
if (query->cb_fn) {
// call the callback
query->cb_fn(res, query->cb_arg);
} else {
WARNING("supressing cb_fn because query was aborted");
// free the results
evsql_result_free(res);
}
}
// free
_evsql_query_free(query);
}
/*
* XXX:
* /
static void _evsql_destroy (struct evsql *evsql, const struct evsql_result *res) {
struct evsql_query *query;
// clear the queue
while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
_evsql_query_done(query, res);
TAILQ_REMOVE(&evsql->query_queue, query, entry);
}
// free
free(evsql);
}
*/
/*
 * Free a transaction that is no longer attached to anything.
 *
 * Both its query and its conn must have been deassociated beforehand; this is asserted.
 */
static void _evsql_trans_free (struct evsql_trans *trans) {
    // a still-attached query or conn would be leaked
    assert(!trans->query);
    assert(!trans->conn);

    free(trans);
}
/*
* Release a connection. It should already be deassociated from the trans and query.
*
* Releases the engine, removes from the conn_list and frees this.
*/
static void _evsql_conn_release (struct evsql_conn *conn) {
// ensure we don't leak anything
assert(conn->trans == NULL);
assert(conn->query == NULL);
// release the engine
switch (conn->evsql->type) {
case EVSQL_EVPQ:
evpq_release(conn->engine.evpq);
break;
default:
FATAL("evsql->type");
}
// remove from list
LIST_REMOVE(conn, entry);
// catch deadlocks
// XXX: still just assert?
assert(!LIST_EMPTY(&conn->evsql->conn_list) || TAILQ_EMPTY(&conn->evsql->query_queue));
// free
free(conn);
}
/*
* Release a transaction, it should already be deassociated from the query.
*
* Perform a two-way-deassociation with the conn, and then free the trans.
*/
static void _evsql_trans_release (struct evsql_trans *trans) {
assert(trans->query == NULL);
assert(trans->conn != NULL);
// deassociate the conn
trans->conn->trans = NULL; trans->conn = NULL;
// free the trans
_evsql_trans_free(trans);
}
/*
* Fail a single query, this will trigger the callback and free it.
*
* NOTE: Only for *TRANSACTIONLESS* queries.
*/
static void _evsql_query_fail (struct evsql* evsql, struct evsql_query *query) {
struct evsql_result res; ZINIT(res);
// set up the result_info
res.evsql = evsql;
res.error = 1;
// finish off the query
_evsql_query_done(query, &res);
}
/*
 * Fail a transaction and tear it down completely. The trans pointer is invalid once this returns.
 *
 * In order:
 *  - a query still attached to the trans is freed without running its callback, and is also cleared from the conn
 *  - trans->error_fn is called if it is set, otherwise a warning is logged
 *  - the conn is deassociated from the trans and released
 *  - the query queue is pumped with a NULL conn, which fails every queued transactionless query
 *  - the trans itself is freed
 */
static void _evsql_trans_fail (struct evsql_trans *trans) {
if (trans->query) {
// free the query silently, its callback is not invoked
_evsql_query_free(trans->query); trans->query = NULL;
// also deassociate it from the conn, _evsql_conn_release asserts that the conn has no query
trans->conn->query = NULL;
}
// tell the user
// XXX: trans is in a bad state during this call (query already dropped, conn still attached)
if (trans->error_fn)
trans->error_fn(trans, trans->cb_arg);
else
WARNING("supressing error because error_fn was NULL");
// deassociate and release the conn
trans->conn->trans = NULL; _evsql_conn_release(trans->conn); trans->conn = NULL;
// pump the queue; passing a NULL conn makes _evsql_pump fail all queued queries
_evsql_pump(trans->evsql, NULL);
// free the trans
_evsql_trans_free(trans);
}
/*
 * Fail a connection.
 *
 * A connection that belongs to a transaction is handled entirely by _evsql_trans_fail. Otherwise the query in
 * progress (if any) is failed, and then the connection is released.
 */
static void _evsql_conn_fail (struct evsql_conn *conn) {
    if (conn->trans) {
        // the transaction takes care of its own connection
        _evsql_trans_fail(conn->trans);
        return;
    }

    if (conn->query) {
        // the callback runs while the query is still attached, only then is it cleared
        _evsql_query_fail(conn->evsql, conn->query);
        conn->query = NULL;
    }

    // nothing is attached any more, get rid of the connection
    _evsql_conn_release(conn);
}
/*
 * Process queued non-transactional queries until the queue is empty, or until one query has been handed to conn.
 *
 * Each query is removed from the queue and, if conn is non-NULL, executed on it. The query's command buffer is freed
 * either way.
 *
 * If executing a query on the connection fails, both the query and the connection are failed (in that order), and
 * conn is treated as NULL from then on.
 *
 * Any further queries will then also be failed, because there's no reconnection/retry logic yet.
 *
 * This means that if conn is NULL, all queued queries are failed.
 */
static void _evsql_pump (struct evsql *evsql, struct evsql_conn *conn) {
struct evsql_query *query;
int err;
// look for waiting queries
while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
// reset err, it is only assigned again if we actually try to exec
err = 0;
// dequeue
TAILQ_REMOVE(&evsql->query_queue, query, entry);
if (conn) {
// try and execute it
err = _evsql_query_exec(conn, query, query->command);
}
// free the command buf, _evsql_query_free asserts that it is NULL
free(query->command); query->command = NULL;
if (err || !conn) {
if (!conn) {
// warn when dropping queries
WARNING("failing query becuse there are no conns");
}
// fail the query, this runs its callback with an error result and frees it
_evsql_query_fail(evsql, query);
if (conn) {
// the exec failed, so fail the connection as well and stop using it
WARNING("failing the connection because a query-exec failed");
_evsql_conn_fail(conn); conn = NULL;
}
} else {
// the query is now running on the connection, the rest of the queue waits until it completes
break;
}
// handle the rest of the queue
}
// ok
return;
}
/*
* Callback for a trans's 'BEGIN' query, which means the transaction is now ready for use.
*/
static void _evsql_trans_ready (struct evsql_result *res, void *arg) {
struct evsql_trans *trans = arg;
assert(trans != NULL);
// check for errors
if (res->error)
ERROR("transaction 'BEGIN' failed: %s", evsql_result_error(res));
// transaction is now ready for use
trans->ready_fn(trans, trans->cb_arg);
// good
return;
error:
_evsql_trans_fail(trans);
}
/*
* The transaction's connection is ready, send the 'BEGIN' query.
*
* If anything fails, calls _evsql_trans_fail and returns nonzero, zero on success
*/
static int _evsql_trans_conn_ready (struct evsql *evsql, struct evsql_trans *trans) {
char trans_sql[EVSQL_QUERY_BEGIN_BUF];
const char *isolation_level;
int ret;
// determine the isolation_level to use
switch (trans->type) {
case EVSQL_TRANS_DEFAULT:
isolation_level = NULL; break;
case EVSQL_TRANS_SERIALIZABLE:
isolation_level = "SERIALIZABLE"; break;
case EVSQL_TRANS_REPEATABLE_READ:
isolation_level = "REPEATABLE READ"; break;
case EVSQL_TRANS_READ_COMMITTED:
isolation_level = "READ COMMITTED"; break;
case EVSQL_TRANS_READ_UNCOMMITTED:
isolation_level = "READ UNCOMMITTED"; break;
default:
FATAL("trans->type: %d", trans->type);
}
// build the trans_sql
if (isolation_level)
ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION ISOLATION LEVEL %s", isolation_level);
else
ret = snprintf(trans_sql, EVSQL_QUERY_BEGIN_BUF, "BEGIN TRANSACTION");
// make sure it wasn't truncated
if (ret >= EVSQL_QUERY_BEGIN_BUF)
ERROR("trans_sql overflow: %d >= %d", ret, EVSQL_QUERY_BEGIN_BUF);
// execute the query
if (evsql_query(evsql, trans, trans_sql, _evsql_trans_ready, trans) == NULL)
ERROR("evsql_query");
// success
return 0;
error:
// fail the transaction
_evsql_trans_fail(trans);
return -1;
}
/*
 * evpq callback: the connection was successfully established; arg is our evsql_conn.
 *
 * A connection that was opened for a transaction starts that transaction, any other connection starts working on the
 * queue of transactionless queries.
 */
static void _evsql_evpq_connected (struct evpq_conn *_conn, void *arg) {
    struct evsql_conn *conn = arg;

    if (!conn->trans) {
        // pump any waiting transactionless queries
        _evsql_pump(conn->evsql, conn);

        return;
    }

    // notify the transaction; it fails itself on errors, so the return value is of no interest here
    (void) _evsql_trans_conn_ready(conn->evsql, conn->trans);
}
/*
 * evpq callback: got one result for the query running on this connection; arg is our evsql_conn.
 *
 * Only one result is kept per query: if an earlier result is already stored, it is cleared with a warning and
 * replaced by the new one, so the most recent result wins.
 */
static void _evsql_evpq_result (struct evpq_conn *_conn, PGresult *result, void *arg) {
    struct evsql_conn *conn = arg;
    struct evsql_query *query = conn->query;

    assert(query != NULL);

    if (query->result.pq != NULL) {
        WARNING("[evsql] evpq query returned multiple results, discarding previous one");

        // release the older result before it gets replaced below
        PQclear(query->result.pq);
    }

    // remember the (latest) result
    query->result.pq = result;
}
/*
 * evpq callback: no more results for the query running on this connection; arg is our evsql_conn.
 *
 * Builds the evsql_result for the query (flagging it as an error if there is no result at all, or if the result has a
 * non-empty error message), detaches the query from the connection, and then completes the query.
 *
 * For a transaction's query, the trans's ready_fn is called first if the query has no callback (it was aborted). For a
 * transactionless query, the queue is pumped afterwards so that the connection picks up the next query.
 */
static void _evsql_evpq_done (struct evpq_conn *_conn, void *arg) {
struct evsql_conn *conn = arg;
struct evsql_query *query = conn->query;
struct evsql_result res; ZINIT(res);
assert(query != NULL);
// set up the result_info
res.evsql = conn->evsql;
res.result = query->result;
if (query->result.pq == NULL) {
// if a query didn't return any results (bug?), warn and fail the query
WARNING("[evsql] evpq query didn't return any results");
res.error = 1;
} else if (strcmp(PQresultErrorMessage(query->result.pq), "") != 0) {
// a non-empty error message means that the query failed with some error
res.error = 1;
} else {
// the query succeeded \o/
res.error = 0;
}
// de-associate the query from the connection, the callbacks below may start a new query on it
conn->query = NULL;
// how we handle query completion depends on if we're a transaction or not
if (conn->trans) {
// we can deassign the trans's query
conn->trans->query = NULL;
// was an abort? (the query has no callback any more)
if (!query->cb_fn)
// notify the user that the transaction query has been aborted
// NOTE(review): ready_fn may fail and free the trans (see _evsql_trans_rollback); nothing below touches it again
conn->trans->ready_fn(conn->trans, conn->trans->cb_arg);
// then hand the result to the query's callback (or release it if there is none) and free the query
_evsql_query_done(query, &res);
} else {
// a transactionless query, so just finish it off and pump any other waiting ones
_evsql_query_done(query, &res);
// pump the next one
_evsql_pump(conn->evsql, conn);
}
}
/*
 * evpq callback: the connection failed; arg is our evsql_conn.
 */
static void _evsql_evpq_failure (struct evpq_conn *_conn, void *arg) {
    (void) _conn;

    // all of the cleanup lives in _evsql_conn_fail
    _evsql_conn_fail(arg);
}
/*
 * Our evpq behaviour: the set of callbacks that is passed to evpq_connect for every connection we open.
 */
static struct evpq_callback_info _evsql_evpq_cb_info = {
// the connection was established
.fn_connected = _evsql_evpq_connected,
// one result arrived for the running query
.fn_result = _evsql_evpq_result,
// the running query has no more results
.fn_done = _evsql_evpq_done,
// the connection failed
.fn_failure = _evsql_evpq_failure,
};
/*
 * Allocate the generic (engine-independent) part of an evsql context.
 *
 * The struct is zero-initialized, the given event base and error callback/argument are stored, and the connection
 * list and query queue start out empty.
 *
 * Returns the new context, or NULL if the allocation failed.
 */
static struct evsql *_evsql_new_base (struct event_base *ev_base, evsql_error_cb error_fn, void *cb_arg) {
    struct evsql *evsql = calloc(1, sizeof(*evsql));

    if (evsql == NULL)
        ERROR("calloc");

    // no connections and no queued queries yet
    LIST_INIT(&evsql->conn_list);
    TAILQ_INIT(&evsql->query_queue);

    // remember the caller's settings
    evsql->cb_arg = cb_arg;
    evsql->error_fn = error_fn;
    evsql->ev_base = ev_base;

    return evsql;

error:
    return NULL;
}
/*
 * Open a new connection for the given evsql and add it to its conn_list.
 *
 * The connection is not usable right away, it becomes ready once _evsql_evpq_connected has been called for it.
 *
 * Returns the new conn, or NULL if allocating it or starting the connect failed.
 */
static struct evsql_conn *_evsql_conn_new (struct evsql *evsql) {
    struct evsql_conn *conn = calloc(1, sizeof(*conn));

    if (conn == NULL)
        ERROR("calloc");

    conn->evsql = evsql;

    // start connecting with the engine in use
    switch (evsql->type) {
        case EVSQL_EVPQ:
            conn->engine.evpq = evpq_connect(evsql->ev_base, evsql->engine_conf.evpq, _evsql_evpq_cb_info, conn);

            if (conn->engine.evpq == NULL)
                goto error;

            break;

        default:
            FATAL("evsql->type");
    }

    // make it known to the evsql
    LIST_INSERT_HEAD(&evsql->conn_list, conn, entry);

    return conn;

error:
    // free(NULL) is fine if the allocation itself failed
    free(conn);

    return NULL;
}
/*
 * Create a new evsql context that uses the evpq (PostgreSQL) engine, and start opening its first connection.
 *
 * ev_base, error_fn and cb_arg are stored in the context. The pq_conninfo pointer is stored as-is (the string is not
 * copied) and is used again whenever a new connection is opened, so it must stay valid for the context's lifetime.
 *
 * Returns the new context, or NULL if the allocation or the initial connect failed.
 */
struct evsql *evsql_new_pq (struct event_base *ev_base, const char *pq_conninfo, evsql_error_cb error_fn, void *cb_arg) {
    struct evsql *evsql = NULL;

    // base init
    if ((evsql = _evsql_new_base(ev_base, error_fn, cb_arg)) == NULL)
        goto error;

    // select the engine explicitly instead of relying on the zeroed struct happening to mean EVSQL_EVPQ; every
    // engine switch in this file dispatches on this field
    evsql->type = EVSQL_EVPQ;

    // store conf
    evsql->engine_conf.evpq = pq_conninfo;

    // pre-create one connection
    if (_evsql_conn_new(evsql) == NULL)
        goto error;

    // done
    return evsql;

error:
    // there's no other state yet: a failed _evsql_conn_new has already cleaned up after itself, and free(NULL) is fine
    free(evsql);

    return NULL;
}
/*
 * Checks if the connection is already allocated for some trans or query.
 *
 * A connection that belongs to a transaction always counts as busy, as the transaction gets the connection to itself.
 *
 * Returns:
 *  0   connection idle, can be allocated
 *  1   connection busy
 */
static int _evsql_conn_busy (struct evsql_conn *conn) {
    return (conn->trans || conn->query) ? 1 : 0;
}
/*
* Checks if the connection is ready for use (i.e. _evsql_evpq_connected was called).
*
* The connection should not already have a query running.
*
* Returns
* <0 the connection is not valid (failed, query in progress)
* 0 the connection is still pending, and will become ready at some point
* >0 it's ready
*/
static int _evsql_conn_ready (struct evsql_conn *conn) {
switch (conn->evsql->type) {
case EVSQL_EVPQ: {
enum evpq_state state = evpq_state(conn->engine.evpq);
switch (state) {
case EVPQ_CONNECT:
return 0;
case EVPQ_CONNECTED:
return 1;
case EVPQ_QUERY:
case EVPQ_INIT:
case EVPQ_FAILURE:
return -1;
default:
FATAL("evpq_state: %d", state);
}
}
default:
FATAL("evsql->type: %d", conn->evsql->type);
}
}
/*
 * Pick a connection for a new trans/query and return it via *conn_ptr.
 *
 * An existing connection is used if it is not busy and is either fully ready, or still connecting while the query
 * queue is empty. If there is no such connection, then:
 *  - if may_queue is nonzero and some connection exists that does not belong to a transaction, *conn_ptr is set to
 *    NULL and the caller should queue its request for that connection to pick up later
 *  - otherwise a new connection is opened
 *
 * Note that the returned connection might not be ready for use yet (see _evsql_conn_ready).
 *
 * Returns zero if a connection was found/opened or the request should be queued, or nonzero if opening a new
 * connection failed and the request should be dropped.
 */
static int _evsql_conn_get (struct evsql *evsql, struct evsql_conn **conn_ptr, int may_queue) {
int have_nontrans = 0;
*conn_ptr = NULL;
// look for a usable connection; *conn_ptr is the loop variable, so it is NULL again if the loop runs to the end
LIST_FOREACH(*conn_ptr, &evsql->conn_list, entry) {
// we can only have a query enqueue itself if there is a non-trans conn it can later use
if (!(*conn_ptr)->trans)
have_nontrans = 1;
// skip busy conns always
if (_evsql_conn_busy(*conn_ptr))
continue;
// accept pending conns as long as there are NO enqueued queries (might cause deadlock otherwise)
if (_evsql_conn_ready(*conn_ptr) == 0 && TAILQ_EMPTY(&evsql->query_queue))
break;
// accept conns that are in a fully ready state
if (_evsql_conn_ready(*conn_ptr) > 0)
break;
}
// if we found an idle connection, we can just return that right away
if (*conn_ptr)
return 0;
// return NULL if may_queue and we have a non-trans conn that we can, at some point, use
if (may_queue && have_nontrans)
return 0;
// we need to open a new connection
if ((*conn_ptr = _evsql_conn_new(evsql)) == NULL)
goto error;
// good
return 0;
error:
return -1;
}
/*
 * Create a new transaction of the given type, with a connection of its own.
 *
 * If the connection is already ready, the 'BEGIN' query is sent right away, otherwise that happens once the
 * connection has been established (see _evsql_evpq_connected). ready_fn is called once the transaction can be used.
 *
 * error_fn is only installed after the steps above, so a failure inside this call does not invoke it; the failure is
 * reported through the NULL return value alone.
 *
 * Returns the new transaction, or NULL on failure (nothing is left allocated for the transaction in that case).
 */
struct evsql_trans *evsql_trans (struct evsql *evsql, enum evsql_trans_type type, evsql_trans_error_cb error_fn, evsql_trans_ready_cb ready_fn, evsql_trans_done_cb done_fn, void *cb_arg) {
struct evsql_trans *trans = NULL;
// allocate it
if ((trans = calloc(1, sizeof(*trans))) == NULL)
ERROR("calloc");
// store everything except error_fn, which stays NULL for now
trans->evsql = evsql;
trans->ready_fn = ready_fn;
trans->done_fn = done_fn;
trans->cb_arg = cb_arg;
trans->type = type;
// find a connection; may_queue is zero, so we either get a conn or an error
if (_evsql_conn_get(evsql, &trans->conn, 0))
ERROR("_evsql_conn_get");
// associate the conn
trans->conn->trans = trans;
// is it already ready?
if (_evsql_conn_ready(trans->conn) > 0) {
// call _evsql_trans_conn_ready directly, it will handle cleanup (silently, !error_fn)
if (_evsql_trans_conn_ready(evsql, trans)) {
// the trans has already been failed and freed, so return NULL directly
return NULL;
}
} else {
// otherwise, wait for the conn to be ready
}
// and let it pass errors to the user
trans->error_fn = error_fn;
// ok
return trans;
error:
// only the trans itself has been allocated at this point, free(NULL) is fine
free(trans);
return NULL;
}
/*
 * Internal query functions
 */

/*
 * Allocate a new, zeroed query with the given callback and callback argument.
 *
 * If a trans is given, it must not have a query running already. The evsql argument is not used here.
 *
 * Returns the new query, or NULL if the transaction is busy or the allocation failed.
 */
struct evsql_query *_evsql_query_new (struct evsql *evsql, struct evsql_trans *trans, evsql_query_cb query_fn, void *cb_arg) {
    struct evsql_query *query;

    // a transaction can only run one query at a time
    if (trans != NULL && trans->query != NULL)
        ERROR("transaction is busy");

    query = calloc(1, sizeof(*query));

    if (query == NULL)
        ERROR("calloc");

    // remember who to tell about the results
    query->cb_arg = cb_arg;
    query->cb_fn = query_fn;

    return query;

error:
    return NULL;
}
/*
 * Start executing the given query, or queue it until a connection can take it.
 *
 * A query that is part of a transaction is executed directly on the transaction's connection; if that fails, the
 * transaction is failed (and thereby freed).
 *
 * A transactionless query is executed directly if an idle and ready connection is available; if that fails, the
 * connection is failed. Otherwise a copy of the command is stored in the query and the query is put on the queue.
 *
 * Returns zero on success. Returns -1 on failure, in which case the query has not been freed and is still owned by the
 * caller, who frees it.
 */
int _evsql_query_enqueue (struct evsql *evsql, struct evsql_trans *trans, struct evsql_query *query, const char *command) {
    // transaction queries are handled differently
    if (trans) {
        // it's an in-transaction query
        assert(trans->query == NULL);

        // assign the query
        trans->query = query;

        // execute directly
        if (_evsql_query_exec(trans->conn, query, command)) {
            // the exec failed, so the query was never assigned to the conn. Take it back from the trans as well before
            // failing the trans: _evsql_trans_fail frees whatever query is still attached, and the caller would then
            // free the same query a second time
            trans->query = NULL;

            // ack, fail the transaction
            _evsql_trans_fail(trans);

            // caller frees query
            goto error;
        }

    } else {
        struct evsql_conn *conn;

        // find an idle connection
        if ((_evsql_conn_get(evsql, &conn, 1)))
            ERROR("couldn't allocate a connection for the query");

        // we must enqueue if no idle conn or the conn is not yet ready
        if (conn && _evsql_conn_ready(conn) > 0) {
            // execute directly
            if (_evsql_query_exec(conn, query, command)) {
                // ack, fail the connection
                _evsql_conn_fail(conn);

                // make sure we don't deadlock any queries, but if this query got a conn directly, then we shouldn't
                // have any queries enqueued anyways
                assert(TAILQ_EMPTY(&evsql->query_queue));

                // caller frees query
                goto error;
            }

        } else {
            // copy the command for later execution
            if ((query->command = strdup(command)) == NULL)
                ERROR("strdup");

            // enqueue until some connection pumps the queue
            TAILQ_INSERT_TAIL(&evsql->query_queue, query, entry);
        }
    }

    // ok, good
    return 0;

error:
    return -1;
}
void _evsql_trans_commit_res (struct evsql_result *res, void *arg) {
struct evsql_trans *trans = arg;
// check for errors
if (res->error)
ERROR("transaction 'COMMIT' failed: %s", evsql_result_error(res));
// transaction is now done
trans->done_fn(trans, trans->cb_arg);
// release it
_evsql_trans_release(trans);
// success
return;
error:
_evsql_trans_fail(trans);
}
/*
 * Commit the given transaction by sending a 'COMMIT TRANSACTION' query on it.
 *
 * The transaction must not have a query running. The outcome of the commit is handled by _evsql_trans_commit_res.
 *
 * Returns zero if the COMMIT was submitted, -1 if the transaction is busy or the query could not be submitted.
 */
int evsql_trans_commit (struct evsql_trans *trans) {
    static const char *commit_sql = "COMMIT TRANSACTION";

    if (trans->query)
        ERROR("cannot COMMIT because transaction is still busy");

    // submit the COMMIT
    if (!evsql_query(trans->evsql, trans, commit_sql, _evsql_trans_commit_res, trans))
        goto error;

    // remember that a commit was issued, evsql_trans_abort refuses to abort after this
    trans->has_commit = 1;

    return 0;

error:
    return -1;
}
void _evsql_trans_rollback_res (struct evsql_result *res, void *arg) {
struct evsql_trans *trans = arg;
// fail the connection on errors
if (res->error)
ERROR("transaction 'ROLLBACK' failed: %s", evsql_result_error(res));
// release it
_evsql_trans_release(trans);
// success
return;
error:
// fail the connection too, errors are supressed
_evsql_trans_fail(trans);
}
/*
 * Send a 'ROLLBACK TRANSACTION' query on the given transaction; arg is ignored.
 *
 * This has the signature of a ready_fn because evsql_trans_abort installs it as the trans's ready_fn while a query is
 * still pending; evsql_trans_abort also calls it directly when the transaction is idle.
 *
 * If the query cannot be submitted, the transaction is failed.
 */
void _evsql_trans_rollback (struct evsql_trans *trans, void *arg) {
    static const char *rollback_sql = "ROLLBACK TRANSACTION";

    (void) arg;

    if (evsql_query(trans->evsql, trans, rollback_sql, _evsql_trans_rollback_res, trans) != NULL)
        return;

    // fail the transaction/connection, errors are supressed
    _evsql_trans_fail(trans);
}
/*
 * Abort the given transaction by rolling it back. The user's error_fn is not called any more after this.
 *
 * An idle transaction is rolled back right away. If a query is still pending, that query is aborted and the rollback
 * is sent once it has completed.
 *
 * Aborting a transaction on which evsql_trans_commit has already succeeded is a fatal error.
 */
void evsql_trans_abort (struct evsql_trans *trans) {
    // from here on, errors are no longer reported to the user
    trans->error_fn = NULL;

    if (trans->has_commit) {
        // abort after commit doesn't make sense
        FATAL("transaction was already commited");
    }

    if (!trans->query) {
        // nothing is running, so roll back right away
        _evsql_trans_rollback(trans, NULL);

        return;
    }

    // gah, some query is running
    WARNING("aborting pending query");

    // _evsql_evpq_done calls ready_fn once an aborted query completes, so have that trigger the rollback
    trans->ready_fn = _evsql_trans_rollback;

    evsql_query_abort(trans, trans->query);
}
void evsql_destroy (struct evsql *evsql) {
struct evsql_query *query;
struct evsql_conn *conn;
// kill off all queued queries
while ((query = TAILQ_FIRST(&evsql->query_queue)) != NULL) {
// just free it, command first
free(query->command); query->command = NULL;
_evsql_query_free(query);
}
// kill off all connections
while ((conn = LIST_FIRST(&evsql->conn_list)) != NULL) {
// kill off the query
if (conn->query) {
free(query->command); query->command = NULL;
_evsql_query_free(query);
conn->query = NULL;
}
// kill off the transaction
if (conn->trans) {
conn->trans->query = NULL;
_evsql_trans_release(conn->trans);
}
// kill it off
_evsql_conn_release(conn);
}
// then free the evsql itself
free(evsql);
}
/*
 * Event callback used by evsql_destroy_next: destroys the evsql context given as arg. fd and what are ignored.
 */
void _evsql_destroy_handler (int fd, short what, void *arg)
{
    (void) fd;
    (void) what;

    evsql_destroy(arg);
}
evsql_err_t evsql_destroy_next (struct evsql *evsql)
{
struct timeval tv = {0, 0};
// schedule a one-time event
return event_base_once(evsql->ev_base, 0, EV_TIMEOUT, &_evsql_destroy_handler, evsql, &tv);
}