GitHub - r-lib/coro: Coroutines for R (original) (raw)
coro
Overview
coro implements coroutines for R, i.e. functions that can be suspended and resumed later on. There are two kinds:
- Async functions, which make it straightforward to program concurrently
- Generators for iterating over complex sequences
Supported features:
- Suspending within loops and if/else branches
- Suspending within
tryCatch()
on.exit()
expressions and stack-based cleanup such as provided bylocal_
functions in the withrpackage- Step-debugging and
browser()
within coroutines
Compatibility with:
- Python iterators from thereticulate package
- Async operations from thepromises package
- Parallel computations from thefuture package
Attach the package to follow the examples:
Async/await functions
Concurrent programming is made straightforward by async-await functions. Whenever you are waiting for a result that may take a while (downloading a file, computing a value in an external process), use await()
. The argument to await()
must return a promise from thepromises package.
Concurrent code based on promises can quickly become hard to write and follow. In the following artificial example, we wait for a download to complete, then decide to launch a computation in an external process depending on a property of the downloaded data. We also handle some errors specifically.
my_async <- function() { async_download() %>% then(function(data) { if (ncol(data) > 10) { then(future::future(fib(30)), function(fib) { data / fib }) } else { data } }, onRejected = function(err) { if (inherits(err, "download_error")) { NULL } else { stop(err) } }) }
Rewriting this function with async/await greatly simplifies the code:
my_async <- async(function() { data <- tryCatch( await(async_download()), download_error = function(err) NULL )
if (is.null(data)) { return(NULL) }
if (ncol(data) > 10) { fib <- await(future::future(fib(30))) data <- data /fib }
data })
Generators
Generators are based on a simple iteration protocol:
- Iterators are functions.
- They can be advanced by calling the function. The new value is returned.
- An exhausted iterator returns the sentinel symbol
exhausted
.
The generator()
function creates a generator factory which returns generator instances:
Create a generator factory
generate_abc <- generator(function() { for (x in letters[1:3]) { yield(x) } })
Create a generator instance
abc <- generate_abc()
A generator instance is an iterator function which yields values:
abc #> <generator/instance> #> function () #> { #> for (x in letters[1:3]) { #> yield(x) #> } #> } #> <environment: 0x1258e3818>
abc() #> [1] "a"
Collect all remaining values from an iterator with collect()
:
collect(abc) #> [[1]] #> [1] "b" #> #> [[2]] #> [1] "c"
Iterate over an iterator with loop()
:
loop(for (x in generate_abc()) { print(toupper(x)) }) #> [1] "A" #> [1] "B" #> [1] "C"
See vignette("generator")
for more information.
Compatibility with the reticulate package
Python iterators imported with thereticulate package are compatible with loop()
and collect()
:
suppressMessages(library(reticulate))
py_run_string(" def first_n(n): num = 1 while num <= n: yield num num += 1 ")
loop(for (x in py$first_n(3)) { print(x * 2) }) #> [1] 2 #> [1] 4 #> [1] 6
They can also be composed with coro generators:
times <- generator(function(it, n) for (x in it) yield(x * n))
composed <- times(py$first_n(3), 10)
collect(composed) #> [[1]] #> [1] 10 #> #> [[2]] #> [1] 20 #> #> [[3]] #> [1] 30
Limitations
yield()
and await()
can be used in loops, if/else branches,tryCatch()
expressions, or any combinations of these. However they can’t be used as function arguments. These will cause errors:
generator(function() { list(yield("foo")) })
async(function() { list(await(foo())) })
Fortunately it is easy to rewrite the code to work around this limitation:
generator(function() { x <- yield("foo") list(x) })
async(function() { x <- await(foo()) list(x) })
How does it work
Coroutines are an abstraction for state machinesin languages that support them. Conversely, you can implement coroutines by rewriting the code source provided by the user as a state machine. Pass internals = TRUE
to the print methods of coroutines to reveal the state machine that is running under the hood:
print(generate_abc, internals = TRUE)
#>
#> function ()
#> {
#> for (x in letters[1:3]) {
#> yield(x)
#> }
#> }
#> <environment: 0x1258e3818>
#> State machine:
#> {
#> if (exhausted) {
#> return(invisible(exhausted()))
#> }
#> repeat switch(state[[1L]], 1
= {
#> iterators[[2L]] <- as_iterator(user(letters[1:3]))
#> state[[1L]] <- 2L
#> state[[2L]] <- 1L
#> }, 2
= {
#> repeat switch(state[[2L]], 1
= {
#> if ({
#> iterator <- iterators[[2L]]
#> if (is_exhausted(elt <- iterator())) {
#> FALSE
#> } else {
#> user_env[["x"]] <- elt
#> TRUE
#> }
#> }) {
#> state[[2L]] <- 2L
#> } else {
#> break
#> }
#> }, 2
= {
#> user({
#> x
#> })
#> state[[2L]] <- 3L
#> suspend()
#> return(last_value())
#> }, 3
= {
#> .last_value <- if (missing(arg)) NULL else arg
#> state[[2L]] <- 1L
#> })
#> iterators[[2L]] <- NULL
#> length(state) <- 1L
#> break
#> })
#> exhausted <- TRUE
#> invisible(exhausted())
#> }
Despite this transformation of source code, browser()
and step-debugging still work as you would expect. This is because coro keeps track of the source references from the original code.
Acknowledgements
- The regenerator Javascript package which uses a similar transformation to implement generators and async functions in older versions of Javascript.
- Gabor Csardi for many interesting discussions about concurrency and the design of coro.
Installation
Install the development version from github with:
install.packages("devtools")
devtools::install_github("r-lib/coro", build_vignettes = TRUE)
Code of Conduct
Please note that the coro project is released with a Contributor Code of Conduct. By contributing to this project, you agree to abide by its terms.