---
title: "Implementing a Work Queue using RPostgres"
author: "Jamie Lentin"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Implementing a Work Queue using RPostgres}
  %\VignetteEngine{knitr::rmarkdown}
  \usepackage[utf8]{inputenc}
---
```{r, echo = FALSE}
library(DBI)
knitr::opts_chunk$set(
  error = (Sys.getenv("IN_PKGDOWN") != "true"),
  collapse = TRUE,
  comment = "#>",
  eval = RPostgres::postgresHasDefault()
)
con <- NULL
rp <- NULL
rp_1 <- NULL
rp_2 <- NULL
rs <- NULL
```
Imagine you have an R process that does relatively intensive work, driven by user input.
To keep things as fast as possible, you may want several servers all processing incoming requests.
However, to do this you need to co-ordinate between all of your servers (or workers).
How do you decide which server works on what? What if one server dies mid-way through a job?
To solve this, we need a work queue, also known as a job queue or task queue.
This document will show you how to build a work queue system using R and PostgreSQL that would ordinarily require an external tool,
like [RabbitMQ](https://www.rabbitmq.com/).
In this example, our work will be generating square roots. We'll keep track of the results in a table:
```{r}
library(DBI)
con <- dbConnect(RPostgres::Postgres())
dbExecute(con, "DROP TABLE IF EXISTS sqroot_vignette_example;")
dbExecute(con, "
  CREATE TABLE sqroot_vignette_example (
    in_val INTEGER PRIMARY KEY,
    out_val DOUBLE PRECISION NULL
  )
")
```
When a client wants a square root, it inserts a new row into the table, filling in ``in_val``.
We'll then have a set of workers that calculate the results for the client and fill in ``out_val``.
To manage these workers, we will combine two PostgreSQL concepts:
```{r, echo = FALSE}
if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL }
if (!is.null(con)) { dbDisconnect(con) ; con <- NULL }
if (!is.null(rp)) { rp$wait() ; rp <- NULL }
```
## LISTEN / NOTIFY
The Postgres ``LISTEN`` and ``NOTIFY`` commands allow you to send and receive messages between clients connected to a PostgreSQL database.
This is known as a publish/subscribe architecture.
We tell Postgres that we are interested in receiving messages using ``LISTEN``. For example:
```{r}
con <- dbConnect(RPostgres::Postgres())
dbExecute(con, "LISTEN grapevine")
```
...in this case, "grapevine" is an arbitrary channel name; we don't need to create channels ahead of time.
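The ``NOTIFY`` command takes its channel as an identifier and its payload as a literal, so it can't take query placeholders; the built-in ``pg_notify(channel, payload)`` function is equivalent and accepts ordinary values, which is why we use it later when the payload comes from a parameter. A quick sketch of the two forms:
```{r, eval = FALSE}
# These send the same notification; only the pg_notify() form can be parameterised
dbExecute(con, "NOTIFY grapevine, 'psst'")
dbExecute(con, "SELECT pg_notify('grapevine', $1)", params = list("psst"))
```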
To make sure we have something to receive, we can start a separate R process using [callr](https://CRAN.R-project.org/package=callr).
Ordinarily this would be part of another R script, maybe on another computer.
This will wait a moment, use ``NOTIFY`` to send a message, then finish:
```{r}
rp <- callr::r_bg(function () {
  library(DBI)
  Sys.sleep(0.3)
  db_notify <- dbConnect(RPostgres::Postgres())
  dbExecute(db_notify, "NOTIFY grapevine, 'psst'")
  dbDisconnect(db_notify)
})
```
Finally, we should wait for any incoming messages. To do this, use ``postgresWaitForNotify``.
The payload will contain the message from the other R process:
```{r}
# Sleep until we get the message
n <- NULL
while (is.null(n)) {
  n <- RPostgres::postgresWaitForNotify(con)
}
n$payload
```
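``postgresWaitForNotify`` also takes a timeout in seconds and returns ``NULL`` if no notification arrives in time, which is why we loop above; the worker below relies on this to do periodic housekeeping. A minimal sketch:
```{r, eval = FALSE}
# Wait up to 5 seconds for a notification; n is NULL on timeout
n <- RPostgres::postgresWaitForNotify(con, 5)
if (is.null(n)) {
  writeLines("Nothing heard on the grapevine")
}
```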
## SKIP LOCKED
We can use LISTEN/NOTIFY to inform all workers that there is something to be done, but how do we decide which worker actually does the work?
This is done using ``SKIP LOCKED``.
Suppose a row with ``in_val = 99`` exists, and we notify all workers that it is ready for processing.
After receiving the notification, they all run the following:
```{r}
rs <- dbSendQuery(con, "
  SELECT in_val
  FROM sqroot_vignette_example
  WHERE in_val = $1
  FOR UPDATE
  SKIP LOCKED
", params = list(99))
```
One lucky worker will get a row back; thanks to ``FOR UPDATE``, the row is now locked for the duration of that worker's transaction.
Any other worker will skip over the locked row (``SKIP LOCKED``) and look for something else to do.
If there are no other jobs available, nothing will be returned.
Using SKIP LOCKED is discussed in more detail [in this article](https://www.2ndquadrant.com/en/blog/what-is-select-skip-locked-for-in-postgresql-9-5/).
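To see the skipping in action, you could open a second connection and run the same query while the first transaction still holds the lock: instead of blocking, it returns zero rows immediately. A sketch, not evaluated here, where ``con2`` stands in for a second worker's connection:
```{r, eval = FALSE}
con2 <- dbConnect(RPostgres::Postgres())
dbWithTransaction(con2, {
  # Returns 0 rows while another transaction holds the lock on row 99
  dbGetQuery(con2, "
    SELECT in_val
    FROM sqroot_vignette_example
    WHERE in_val = $1
    FOR UPDATE SKIP LOCKED
  ", params = list(99))
})
dbDisconnect(con2)
```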
```{r, echo = FALSE}
if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL }
if (!is.null(con)) { dbDisconnect(con) ; con <- NULL }
if (!is.null(rp)) { rp$wait() ; rp <- NULL }
```
## Implementing our worker
Now we can put the concepts together.
The following implements our worker as a function (again, this would be running as a script on several servers):
```{r}
worker <- function () {
  library(DBI)

  db_worker <- dbConnect(RPostgres::Postgres())
  on.exit(dbDisconnect(db_worker))
  dbExecute(db_worker, "LISTEN sqroot")
  dbExecute(db_worker, "LISTEN sqroot_shutdown")

  while (TRUE) {
    # Wait for new work to do
    n <- RPostgres::postgresWaitForNotify(db_worker, 60)
    if (is.null(n)) {
      # If nothing to do, send notifications for any unfinished work
      dbExecute(db_worker, "
        SELECT pg_notify('sqroot', in_val::TEXT)
        FROM sqroot_vignette_example
        WHERE out_val IS NULL
      ")
      next
    }

    # If we've been told to shut down, stop right away
    if (n$channel == 'sqroot_shutdown') {
      writeLines("Shutting down.")
      break
    }

    in_val <- strtoi(n$payload)
    tryCatch({
      dbWithTransaction(db_worker, {
        # Try and fetch the item we got notified about
        rs <- dbSendQuery(db_worker, "
          SELECT in_val
          FROM sqroot_vignette_example
          WHERE out_val IS NULL  -- if another worker already finished, don't reprocess
            AND in_val = $1
          FOR UPDATE SKIP LOCKED  -- don't let another worker work on this at the same time
        ", params = list(in_val))
        in_val <- dbFetch(rs)[1, 1]
        dbClearResult(rs)

        if (!is.na(in_val)) {
          # Actually do the sqrt
          writeLines(paste("Sqroot-ing", in_val, "... "))
          Sys.sleep(in_val * 0.1)
          out_val <- sqrt(in_val)
          # Update the database with the result
          dbExecute(db_worker, "
            UPDATE sqroot_vignette_example
            SET out_val = $1
            WHERE in_val = $2
          ", params = list(out_val, in_val))
        } else {
          writeLines("Not sqroot-ing as another worker got there first")
        }
      })
    }, error = function (e) {
      # Something went wrong. Report the error and carry on
      writeLines(paste("Failed to sqroot:", e$message))
    })
  }
}
```
The worker connects to the database, starts listening, and loops indefinitely:

* First, we wait for new notifications.
* If there aren't any, we search for unprocessed items and generate fresh notifications for them.
  This allows items to be picked up again if they weren't processed the first time around,
  e.g. because no workers were listening.
* If we got a shutdown message, stop.
* Try to grab the row for the new item; if we win (and only one worker will), fill in the square root.

Note that the row is claimed and updated within a single transaction: if a worker dies mid-way, PostgreSQL rolls the transaction back and releases the lock, so ``out_val`` stays ``NULL`` and the item will be re-notified for another worker to pick up.

Let's use callr again to start two workers:
```{r}
stdout_1 <- tempfile()
stdout_2 <- tempfile()
rp_1 <- callr::r_bg(worker, stdout = stdout_1, stderr = stdout_1)
rp_2 <- callr::r_bg(worker, stdout = stdout_2, stderr = stdout_2)
Sys.sleep(1) # Give workers a chance to set themselves up
```
Now our client can add some values to our table and notify the workers that there's something to do:
```{r}
con <- dbConnect(RPostgres::Postgres())
add_sqroot <- function (in_val) {
  dbExecute(con, "
    INSERT INTO sqroot_vignette_example (in_val) VALUES ($1)
  ", params = list(in_val))
  dbExecute(con, "
    SELECT pg_notify('sqroot', $1)
  ", params = list(in_val))
}
add_sqroot(7)
add_sqroot(8)
add_sqroot(9)
```
...after a wait, the answers should have been populated by the workers for us:
```{r}
Sys.sleep(3)
rs <- dbSendQuery(con, "SELECT * FROM sqroot_vignette_example ORDER BY in_val")
dbFetch(rs)
dbClearResult(rs) ; rs <- NULL
```
Finally, we can use ``NOTIFY`` to stop all the workers:
```{r}
dbExecute(con, "NOTIFY sqroot_shutdown, ''")
```
And see what messages were printed as they ran:
```{r}
# We can't control which worker will process the first entry,
# so we sort the results to keep the vignette output stable.
outputs <- sort(c(
  paste(readLines(con = stdout_1), collapse = "\n"),
  paste(readLines(con = stdout_2), collapse = "\n")))
writeLines(outputs[[1]])
writeLines(outputs[[2]])
```
Notice that the work has been shared between the two workers.
If two workers weren't enough, we could happily add more to keep the system going, as sketched below.
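For example, a hypothetical launcher for any number of workers might look like this; each worker only needs its own database connection, so nothing else changes:
```{r, eval = FALSE}
# Start n_workers background workers, keeping a handle to each
n_workers <- 4
handles <- lapply(seq_len(n_workers), function (i) callr::r_bg(worker))
```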
```{r, echo = FALSE}
if (!is.null(rs)) { dbClearResult(rs) ; rs <- NULL }
if (!is.null(con)) { dbDisconnect(con) ; con <- NULL }
if (!is.null(rp)) { rp$wait() ; rp <- NULL }
if (!is.null(rp_1)) { rp_1$wait() ; rp_1 <- NULL }
if (!is.null(rp_2)) { rp_2$wait() ; rp_2 <- NULL }
```