Skip to content

Commit

Permalink
Switch parallel backend from doFuture to doParallel
Browse files Browse the repository at this point in the history
This change improves multi-core performance of robyn_run()

My test run on 200 iterations and 4 cores shown 9.5 sec improvement (0.71 -> 0.55 min)
Tested on Ubuntu 20.04 / VirtualBox under Win10 host.
  • Loading branch information
andrey-legayev committed Nov 23, 2021
1 parent cbab310 commit 07a9e43
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 27 deletions.
2 changes: 0 additions & 2 deletions R/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,8 @@ Depends:
Imports:
data.table,
doParallel,
doFuture,
doRNG,
foreach,
future,
ggplot2,
ggridges,
glmnet,
Expand Down
6 changes: 1 addition & 5 deletions R/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,13 @@ export(robyn_save)
export(saturation_hill)
import(data.table)
import(ggplot2)
importFrom(doFuture,registerDoFuture)
importFrom(doParallel,registerDoParallel)
importFrom(doParallel,stopImplicitCluster)
importFrom(doRNG,"%dorng%")
importFrom(foreach,"%dopar%")
importFrom(foreach,foreach)
importFrom(foreach,getDoParWorkers)
importFrom(foreach,registerDoSEQ)
importFrom(future,availableCores)
importFrom(future,multicore)
importFrom(future,plan)
importFrom(future,sequential)
importFrom(ggridges,geom_density_ridges)
importFrom(glmnet,cv.glmnet)
importFrom(glmnet,glmnet)
Expand Down
6 changes: 2 additions & 4 deletions R/R/imports.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
#' @author Antonio Prada (aprada@@fb.com)
#' @author Igor Skokan (igorskokan@@fb.com)
#' @import data.table
#' @importFrom doFuture registerDoFuture
#' @importFrom doRNG %dorng%
#' @importFrom doParallel registerDoParallel
#' @importFrom doParallel registerDoParallel stopImplicitCluster
#' @importFrom foreach foreach %dopar% getDoParWorkers registerDoSEQ
#' @importFrom future multicore plan sequential availableCores
#' @import ggplot2
#' @importFrom ggridges geom_density_ridges
#' @importFrom glmnet cv.glmnet glmnet
Expand Down Expand Up @@ -56,7 +54,7 @@ dt_vars <- c(
"optmResponseUnitTotalLift", "optmSpendUnit", "optmSpendUnitTotalDelta", "param",
"perc", "percentage", "pos", "predicted", "refreshStatus", "response", "rn", "robynPareto",
"roi", "roi_mean", "roi_total", "rsq_lm", "rsq_nls", "rsq_train", "s0", "scale_shape_halflife",
"season", "sequential", "shape", "solID", "spend", "spend_share", "spend_share_refresh",
"season", "shape", "solID", "spend", "spend_share", "spend_share_refresh",
"theta", "theta_halflife", "total_spend", "trend", "trial", "type", "value", "variable",
"weekday", "x", "xDecompAgg", "xDecompMeanNon0", "xDecompMeanNon0Perc",
"xDecompMeanNon0PercRF", "xDecompMeanNon0RF", "xDecompPerc", "xDecompPercRF", "y", "yhat",
Expand Down
30 changes: 14 additions & 16 deletions R/R/model.R
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,10 @@ robyn_run <- function(InputCollect,
#decompSpendDist <- decompSpendDist[xDecompAgg[rn %in% InputCollect$paid_media_vars, .(rn, xDecompAgg, solID)], on = c("rn", "solID")]

## get mean_response
registerDoFuture()
if (.Platform$OS.type == "unix") {
plan(multicore, workers = InputCollect$cores)
registerDoParallel(InputCollect$cores)
} else {
plan(sequential)
registerDoSEQ()
}

# if (hyper_fixed == FALSE) {pb <- txtProgressBar(min=1, max = length(decompSpendDist$rn), style = 3)}
Expand All @@ -270,6 +269,7 @@ robyn_run <- function(InputCollect,
return(dt_resp)
}
#if (hyper_fixed == FALSE) close(pb)
stopImplicitCluster()
registerDoSEQ()
getDoParWorkers()

Expand Down Expand Up @@ -1090,6 +1090,14 @@ robyn_mmm <- function(hyper_collect,
}
# assign("InputCollect", InputCollect, envir = .GlobalEnv) # adding this to enable InputCollect reading during parallel
# opts <- list(progress = function(n) setTxtProgressBar(pb, n))

# create cluster before big for-loop to minimize overhead for parallel backend registering
if (.Platform$OS.type == "unix") {
registerDoParallel(InputCollect$cores)
} else {
registerDoSEQ()
}

sysTimeDopar <- system.time({
for (lng in 1:iterNG) { # lng = 1
nevergrad_hp <- list()
Expand Down Expand Up @@ -1134,17 +1142,7 @@ robyn_mmm <- function(hyper_collect,
nrmse.collect <- c()
decomp.rssd.collect <- c()
best_mape <- Inf
# registerDoParallel(cores) #registerDoParallel(cores=InputCollect$cores)

registerDoFuture()
if (.Platform$OS.type == "unix") {
plan(multicore, workers = cores)
} else {
plan(sequential)
}

# nbrOfWorkers()
getDoParWorkers()
doparCollect <- suppressPackageStartupMessages(
foreach(i = 1:iterPar) %dorng% { # i = 1
t1 <- Sys.time()
Expand Down Expand Up @@ -1420,13 +1418,10 @@ robyn_mmm <- function(hyper_collect,
}
) # end foreach parallel

# stopImplicitCluster()

nrmse.collect <- sapply(doparCollect, function(x) x$nrmse)
decomp.rssd.collect <- sapply(doparCollect, function(x) x$decomp.rssd)
mape.lift.collect <- sapply(doparCollect, function(x) x$mape.lift)


#####################################
#### Nevergrad tells objectives

Expand All @@ -1450,6 +1445,9 @@ robyn_mmm <- function(hyper_collect,

message("\n Finished in ", round(sysTimeDopar[3] / 60, 2), " mins")

# stop cluster to avoid memory leaks
stopImplicitCluster()

if (hyper_fixed == FALSE) close(pb)
registerDoSEQ()
getDoParWorkers()
Expand Down

0 comments on commit 07a9e43

Please sign in to comment.