crew
is simple to use inside Shiny apps. Not only does
crew
bring parallel computing to a single app session, it
elastically auto-scales worker processes to reduce the resource burden
on other concurrent user sessions. And because of the centralized
controller interface, there is no need to manually loop through
individual tasks, which is convenient for workloads with thousands of
tasks.
The simple example below has three interface elements: an action
button, a plot output, and a text output. When you click the action
button, a new 5-second task pushes to the crew
controller.
The action button can submit new tasks even when existing tasks are
running in the background. The plot output shows the random
visualization returned from latest task.
The text output continuously refreshes to show the current time and number of tasks in progress. Watch the short video linked below to see the app in action. You will see the time tick away even as tasks run in the background. In other words, the tasks run asynchronously and do not block the app session.
The example app uses extended tasks, which are only available in Shiny 1.8.1.1 or above. If your installed version of Shiny is too low, you can upgrade to the development version with:
To begin, the app script loads Shiny.
The run_task()
function waits 5 seconds and then
generates a random aRtsy::canvas_squares()
plot.
run_task <- function() {
Sys.sleep(5)
aRtsy::canvas_squares(colors = aRtsy::colorPalette("random-palette"))
}
The user interface shows the three parts explained previously, along with HTML/CSS formatting.
ui <- fluidPage(
tags$br(),
tags$style("#status,#task{font-size:3em}"),
tags$style("#task{border:3px solid black}"),
actionButton("task", "Submit a task (5 seconds)"),
textOutput("status"),
plotOutput("result")
)
The server sets up
a local
process controller. The controller has 4 workers, and each worker
automatically shuts down if 10 seconds pass without any task
assignments. The onStop()
statement says to terminate the
controller when the app session terminates.
server <- function(input, output, session) {
controller <- crew::crew_controller_local(workers = 4, seconds_idle = 10)
controller$start()
onStop(function() controller$terminate())
The task
object below is a Shiny extended task which
accepts a promise object from the controller. The crew
promise resolves when a task completes, and the
ExtendedTask
object efficiently manages a queue of such
promises.1 2
The “Submit a task (5 seconds)” button pushes a new task to the controller and enqueues a new promise to asynchronously handle the result.
observeEvent(input$task, {
controller$push(command = run_task(), data = list(run_task = run_task))
task$invoke()
})
Because of the Shiny extended task and the crew
promise,
the plot output automatically refreshes almost immediately after the
task completes.
The text status periodically refreshes to show the current time and the number of tasks in progress. When you run the app, you will see the time tick away even as tasks and promises operate in the background.
output$status <- renderText({
input$task
task$status()
invalidateLater(millis = 1000)
time <- format(Sys.time(), "%H:%M:%S")
paste("Time:", time, "|", "Running tasks:", controller$unresolved())
})
}
Finally, shinyApp()
runs the app with the UI and server
defined above.
See below for the complete app.R
file.
library(shiny)
run_task <- function() {
Sys.sleep(5)
aRtsy::canvas_squares(colors = aRtsy::colorPalette("random-palette"))
}
ui <- fluidPage(
tags$br(),
tags$style("#status,#task{font-size:3em}"),
tags$style("#task{border:3px solid black}"),
actionButton("task", "Submit a task (5 seconds)"),
textOutput("status"),
plotOutput("result")
)
server <- function(input, output, session) {
# crew controller
controller <- crew::crew_controller_local(workers = 4, seconds_idle = 10)
controller$start()
onStop(function() controller$terminate())
# extended task to get completed results from the controller
task <- ExtendedTask$new(function() controller$promise(mode = "one"))
# button to submit a crew task
observeEvent(input$task, {
controller$push(command = run_task(), data = list(run_task = run_task))
task$invoke()
})
# task result
output$result <- renderPlot(task$result()$result[[1L]])
# time and task status
output$status <- renderText({
input$task
task$status()
invalidateLater(millis = 1000)
time <- format(Sys.time(), "%H:%M:%S")
paste("Time:", time, "|", "Running tasks:", controller$unresolved())
})
}
shinyApp(ui = ui, server = server)
The example app below demonstrates a high-throughput scenario where
there may be too many individual tasks to efficiently manage each one
individually. The app has an action button and two text outputs. When
you click the action button, the app submits a batch of 20 tasks
(simulated coin flips) to the crew
controller. The action
button can submit new tasks even when existing tasks are running in the
background. The text outputs refresh to show the current time, the
number of upcoming coin flips, and running totals for heads, tails, and
errors.
We first load Shiny.
Our task is a simulated coin flip: wait a random number of seconds,
then return a random TRUE
/FALSE
value. After
many flips, the user may deduce that the coin is unfair.
flip_coin <- function() {
Sys.sleep(runif(n = 1, min = 0.25, max = 2.5))
as.logical(rbinom(n = 1, size = 1, prob = 0.4))
}
The UI invites the user to simulate multiple coin flips and try to figure out if the coin is fair.
ui <- fluidPage(
div("Is the coin fair?"),
actionButton("task", "Flip 20 coins"),
textOutput("status"),
textOutput("outcomes")
)
In the server, we start by creating a crew
controller
which will simulate coin flips in parallel across 4 workers.
server <- function(input, output, session) {
controller <- crew::crew_controller_local(workers = 4, seconds_idle = 10)
controller$start()
onStop(function() controller$terminate())
We keep running totals of heads, tails, and task errors in a list of reactive values.
We create a button to submit a batch of 20 coin flips. Each coin flip
is a separate crew
task.
observeEvent(input$task, {
controller$walk(
command = flip_coin(),
iterate = list(index = seq_len(20)),
data = list(flip_coin = flip_coin)
)
})
A text output refreshes every second to show the current time and the number of coin flips submitted but not yet completed.
output$status <- renderText({
invalidateLater(millis = 1000)
time <- format(Sys.time(), "%H:%M:%S")
sprintf("%s Flipping %s coins.", time, controller$unresolved())
})
Another text output automatically refreshes to show the total number of heads, tails, and errors from completed coin flips.
output$outcomes <- renderText({
pattern <- "%s heads %s tails %s errors"
sprintf(pattern, flips$heads, flips$tails, flips$errors)
})
We collect new coin flip results every second and update the totals in the reactive values list.
observe({
invalidateLater(millis = 1000)
results <- controller$collect()
req(results)
new_flips <- as.logical(results$result)
errors <- na.omit(as.character(results$error))
flips$heads <- flips$heads + sum(new_flips)
flips$tails <- flips$tails + sum(!new_flips)
flips$errors <- flips$errors + length(errors)
})
}
Lastly, we run the app.
See below for the complete app.R
file.
library(shiny)
flip_coin <- function() {
Sys.sleep(runif(n = 1, min = 0.25, max = 2.5))
as.logical(rbinom(n = 1, size = 1, prob = 0.4))
}
ui <- fluidPage(
div("Is the coin fair?"),
actionButton("task", "Flip 20 coins"),
textOutput("status"),
textOutput("outcomes")
)
server <- function(input, output, session) {
# crew controller
controller <- crew::crew_controller_local(workers = 4, seconds_idle = 10)
controller$start()
onStop(function() controller$terminate())
# Keep running totals of heads, tails, and task errors.
flips <- reactiveValues(heads = 0, tails = 0, errors = 0)
# Button to submit a batch of coin flips.
observeEvent(input$task, {
controller$walk(
command = flip_coin(),
iterate = list(index = seq_len(20)),
data = list(flip_coin = flip_coin)
)
})
# Print time and task status.
output$status <- renderText({
invalidateLater(millis = 1000)
time <- format(Sys.time(), "%H:%M:%S")
sprintf("%s Flipping %s coins.", time, controller$unresolved())
})
# Print number of heads and tails.
output$outcomes <- renderText({
pattern <- "%s heads %s tails %s errors"
sprintf(pattern, flips$heads, flips$tails, flips$errors)
})
# Collect coin flip results.
observe({
invalidateLater(millis = 1000)
results <- controller$collect()
req(results)
new_flips <- as.logical(results$result)
errors <- na.omit(as.character(results$error))
flips$heads <- flips$heads + sum(new_flips)
flips$tails <- flips$tails + sum(!new_flips)
flips$errors <- flips$errors + length(errors)
})
}
shinyApp(ui = ui, server = server)
A self-renewing Shiny extended task can make the coin flip app more
responsive. However, there may be a performance cost due to
post-processing each task individually (as opposed to processing in bulk
using controller$collect()
). These are tradeoffs which may
lead to different optimal decisions on a case-by-case basis. Please
choose the approach that best fits your own app.
To make the app respond immediately to each completed coin flip, first define an extended task that re-invokes itself whenever a task completes.
task <- ExtendedTask$new(function() controller$promise(mode = "one"))
observe(if (task$status() != "running") task$invoke())
Next, use task$status()
to invalidate the text status as
soon as a task completes.
output$status <- renderText({
invalidateLater(millis = 1000)
task$status() # Invalidates the reactive when the task completes.
time <- format(Sys.time(), "%H:%M:%S")
sprintf("%s Flipping %s coins.", time, controller$unresolved())
})
Finally, trust task$result()
to invalidate the reactive
expression that collects results.
observe({
new_flip <- task$result()$result[[1]]
flips$heads <- flips$heads + sum(new_flip)
flips$tails <- flips$tails + sum(!new_flip)
})
The full app code is below:
library(shiny)
flip_coin <- function() {
Sys.sleep(runif(n = 1, min = 0.25, max = 2.5))
as.logical(rbinom(n = 1, size = 1, prob = 0.4))
}
ui <- fluidPage(
div("Is the coin fair?"),
actionButton("task", "Flip 20 coins"),
textOutput("status"),
textOutput("outcomes")
)
server <- function(input, output, session) {
# crew controller
controller <- crew::crew_controller_local(workers = 4, seconds_idle = 10)
controller$start()
onStop(function() controller$terminate())
# Keep running totals of heads, tails, and task errors.
flips <- reactiveValues(heads = 0, tails = 0, errors = 0)
# Create a self-renewing extended task that collects individual
# results and invalidates reactive expressions on each one.
# Also collects errors.
task <- ExtendedTask$new(function() controller$promise(mode = "one"))
observe(if (task$status() != "running") task$invoke())
# Button to submit a batch of coin flips.
observeEvent(input$task, {
controller$walk(
command = flip_coin(),
iterate = list(index = seq_len(20)),
data = list(flip_coin = flip_coin)
)
})
# Print time and task status.
output$status <- renderText({
invalidateLater(millis = 1000)
task$status()
time <- format(Sys.time(), "%H:%M:%S")
sprintf("%s Flipping %s coins.", time, controller$unresolved())
})
# Print number of heads and tails.
output$outcomes <- renderText({
pattern <- "%s heads %s tails"
sprintf(pattern, flips$heads, flips$tails)
})
# Collect coin flip results.
observe({
new_flip <- task$result()$result[[1]]
flips$heads <- flips$heads + sum(new_flip)
flips$tails <- flips$tails + sum(!new_flip)
})
}
shinyApp(ui = ui, server = server)
For more on extended tasks, install the development
version of Shiny and call ?shiny::ExtendedTask
to view the
help file.↩︎
For more on crew
promises, visit https://wlandau.github.io/crew/articles/promises.html.↩︎