Parallelization

What is parallelization?

Parallelization is the process of speeding up a computer program by dividing it into independent tasks and running those tasks simultaneously across multiple computer processors. Most modern laptops have two or more processors (or “cores”), and many statisticians have access to so-called cluster computing systems (CCS), which can have hundreds of processing nodes, each of which can have multiple cores. Roughly speaking, a program that takes ten minutes to run on a single core will take just one minute if it can be broken into ten separate tasks that all run at the same time. Parallelization can therefore result in massive gains in computing speed and should be done whenever possible.

Not all code can be parallelized: for tasks to run in parallel, they cannot exchange information or depend on each other in any way. However, you can still write programs that are partially parallel, such as when you separately compute ten estimates in parallel and then take the mean of those ten estimates.
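To illustrate the partially parallel pattern described above, here is a generic sketch using the base R parallel package (plain R for illustration, not SimEngine code):

```r
library(parallel)

# Ten independent tasks: each simulates a dataset and returns an
# estimate. The tasks never communicate with one another.
cl <- makeCluster(2)
estimates <- parLapply(cl, 1:10, function(i) {
  mean(rpois(n=100, lambda=20))
})
stopCluster(cl)

# The combining step is inherently sequential, since it depends on
# all ten results.
mean(unlist(estimates))
```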

The terminology associated with parallel computing can be confusing. What is the difference between a node, a core, and a processor? What is the difference between a job, a task, and a thread? Because these terms are used inconsistently across computing environments, we adopt a single working definition of each and use it consistently throughout.

Parallelization in SimEngine

User-friendly parallelization is a hallmark of SimEngine. There are two modes of parallelizing code using SimEngine, which we refer to as local parallelization and cluster parallelization. Local parallelization refers to splitting the computational work of a simulation between multiple cores of a single computer (e.g., a multicore laptop). Cluster parallelization refers to running a simulation on a CCS using job arrays. SimEngine is designed to automate as much of the parallelization process as possible. We give an overview of each parallelization mode below.

Local parallelization

Local parallelization is the easiest way to parallelize code, as the entire process is handled by the package and executed on the user’s computer. This mode is activated using set_config(), as follows.

sim <- new_sim()
sim %<>% set_config(parallel = TRUE)

SimEngine handles the mechanics related to parallelization internally using the base R package parallel. If a single simulation replicate runs in a very short amount of time (e.g., less than one second), using local parallelization can actually result in an increase in total runtime. This is because there is a certain amount of computational overhead involved in the parallelization mechanisms inside SimEngine. A speed comparison can be performed by running the code twice, once with set_config(parallel = TRUE) and once with set_config(parallel = FALSE), each followed by sim %>% vars("total_runtime"), to see the difference in total runtime. The exact overhead involved with local parallelization will differ between machines.
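The speed comparison described above can be sketched as follows; total_runtime is recorded automatically by SimEngine, and the actual values will differ between machines.

```r
sim <- new_sim()
# ... set_levels(), set_script(), etc., as usual ...
sim %<>% set_config(num_sim=1000, parallel=TRUE)
sim %<>% run()
sim %>% vars("total_runtime")

# Re-run the identical simulation without parallelization
sim %<>% set_config(parallel=FALSE)
sim %<>% run()
sim %>% vars("total_runtime")
```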

If the user’s computer has n cores available, SimEngine will use n-1 cores by default. The n_cores argument of set_config() can be used to manually specify the number of cores to use, as follows.

sim %<>% set_config(n_cores = 2)

Cluster parallelization

Parallelizing code using a CCS is more complicated, but SimEngine is built to streamline this process as much as possible. A CCS is a supercomputer consisting of a number of nodes, each of which may have multiple cores. In a typical workflow, a user starts by logging into the CCS (via SSH) and transferring files to the CCS filesystem (using Linux commands or an FTP client). The user then runs programs by submitting “jobs” to the CCS using a special program called a job scheduler. The job scheduler manages the process of running the jobs in parallel across multiple nodes and/or multiple cores. Although there are multiple ways to run code in parallel on a CCS, SimEngine makes use of job arrays. The main cluster parallelization function in SimEngine is run_on_cluster(). Throughout this section, we use Slurm as the job scheduler, but an analogous workflow applies to other job scheduling software.

To illustrate the cluster parallelization workflow, consider the following simulation:

sim <- new_sim()
create_data <- function(n) { return(rpois(n=n, lambda=20)) }
est_lambda <- function(dat, type) {
  if (type=="M") { return(mean(dat)) }
  if (type=="V") { return(var(dat)) }
}
sim %<>% set_levels(estimator = c("M","V"), n = c(10,100,1000))
sim %<>% set_script(function() {
  dat <- create_data(L$n)
  lambda_hat <- est_lambda(dat=dat, type=L$estimator)
  return(list("lambda_hat"=lambda_hat))
})
sim %<>% set_config(num_sim=100)
sim %<>% run()
sim %>% summarize()

To run this code on a CCS, we simply wrap it in the run_on_cluster() function. To use this function, we must break the code into three blocks, called first, main, and last. The code in the first block will run only once, and will set up the simulation object. When this is finished, SimEngine will save the simulation object in the filesystem of the CCS. The code in the main block will then run once for each simulation replicate, and will have access to the simulation object created in the first block. In most cases, the code in the main block will simply include a single call to run(). Finally, the code in the last block will run after all simulation replicates have finished running, and after SimEngine has automatically compiled the results into the simulation object. Use of the run_on_cluster() function is illustrated below:

run_on_cluster(
  first = {
    sim <- new_sim()
    create_data <- function(n) { return(rpois(n=n, lambda=20)) }
    est_lambda <- function(dat, type) {
      if (type=="M") { return(mean(dat)) }
      if (type=="V") { return(var(dat)) }
    }
    sim %<>% set_levels(estimator = c("M","V"), n = c(10,100,1000))
    sim %<>% set_script(function() {
      dat <- create_data(L$n)
      lambda_hat <- est_lambda(dat=dat, type=L$estimator)
      return(list("lambda_hat"=lambda_hat))
    })
    sim %<>% set_config(num_sim=100, n_cores=20)
  },
  main = {
    sim %<>% run()
  },
  last = {
    sim %>% summarize()
  },
  cluster_config = list(js="slurm")
)

Note that none of the actual simulation code changed (with the exception of specifying n_cores=20 in the set_config() call); we simply divided the code into chunks and placed these chunks into the appropriate block (first, main, or last) within run_on_cluster(). Additionally, we specified which job scheduler to use in the cluster_config argument list. The command js_support() can be run in R to see a list of supported job scheduler software; the value in the js_code column is the value that should be specified in the cluster_config argument. Unsupported job schedulers can still be used for cluster parallelization, as detailed below.
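The list of natively supported schedulers can be viewed from an R session; the output depends on the installed version of SimEngine, so it is omitted here.

```r
library(SimEngine)
# Prints a table of supported job schedulers; the js_code column
# gives the value to pass via cluster_config in run_on_cluster().
js_support()
```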

Next, we must give the job scheduler instructions on how to run the above code. In the following, we assume that the R code above is stored in a file called my_simulation.R. We also need to create a simple shell script, run_sim.sh, containing the following two lines, which will run my_simulation.R (we use Bash here, but any shell scripting language may be used).

#!/bin/bash
Rscript my_simulation.R

If created on a local machine, the two simulation files (my_simulation.R and run_sim.sh) must be transferred to the filesystem of the CCS. Finally, we use the job scheduler to submit three jobs: the first runs the first block, the second runs the main block, and the third runs the last block. With Slurm, we run the following three shell commands:

sbatch --export=sim_run='first' run_sim.sh
#> Submitted batch job 101
sbatch --export=sim_run='main' --array=1-20 --depend=afterok:101 run_sim.sh
#> Submitted batch job 102
sbatch --export=sim_run='last' --depend=afterok:102 run_sim.sh
#> Submitted batch job 103

In the first line, we submit the run_sim.sh script using the sim_run='first' environment variable, which tells SimEngine to only run the code in the first block. After running this, Slurm returns the message Submitted batch job 101. The number 101 is called the “job ID” and uniquely identifies the job on the CCS.

In the second line, we submit the run_sim.sh script using the sim_run='main' environment variable and tell Slurm to run a job array with “task IDs” 1-20. Each task corresponds to one core, so in this case 20 cores will be used; this number should equal the value of n_cores specified via set_config(). SimEngine handles the work of dividing the simulation replicates between the cores; the only restriction is that the number of cores cannot exceed the total number of simulation replicates.
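For the example simulation above, the division of work can be checked with some quick arithmetic (an illustration only, not SimEngine code):

```r
n_level_combos <- 2 * 3                  # 2 estimators x 3 sample sizes
n_sim          <- 100                    # replicates per level combination
n_reps         <- n_level_combos * n_sim # 600 replicates in total
n_cores        <- 20                     # size of the job array
n_reps / n_cores                         # 30 replicates per task
```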

Also note that we included the option --depend=afterok:101, which instructs the job scheduler to wait until the first job finishes before starting the job array. (In practice, the number 101 must be replaced with whatever job ID Slurm assigned to the first job.) Once this command is submitted, the code in the main block will be run for each replicate. A temporary folder called sim_results will be created and filled with temporary objects containing data on the results and/or errors for each replicate.

In the third line, we submit the run_sim.sh script using the sim_run='last' environment variable. Again, we use --depend=afterok:102 to ensure this code does not run until all tasks in the job array have finished. When this job runs, SimEngine will compile the results from the main block, run the code in the last block, save the simulation object to the filesystem, and delete the temporary sim_results folder and its contents. If desired, the user can leave the last block empty, but this third sbatch command should be run anyway to compile the results and save the simulation object for further analysis.

Additional cluster parallelization functionality

Running locally

The run_on_cluster() function is programmed such that it can also be run locally. In this case, the code within the first, main, and last blocks will be executed in the calling environment of the run_on_cluster() function (typically the global environment); this can be useful for testing simulations locally before sending them to a CCS.

Using unsupported job schedulers

There may be job schedulers that SimEngine does not natively support. If this is the case, SimEngine can still be used for cluster parallelization; this requires identifying the environment variable that the job scheduler uses to uniquely identify tasks within a job array. For example, Slurm uses the variable "SLURM_ARRAY_TASK_ID" and Grid Engine uses the variable "SGE_TASK_ID". Once this variable is identified, it can be specified in the cluster_config block, as follows:

run_on_cluster(
  first = {...},
  main = {...},
  last = {...},
  cluster_config = list(tid_var="SLURM_ARRAY_TASK_ID")
)

Updating a simulation on a CCS

To update a simulation on a CCS, the update_sim_on_cluster() function can be used. The workflow is similar to that of run_on_cluster(), with several key differences. Instead of creating a new simulation object in the first block using new_sim(), the existing simulation object (which would have been saved to the filesystem when run_on_cluster() was called originally) is loaded using readRDS(). Then, the functions set_levels() and/or set_config() are called to specify the desired updates. In the main block, update_sim() is called (instead of run()). In the last block, code can remain the same or change as needed. These differences are illustrated in the code below.

update_sim_on_cluster(
  first = {
    sim <- readRDS("sim.rds")
    sim %<>% set_levels(n=c(100,500,1000))
  },
  main = {
    sim %<>% update_sim()
  },
  last = {
    sim %>% summarize()
  },
  cluster_config = list(js="slurm")
)

Submission of this code via a job scheduler proceeds in the same manner as described earlier for run_on_cluster().
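For example, assuming the update code above is saved in a file called update_sim.R and wrapped in a shell script update_sim.sh (both names are hypothetical), the same three-step Slurm submission pattern applies. As before, the job IDs in the --depend options are placeholders that must be replaced with the IDs Slurm actually returns, and the --array range should match the n_cores value stored in the simulation object.

```bash
sbatch --export=sim_run='first' update_sim.sh
sbatch --export=sim_run='main' --array=1-20 --depend=afterok:201 update_sim.sh
sbatch --export=sim_run='last' --depend=afterok:202 update_sim.sh
```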