-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool_lobj.c
242 lines (204 loc) · 6.86 KB
/
pool_lobj.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
/* -*-pgsql-c-*- */
/*
* $Header$
*
* pgpool: a language independent connection pool server for PostgreSQL
* written by Tatsuo Ishii
*
* Copyright (c) 2003-2010 PgPool Global Development Group
*
* Permission to use, copy, modify, and distribute this software and
* its documentation for any purpose and without fee is hereby
* granted, provided that the above copyright notice appear in all
* copies and that both that copyright notice and this permission
* notice appear in supporting documentation, and that the name of the
* author not be used in advertising or publicity pertaining to
* distribution of the software without specific, written prior
* permission. The author makes no representations about the
* suitability of this software for any purpose. It is provided "as
* is" without express or implied warranty.
*
* pool_lobj.c: Transparently translate lo_creat call to lo_create so
* that large objects replicated safely.
* lo_create anyway.
*/
#include "config.h"
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netinet/in.h>
#include "pool.h"
#include "pool_lobj.h"
#include "pool_relcache.h"
#include "pool_config.h"
/*
* Rewrite lo_creat call to lo_create call if:
* 1) it's a lo_creat function call
* 2) PostgreSQL has lo_create
* 3) In replication mode
* 4) lobj_lock_table exists and writable to everyone
*
* The argument for lo_create is created by fetching max(loid)+1 from
* pg_largeobject. To avoid race condition, we lock lobj_lock_table.
*
* Caller should call this only if protocol is V3 or higher(for
* now. There's no reason for this function not working with V2
* protocol). Return value is a rewritten packet without kind and
* length. This is allocated in a static memory. New packet length is
* set to *len.
*/
char *pool_rewrite_lo_creat(char kind, char *packet, int packet_len,
POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend, int* len)
{
#define LO_CREAT_OID_QUERY "SELECT oid FROM pg_catalog.pg_proc WHERE proname = 'lo_creat' and pronamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = 'pg_catalog')"
#define LO_CREATE_OID_QUERY "SELECT oid FROM pg_catalog.pg_proc WHERE proname = 'lo_create' and pronamespace = (SELECT oid FROM pg_catalog.pg_namespace WHERE nspname = 'pg_catalog')"
#define LO_CREATE_PACKET_LENGTH sizeof(int32)*3+sizeof(int16)*4
#define LOCK_QUERY "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE"
#define GET_MAX_LOBJ_KEY "SELECT coalesce(max(loid)::INTEGER, 0)+1 FROM pg_catalog.pg_largeobject"
static char rewritten_packet[LO_CREATE_PACKET_LENGTH];
static POOL_RELCACHE *relcache_lo_creat;
static POOL_RELCACHE *relcache_lo_create;
int lo_creat_oid;
int lo_create_oid;
int orig_fcall_oid;
POOL_STATUS status;
char qbuf[1024];
char *p;
POOL_SELECT_RESULT *result;
int lobjid;
int32 int32val;
int16 int16val;
int16 result_format_code;
if (kind != 'F')
return NULL; /* not function call */
if (!strcmp(pool_config->lobj_lock_table,""))
return NULL; /* no lock table */
if (!REPLICATION)
return NULL; /* not in replication mode */
/*
* If relcache does not exist, create it.
*/
if (!relcache_lo_creat)
{
relcache_lo_creat = pool_create_relcache(1, LO_CREAT_OID_QUERY,
int_register_func, int_unregister_func,
false);
if (relcache_lo_creat == NULL)
{
pool_error("pool_check_lo_creat: pool_create_relcache error");
return NULL;
}
}
/*
* Get lo_creat oid
*/
lo_creat_oid = (int)(intptr_t)pool_search_relcache(relcache_lo_creat, backend, "pg_proc");
memmove(&orig_fcall_oid, packet, sizeof(int32));
orig_fcall_oid = ntohl(orig_fcall_oid);
pool_debug("orig_fcall_oid:% d lo_creat_oid: %d", orig_fcall_oid, lo_creat_oid);
/*
* This function call is calling lo_creat? */
if (orig_fcall_oid != lo_creat_oid)
return NULL;
/*
* If relcache does not exist, create it.
*/
if (!relcache_lo_create)
{
relcache_lo_create = pool_create_relcache(1, LO_CREATE_OID_QUERY,
int_register_func, int_unregister_func,
false);
if (relcache_lo_create == NULL)
{
pool_error("pool_check_lo_creat: pool_create_relcache error");
return NULL;
}
}
/*
* Get lo_create oid
*/
lo_create_oid = (int)(intptr_t)pool_search_relcache(relcache_lo_create, backend, "pg_proc");
pool_debug("pool_check_lo_creat: lo_creat_oid: %d lo_create_oid: %d",
lo_creat_oid, lo_create_oid);
/*
* Parse input packet
*/
memmove(&int16val, packet+packet_len-sizeof(int16), sizeof(int16));
result_format_code = ntohs(int16val);
/* sanity check */
if (result_format_code != 0 && result_format_code != 1)
{
pool_error("pool_rewrite_lo_creat: wrong return format code: %d", int16val);
return NULL;
}
pool_debug("pool_rewrite_lo_creat: return format code: %d", int16val);
/*
* Ok, do it...
*/
/* issue lock table command to lob_lock_table */
snprintf(qbuf, sizeof(qbuf), "LOCK TABLE %s IN SHARE ROW EXCLUSIVE MODE", pool_config->lobj_lock_table);
per_node_statement_log(backend, MASTER_NODE_ID, qbuf);
status = do_command(frontend, MASTER(backend), qbuf, MAJOR(backend), MASTER_CONNECTION(backend)->pid,
MASTER_CONNECTION(backend)->key, 0);
if (status == POOL_END)
{
pool_error("pool_rewrite_lo_creat: failed to execute LOCK");
return NULL;
}
/*
* If transaction state is E, do_command failed to execute command
*/
if (TSTATE(backend, MASTER_NODE_ID) == 'E')
{
pool_log("pool_check_lo_creat: failed to execute: %s", qbuf);
return NULL;
}
/* get max lobj id */
per_node_statement_log(backend, MASTER_NODE_ID, GET_MAX_LOBJ_KEY);
status = do_query(MASTER(backend), GET_MAX_LOBJ_KEY, &result, MAJOR(backend));
if (status == POOL_END)
{
pool_error("pool_rewrite_lo_creat: do_query failed");
return NULL;
}
if (!result)
{
pool_log("pool_check_lo_creat: failed to execute: %s", GET_MAX_LOBJ_KEY);
return NULL;
}
lobjid = atoi(result->data[0]);
pool_debug("lobjid:%d", lobjid);
free_select_result(result);
/* sanity check */
if (lobjid <= 0)
{
pool_error("pool_rewrite_lo_creat: wrong lob id: %d", lobjid);
return NULL;
}
/*
* Create lo_create call packet
*/
p = rewritten_packet;
*len = LO_CREATE_PACKET_LENGTH;
int32val = htonl(lo_create_oid);
memmove(p,&int32val, sizeof(int32)); /* lo_create oid */
p += sizeof(int32);
int16val = htons(1);
memmove(p,&int16val, sizeof(int16)); /* number of argument format code */
p += sizeof(int16);
int16val = htons(1);
memmove(p,&int16val, sizeof(int16)); /* format code */
p += sizeof(int16);
int16val = htons(1);
memmove(p,&int16val, sizeof(int16)); /* number of arguments */
p += sizeof(int16);
int32val = htonl(4);
memmove(p,&int32val, sizeof(int32)); /* argument length */
p += sizeof(int32);
int32val = htonl(lobjid);
memmove(p,&int32val, sizeof(int32)); /* argument(lobj id) */
p += sizeof(int32);
int16val = htons(result_format_code);
memmove(p,&int16val, sizeof(int16)); /* result format code */
return rewritten_packet;
}