---
title: "Split, map, and reduce"
output:
  rmarkdown::html_vignette:
    toc: true
    toc_depth: 4
description: >
  Shows how to split data, apply the pipeline to each subset, and then
  reduce the results back into a combined output.
vignette: >
  %\VignetteIndexEntry{Split, map, and reduce}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r knitr-setup, include = FALSE}
require(pipeflow)

knitr::opts_chunk$set(
    comment = "#",
    prompt = FALSE,
    tidy = FALSE,
    cache = FALSE,
    collapse = TRUE
)

old <- options(width = 100L)
```

### Motivation

A common scenario is to split a data set into subsets and then apply the same
analysis to each part. In the context of pipelines, this means that we would
like to apply the same pipeline once to each data subset. In addition,
we may then want to combine parts of the individual outputs. As we will see,
{pipeflow} provides a built-in mechanism to handle this scenario.


### Define pipeline

Let's first define our pipeline, which, to keep matters simple, just
fits a linear model and outputs the model coefficients.

```{r}
library(pipeflow)

pip <- pip_new("my-pipeline") |>
    pip_add(
        "data",
        function(data = NULL) data
    ) |>
    pip_add(
        "fit",
        function(
            data = ~data,
            xVar = "x",
            yVar = "y"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>
    pip_add(
        "coefs",
        function(fit = ~fit) {
            coefficients(fit)
        }
    )
```

So our pipeline looks like this:

```{r}
pip
```

We use the `iris` data set as our working example.

```{r}
head(iris)
```

First, we apply the pipeline to the whole data set.

```{r}
pip |> pip_set_params(list(
    data = iris,
    xVar = "Sepal.Length",
    yVar = "Sepal.Width"
))

pip_run(pip)
```


```{r}
pip[["coefs", "out"]]
```
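As a quick sanity check, the same model can be fitted directly with base R,
outside of any pipeline. This is only a minimal sketch for comparison; the
name `direct_coefs` is introduced here for illustration and is not part of
{pipeflow}.

```{r}
# Fit the same linear model directly, without any pipeline
direct_fit <- lm(Sepal.Width ~ Sepal.Length, data = iris)

# Extract the coefficients, as the "coefs" step does
direct_coefs <- coefficients(direct_fit)
direct_coefs
```

The values should agree with the output of the `coefs` step shown above.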


### Split data

Next, we want to apply the pipeline to each species separately.
One way to do this would be to use R's `split` function.
We can split the data by the `Species` column and then run the pipeline
for each subset. For example:

```{r}
run_pipeline_helper <- function(data) {
    pip |> pip_set_params(list(data = data))
    pip_run(pip)
    pip[["coefs", "out"]]
}

results <- lapply(split(iris, iris$Species), FUN = run_pipeline_helper)
```

```{r}
results
```
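To make explicit what the helper iterates over, the following base R sketch
inspects the list returned by `split`. The name `subsets` is only used here
for illustration.

```{r}
# split() returns a named list with one data frame per factor level
subsets <- split(iris, iris$Species)

names(subsets)

# Number of rows in each subset
sapply(subsets, nrow)
```

Each list element is an ordinary data frame, which is why it can be passed
to the pipeline's `data` parameter unchanged.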

Unfortunately, with this approach we had to write additional code
that runs outside the pipeline framework. In addition,
the run log can quickly become redundant and confusing, as it now
contains multiple runs of the same pipeline.
Since splitting data sets (or more generally mapping function calls
to different subsets of data) is such a common scenario,
{pipeflow} also provides a built-in mechanism to handle this case.

Since version 0.4.0, for each step, it is possible to set the
so-called execution mode, which by default is `exec = "auto"`.
To model the above scenario, we add a new step to our pipeline
that splits the data set and set its execution mode to `split`.

```{r}
pip <- pip_new("my-split-pip") |>
    pip_add(
        "data",
        function(data = NULL) data
    ) |>
    pip_add(
        "split_data",
        function(
            data = ~data,
            byVar = "by"
        ) {
            split(data, f = data[[byVar]])
        },
        exec = "split" # <-- set execution mode to "split"
    ) |>
    pip_add(
        "fit",
        function(
            data = ~split_data,
            xVar = "x",
            yVar = "y"
        ) {
            lm(paste(yVar, "~", xVar), data = data)
        }
    ) |>
    pip_add(
        "coefs",
        function(fit = ~fit) {
            coefficients(fit)
        }
    )
```

```{r}
pip
```

First of all, we see that the pipeline is now printed with an additional
column `exec` marking the `split` execution mode for the `split_data` step.
This can also be inspected in the graph:

```{r, eval = FALSE}
library(visNetwork)
do.call(visNetwork, args = pip_get_graph(pip))
```

```{r, echo = FALSE}
library(visNetwork)
do.call(
    visNetwork,
    args = c(pip_get_graph(pip), list(height = 100, width = 500))
) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")
```

Now what does this execution mode actually do? It basically tells the pipeline
that for all steps that depend on the `split_data` step (directly or indirectly),
the results coming from the split step should be treated as lists of results,
which should be iterated over.

In our particular example, this means that the `fit` step will be executed for
each data subset coming from the `split_data` step and likewise the `coefs`
step will be executed for each fitted model coming from the `fit` step.
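Conceptually, this iteration can be pictured with plain `lapply` calls. The
following is only a base R sketch of the semantics described above, not
{pipeflow}'s actual implementation; the names `subsets`, `fits`, and
`coefs_by_species` are made up for illustration.

```{r}
# What the "split_data" step returns: one data frame per species
subsets <- split(iris, f = iris[["Species"]])

# The "fit" step is applied once per subset ...
fits <- lapply(subsets, function(d) lm(Sepal.Width ~ Sepal.Length, data = d))

# ... and the "coefs" step once per fitted model
coefs_by_species <- lapply(fits, coefficients)

str(coefs_by_species)
```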

Let's see this in action by running the pipeline.

```{r}
pip |> pip_set_params(list(
    data = iris,
    xVar = "Sepal.Length",
    yVar = "Sepal.Width",
    byVar = "Species"
))

pip_run(pip)
```

Looking at the pipeline overview, we see that the `out` entries of the steps
following the `split_data` step are now all lists of results.

```{r}
pip
```

Inspecting the output of the `coefs` step in particular, we see that it
is now a list of coefficient vectors, one for each species.

```{r}
pip[["coefs", "out"]]
```

This matches the output^[
    Technically, the output is slightly different, because the returned list
    has an additional class attribute "pipeflow_partitioned".
] we obtained earlier with the helper function, but this time without
having to write any extra code around the pipeline.

### Recombine output

While the above approach looks nice already, we are only halfway there, because
often we will want to recombine the outputs of the different subsets
in some way. For example, we may want to show the resulting coefficients
of the linear models in one summary table.

This is where the `reduce` execution mode comes into play.
To this end, let's extend our pipeline by one step at the end.

```{r}
pip |> pip_add(
    "combine_coefs",
    function(coefs = ~coefs) {
        do.call(rbind, coefs)
    },
    exec = "reduce" # <-- set execution mode to "reduce"
)
```

```{r}
pip
```

Again, we see that the new step is marked with the execution mode (`reduce`)
in the overview. Graphically, this mode is represented by a circle.

```{r, eval = FALSE}
do.call(visNetwork, args = pip_get_graph(pip))
```

```{r, echo = FALSE}
do.call(
    visNetwork,
    args = c(pip_get_graph(pip), list(height = 100, width = 650))
) |>
    visHierarchicalLayout(direction = "LR", sortMethod = "directed")
```

If we now run the pipeline, we see that the output of the `combine_coefs` step is
a combined table of coefficients.

```{r}
pip_run(pip)

pip[["combine_coefs", "out"]]
```
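To see why `do.call(rbind, ...)` produces a table here, consider the
following base R sketch, which builds a comparable list of coefficient
vectors by hand and binds them row-wise. It only illustrates the combining
step and assumes nothing about {pipeflow} internals; `per_species` and
`combined` are illustrative names.

```{r}
# One named coefficient vector per species
per_species <- lapply(
    split(iris, f = iris[["Species"]]),
    function(d) coefficients(lm(Sepal.Width ~ Sepal.Length, data = d))
)

# rbind stacks the vectors into a matrix: list names become row names,
# coefficient names become column names
combined <- do.call(rbind, per_species)

dim(combined)
rownames(combined)
colnames(combined)
```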

There you go :-)

```{r, include = FALSE}
options(old)
```
