Skip to content

Commit

Permalink
Add indirection for src$con (tidyverse#2013)
Browse files Browse the repository at this point in the history
This commit adds con_acquire and con_release interfaces
which dplyr can use when it needs to get a connection from
a src (instead of just src$con). This makes it possible to
support connection pools properly; without this change,
there is no way for dplyr to signal to a connection pool
that it is done with a connection.
  • Loading branch information
jcheng5 authored and krlmlr committed Dec 14, 2016
1 parent 4e27e42 commit 09d7b6d
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 44 deletions.
2 changes: 2 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ export(combine)
export(common_by)
export(compare_tbls)
export(compute)
export(con_acquire)
export(con_release)
export(contains)
export(copy_lahman)
export(copy_nycflights13)
Expand Down
11 changes: 9 additions & 2 deletions R/explain.r
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,21 @@ explain <- function(x, ...) {
explain.tbl_sql <- function(x, ...) {
force(x)
show_query(x)

con <- con_acquire(x$src)
on.exit(con_release(x$src, con), add = TRUE)

message("\n")
message("<PLAN>\n", db_explain(x$src$con, sql_render(x)))
message("<PLAN>\n", db_explain(con, sql_render(x, con = con)))

invisible(NULL)
}

#' @export
#' @rdname explain
show_query <- function(x) {
message("<SQL>\n", sql_render(x))
con <- con_acquire(x$src)
on.exit(con_release(x$src, con), add = TRUE)

message("<SQL>\n", sql_render(x, con = con))
}
4 changes: 2 additions & 2 deletions R/lazy-ops.R
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
#' @name lazy_ops
NULL

op_base_remote <- function(src, x, vars = NULL) {
op_base_remote <- function(src, x, con = NULL, vars = NULL) {
# If not literal sql, must be a table identifier
if (!is.sql(x)) {
x <- ident(x)
}

if (is.null(vars)) {
vars <- db_query_fields(src$con, x)
vars <- db_query_fields(con, x)
}
op_base("remote", src, x, vars)
}
Expand Down
2 changes: 1 addition & 1 deletion R/sql-build.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sql_build <- function(op, con, ...) {

#' @export
sql_build.tbl_sql <- function(op, con, ...) {
sql_build(op$ops, op$con, ...)
sql_build(op$ops, con, ...)
}

#' @export
Expand Down
6 changes: 5 additions & 1 deletion R/sql-render.R
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ sql_render.op <- function(query, con = NULL, ...) {

#' @export
sql_render.tbl_sql <- function(query, con = NULL, ...) {
sql_render(sql_build(query$ops, query$src$con, ...), con = query$src$con, ...)
if (is.null(con)) {
con <- con_acquire(query$src)
on.exit(con_release(query$src, con), add = TRUE)
}
sql_render(sql_build(query$ops, con, ...), con = con, ...)
}

#' @export
Expand Down
43 changes: 40 additions & 3 deletions R/src-sql.r
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,55 @@
#' @param ... fields used by object
src_sql <- function(subclass, con, ...) {
subclass <- paste0("src_", subclass)
structure(list(con = con, ...), class = c(subclass, "src_sql", "src"))
structure(list(obj = con, ...), class = c(subclass, "src_sql", "src"))
}

#' Acquire/release connections from a src object
#'
#' \code{con_acquire} gets a connection from a src object; \code{con_release}
#' returns a previously acquired connection back to its src object. Intended for
#' internal use.
#'
#' These methods have default implementations for \code{src_sql} and can be
#' overridden for src objects that are not themselves DB connections, but know
#' how to get them (e.g. a connection pool).
#'
#' @keywords internal
#' @export
#' @param src A src object (most commonly, from \code{src_sql})
#' @param con A connection
#' @return For \code{con_acquire}, a connection object; for \code{con_release},
#' nothing.
con_acquire <- function(src) {
UseMethod("con_acquire", src)
}

#' @rdname con_acquire
#' @export
con_release <- function(src, con) {
UseMethod("con_release", src)
}

con_acquire.src_sql <- function(src) {
src$obj
}

con_release.src_sql <- function(src, con) {
}


#' @export
same_src.src_sql <- function(x, y) {
if (!inherits(y, "src_sql")) return(FALSE)
identical(x$con, y$con)
identical(x$obj, y$obj)
}

#' @export
src_tbls.src_sql <- function(x, ...) {
db_list_tables(x$con)
con <- con_acquire(x)
on.exit(con_release(x, con), add = TRUE)

db_list_tables(con)
}

#' @export
Expand Down
98 changes: 63 additions & 35 deletions R/tbl-sql.r
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
#' dplyr. However, you should usually be able to leave this blank and it
#' will be determined from the context.
tbl_sql <- function(subclass, src, from, ..., vars = attr(from, "vars")) {
con <- con_acquire(src)
on.exit(con_release(src, con), add = TRUE)

make_tbl(
c(subclass, "sql", "lazy"),
src = src,
ops = op_base_remote(src, from, vars)
ops = op_base_remote(src, from, con, vars)
)
}

Expand Down Expand Up @@ -330,32 +333,44 @@ copy_to.src_sql <- function(dest, df, name = deparse(substitute(df)),
assert_that(is.data.frame(df), is.string(name), is.flag(temporary))
class(df) <- "data.frame" # avoid S4 dispatch problem in dbSendPreparedQuery

if (isTRUE(db_has_table(dest$con, name))) {
stop("Table ", name, " already exists.", call. = FALSE)
}

types <- types %||% db_data_type(dest$con, df)
names(types) <- names(df)

con <- dest$con
db_begin(con)
on.exit(db_rollback(con))

db_create_table(con, name, types, temporary = temporary)
db_insert_into(con, name, df)
db_create_indexes(con, name, unique_indexes, unique = TRUE)
db_create_indexes(con, name, indexes, unique = FALSE)
if (analyze) db_analyze(con, name)
con <- con_acquire(dest)
tryCatch({
if (isTRUE(db_has_table(con, name))) {
stop("Table ", name, " already exists.", call. = FALSE)
}

db_commit(con)
on.exit(NULL)
types <- types %||% db_data_type(con, df)
names(types) <- names(df)

db_begin(con)
tryCatch({
db_create_table(con, name, types, temporary = temporary)
db_insert_into(con, name, df)
db_create_indexes(con, name, unique_indexes, unique = TRUE)
db_create_indexes(con, name, indexes, unique = FALSE)
if (analyze) db_analyze(con, name)

db_commit(con)
}, error = function(err) {
db_rollback(con)
stop(err)
})
}, finally = {
con_release(dest, con)
})

tbl(dest, name)
}

#' @export
collapse.tbl_sql <- function(x, vars = NULL, ...) {
sql <- sql_render(x)
con <- con_acquire(x$src)
tryCatch({
sql <- sql_render(x, con)
}, finally = {
con_release(x$src, con)
})

tbl(x$src, sql) %>% group_by_(.dots = groups(x))
}

Expand All @@ -371,13 +386,18 @@ compute.tbl_sql <- function(x, name = random_table_name(), temporary = TRUE,
unique_indexes <- as.list(unique_indexes)
}

vars <- op_vars(x)
assert_that(all(unlist(indexes) %in% vars))
assert_that(all(unlist(unique_indexes) %in% vars))
x_aliased <- select_(x, .dots = vars) # avoids problems with SQLite quoting (#1754)
db_save_query(x$src$con, sql_render(x_aliased), name = name, temporary = temporary)
db_create_indexes(x$src$con, name, unique_indexes, unique = TRUE)
db_create_indexes(x$src$con, name, indexes, unique = FALSE)
con <- con_acquire(x$src)
tryCatch({
vars <- op_vars(x)
assert_that(all(unlist(indexes) %in% vars))
assert_that(all(unlist(unique_indexes) %in% vars))
x_aliased <- select_(x, .dots = vars) # avoids problems with SQLite quoting (#1754)
db_save_query(con, sql_render(x_aliased, con), name = name, temporary = temporary)
db_create_indexes(con, name, unique_indexes, unique = TRUE)
db_create_indexes(con, name, indexes, unique = FALSE)
}, finally = {
con_release(x$src, con)
})

tbl(x$src, name) %>% group_by_(.dots = groups(x))
}
Expand All @@ -389,14 +409,19 @@ collect.tbl_sql <- function(x, ..., n = 1e5, warn_incomplete = TRUE) {
n <- -1
}

sql <- sql_render(x)
res <- dbSendQuery(x$src$con, sql)
on.exit(dbClearResult(res))
con <- con_acquire(x$src)
on.exit(con_release(x$src, con), add = TRUE)

out <- dbFetch(res, n)
if (warn_incomplete) {
res_warn_incomplete(res, "n = Inf")
}
sql <- sql_render(x, con)
res <- dbSendQuery(con, sql)
tryCatch({
out <- dbFetch(res, n)
if (warn_incomplete) {
res_warn_incomplete(res, "n = Inf")
}
}, finally = {
dbClearResult(res)
})

grouped_df(out, groups(x))
}
Expand All @@ -422,6 +447,9 @@ do_.tbl_sql <- function(.data, ..., .dots, .chunk_size = 1e4L) {
summarise() %>%
collect()

con <- con_acquire(.data$src)
on.exit(con_release(.data$src, con), add = TRUE)

n <- nrow(labels)
m <- length(args)

Expand All @@ -431,7 +459,7 @@ do_.tbl_sql <- function(.data, ..., .dots, .chunk_size = 1e4L) {
env <- new.env(parent = lazyeval::common_env(args))

# Create ungrouped data frame suitable for chunked retrieval
query <- query(.data$src$con, sql_render(ungroup(.data)), op_vars(.data))
query <- query(con, sql_render(ungroup(.data), con), op_vars(.data))

# When retrieving in pages, there's no guarantee we'll get a complete group.
# So we always assume the last group in the chunk is incomplete, and leave
Expand Down
32 changes: 32 additions & 0 deletions man/con_acquire.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 09d7b6d

Please sign in to comment.