Package {pipeflow}


Title: Fast Interactive Data Analysis Pipelines
Version: 0.3.0
Maintainer: Roman Pahl <roman.pahl@gmail.com>
Description: A lightweight and intuitive framework for building interactive data analysis pipelines. You add R functions one by one, and 'pipeflow' wires them into a pipeline that stays consistent as you go. Modify, remove, or insert steps at any stage, manage all parameters in one place, fast execution (C++-powered DAG) for interactive use and Shiny backends.
License: MIT + file LICENSE
URL: https://rpahl.github.io/pipeflow/, https://github.com/rpahl/pipeflow
BugReports: https://github.com/rpahl/pipeflow/issues
Depends: R (≥ 4.2.0)
Imports: data.table, jsonlite, lgr, methods, R6, Rcpp (≥ 1.1.1), stats, utils
Suggests: ggplot2, gridExtra, knitr, mockery, rmarkdown, targets, testthat, visNetwork
LinkingTo: Rcpp
VignetteBuilder: knitr
Config/roxygen2/version: 8.0.0
Config/testthat/edition: 3
Config/testthat/parallel: true
Encoding: UTF-8
Language: en-US
NeedsCompilation: yes
Packaged: 2026-06-14 20:27:21 UTC; Roman
Author: Roman Pahl [aut, cre]
Repository: CRAN
Date/Publication: 2026-06-15 07:50:07 UTC

pipeflow: Lightweight, General-Purpose Data Analysis Pipelines

Description

logo

A lightweight yet powerful framework for building robust data analysis pipelines. With 'pipeflow', you initialize a pipeline with your dataset and construct workflows step by step by adding R functions. You can modify, remove, or insert steps and parameters at any stage, while 'pipeflow' ensures the pipeline's integrity. Overall, this package offers a beginner-friendly framework that simplifies and streamlines the development of data analysis pipelines by making them modular, intuitive, and adaptable.

Author(s)

Maintainer: Roman Pahl roman.pahl@gmail.com

Authors:

See Also

Useful links:


Pipeline Class

Description

This class implements an analysis pipeline. A pipeline consists of a sequence of analysis steps, which can be added one by one. Each added step may or may not depend on one or more previous steps. The pipeline keeps track of the dependencies among these steps and will ensure that all dependencies are met on creation of the pipeline, that is, before the the pipeline is run. Once the pipeline is run, the output is stored in the pipeline along with each step and can be accessed later. Different pipelines can be bound together while preserving all dependencies within each pipeline.

Lifecycle

Deprecated. Legacy R6 interface. Use pip_new() and pip_* functions.

Public fields

name

string name of the pipeline

pipeline

data.table the pipeline where each row represents one step.

Methods

Public methods


Pipeline$new()

constructor

Usage
Pipeline$new(name, data = NULL, logger = NULL)
Arguments
name

the name of the Pipeline

data

optional data used at the start of the pipeline. The data also can be set later using the set_data function.

logger

custom logger to be used for logging. If no logger is provided, the default logger is used, which should be sufficient for most use cases. If you do want to use your own custom log function, you need to provide a function that obeys the following form:

⁠function(level, msg, ...) { your custom logging code here }⁠

The level argument is a string and will be one of info, warn, or error. The msg argument is a string containing the message to be logged. The ... argument is a list of named parameters, which can be used to add additional information to the log message. Currently, this is only used to add the context in case of a step giving a warning or error.

Note that with the default logger, the log layout can be altered any time via set_log_layout().

Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("myPipe", data = data.frame(x = 1:8))
p

# Passing custom logger
my_logger <- function(level, msg, ...) {
   cat(level, msg, "\n")
}
p <- Pipeline$new("myPipe", logger = my_logger)

Pipeline$add()

Add pipeline step

Usage
Pipeline$add(
  step,
  fun,
  params = list(),
  description = "",
  group = step,
  keepOut = FALSE
)
Arguments
step

string the name of the step. Each step name must be unique.

fun

function or name of the function to be applied at the step. Both existing and anonymous/lambda functions can be used. All function parameters must have default values. If a parameter is missing a default value in the function signature, alternatively, it can be set via the params argument (see Examples section with mean() function).

params

list list of parameters to set or overwrite parameters of the passed function.

description

string optional description of the step

group

string output collected after pipeline execution (see function collect_out) is grouped by the defined group names. By default, this is the name of the step, which comes in handy when the pipeline is copy-appended multiple times to keep the results of the same function/step grouped at one place.

keepOut

logical if FALSE (default) the output of the step is not collected when calling collect_out after the pipeline run. This option is used to only keep the results that matter and skip intermediate results that are not needed. See also function collect_out for more details.

Returns

returns the Pipeline object invisibly

Examples
# Add steps with lambda functions
p <- Pipeline$new("myPipe", data = 1)
p$add("s1", \(x = ~data) 2*x)  # use input data
p$add("s2", \(x = ~data, y = ~s1) x * y)
try(p$add("s2", \(z = 3) 3)) # error: step 's2' exists already
try(p$add("s3", \(z = ~foo) 3)) # dependency 'foo' not found
p

# Add step with existing function
p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4))
p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
p$run()$get_out("calc_mean")

# Step description
p <- Pipeline$new("myPipe", data = 1:10)
p$add("s1", \(x = ~data) 2*x, description = "multiply by 2")
print(p)
print(p, verbose = TRUE) # print all columns

# Group output
p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5))
p$add("prep_x", \(data = ~data) data$x, group = "prep")
p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep")
p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y)
p$run()$collect_out(all = TRUE)

Pipeline$append()

Append another pipeline When appending, pipeflow takes care of potential name clashes with respect to step names and dependencies, that is, if needed, it will automatically adapt step names and dependencies to make sure they are unique in the merged pipeline.

Usage
Pipeline$append(p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")
Arguments
p

Pipeline object to be appended.

outAsIn

logical if TRUE, output of first pipeline is used as input for the second pipeline.

tryAutofixNames

logical if TRUE, name clashes are tried to be automatically resolved by appending the 2nd pipeline's name. Only set to FALSE, if you know what you are doing.

sep

string separator used when auto-resolving step names

Returns

returns new combined Pipeline.

Examples
# Append pipeline
p1 <- Pipeline$new("pipe1")
p1$add("step1", \(x = 1) x)
p2 <- Pipeline$new("pipe2")
p2$add("step2", \(y = 1) y)
p1$append(p2)

# Append pipeline with potential name clashes
p3 <- Pipeline$new("pipe3")
p3$add("step1", \(z = 1) z)
p1$append(p2)$append(p3)

# Use output of first pipeline as input for second pipeline
p1 <- Pipeline$new("pipe1", data = 8)
p2 <- Pipeline$new("pipe2")
p1$add("square", \(x = ~data) x^2)
p2$add("log2", \(x = ~data) log2(x))

p12 <- p1$append(p2, outAsIn = TRUE)
p12$run()$get_out("log2")
p12

# Custom name separator
p1$append(p2, sep = "___")

Pipeline$append_to_step_names()

Appends string to all step names and takes care of updating step dependencies accordingly.

Usage
Pipeline$append_to_step_names(postfix, sep = ".")
Arguments
postfix

string to be appended to each step name.

sep

string separator between step name and postfix.

Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe")
p$add("step1", \(x = 1) x)
p$add("step2", \(y = 1) y)
p$append_to_step_names("new")
p
p$append_to_step_names("foo", sep = "__")
p

Pipeline$collect_out()

Collect output afer pipeline run, by default, from all steps for which keepOut was set to TRUE. The output is grouped by the group names (see group parameter in function add), which by default are set identical to the step names.

Usage
Pipeline$collect_out(groupBy = "group", all = FALSE)
Arguments
groupBy

string column of pipeline by which to group the output.

all

logical if TRUE all output is collected regardless of the keepOut flag. This can be useful for debugging.

Returns

list containing the output, named after the groups, which, by default, are the steps.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2)
p$add("step2", \(x = ~step1) x + 2, keepOut = TRUE)
p$run()
p$collect_out()
p$collect_out(all = TRUE) |> str()

# Grouped output
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2, group = "add")
p$add("step2", \(x = ~step1, y = 2) x + y, group = "add")
p$add("step3", \(x = ~data) x * 3, group = "mult")
p$add("step4", \(x = ~data, y = 2) x * y, group = "mult")
p
p$run()
p$collect_out(all = TRUE) |> str()

# Grouped by state
p$set_params(list(y = 5))
p
p$collect_out(groupBy = "state", all = TRUE) |> str()

Pipeline$discard_steps()

Discard all steps that match a given pattern.

Usage
Pipeline$discard_steps(pattern, recursive = FALSE, fixed = TRUE, ...)
Arguments
pattern

string containing a regular expression (or character string for fixed = TRUE) to be matched.

recursive

logical if TRUE the step is removed together with all its downstream dependencies.

fixed

logical If TRUE, pattern is a string to be matched as is. Overrides all conflicting arguments.

...

further arguments passed to grep().

Returns

the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~add1) x + 2)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p

p$discard_steps("mult")
p

# Re-add steps
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
# Discarding 'add1' does not work ...
try(p$discard_steps("add1"))

# ... unless we enforce to remove its downstream dependencies as well
p$discard_steps("add1", recursive = TRUE)   # this works
p

# Trying to discard non-existent steps is just ignored
p$discard_steps("non-existent")

Pipeline$get_data()

Get data

Usage
Pipeline$get_data()
Returns

the output defined in the data step, which by default is the first step of the pipeline

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$get_data()
p$set_data(3:4)
p$get_data()

Pipeline$get_depends()

Get all dependencies defined in the pipeline

Usage
Pipeline$get_depends()
Returns

named list of dependencies for each step

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$get_depends()

Pipeline$get_depends_down()

Get all downstream dependencies of given step, by default descending recursively.

Usage
Pipeline$get_depends_down(step, recursive = TRUE)
Arguments
step

string name of step

recursive

logical if TRUE, dependencies of dependencies are also returned.

Returns

list of downstream dependencies

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_down("add1")
p$get_depends_down("add1", recursive = FALSE)

Pipeline$get_depends_up()

Get all upstream dependencies of given step, by default descending recursively.

Usage
Pipeline$get_depends_up(step, recursive = TRUE)
Arguments
step

string name of step

recursive

logical if TRUE, dependencies of dependencies are also returned.

Returns

list of upstream dependencies

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_up("mult4")
p$get_depends_up("mult4", recursive = FALSE)

Pipeline$get_graph()

Visualize the pipeline as a graph.

Usage
Pipeline$get_graph(groups = NULL)
Arguments
groups

character if not NULL, only steps belonging to the given groups are considered.

Returns

two data frames, one for nodes and one for edges ready to be used with the visNetwork package.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = ~add1, y = ~add2) x * y)
graph <- pipe_get_graph(p)
graph

if (require("visNetwork", quietly = TRUE)) {
    do.call(visNetwork, args = p$get_graph())
}

Pipeline$get_out()

Get output of given step

Usage
Pipeline$get_out(step)
Arguments
step

string name of step

Returns

the output at the given step.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$run()
p$get_out("add1")
p$get_out("add2")

Pipeline$get_params()

Set unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps. Trying #' to set parameters that don't exist in the pipeline is ignored, by default, with a warning.

Usage
Pipeline$get_params(ignoreHidden = TRUE)
Arguments
ignoreHidden

logical if TRUE, hidden parameters (i.e. all names starting with a dot) are ignored and thus not returned.

Returns

list of parameters, sorted and named by step. Steps with no parameters are filtered out.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params() |> str()
p$get_params(ignoreHidden = FALSE) |> str()

Pipeline$get_params_at_step()

Get all unbound (i.e. not referring to other steps) at given step name.

Usage
Pipeline$get_params_at_step(step, ignoreHidden = TRUE)
Arguments
step

string name of step

ignoreHidden

logical if TRUE, hidden parameters (i.e. all names starting with a dot) are ignored and thus not returned.

Returns

list of parameters defined at given step.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params_at_step("add2")
p$get_params_at_step("add2", ignoreHidden = FALSE)
p$get_params_at_step("add3")

Pipeline$get_params_unique()

Get all unbound (i.e. not referring to other steps) parameters defined in the pipeline, but only list each parameter once. The values of the parameters, will be the values of the first step where the parameter was defined. This is particularly useful after the parameters where set using the set_params function, which will set the same value for all steps.

Usage
Pipeline$get_params_unique(ignoreHidden = TRUE)
Arguments
ignoreHidden

logical if TRUE, hidden parameters (i.e. all names starting with a dot) are ignored and thus not returned.

Returns

list of unique parameters

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique()
p$get_params_unique(ignoreHidden = FALSE)

Pipeline$get_params_unique_json()

Get all unique function parameters in json format.

Usage
Pipeline$get_params_unique_json(ignoreHidden = TRUE)
Arguments
ignoreHidden

logical if TRUE, hidden parameters (i.e. all names starting with a dot) are ignored and thus not returned.

Returns

list flat unnamed json list of unique function parameters

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique_json()
p$get_params_unique_json(ignoreHidden = FALSE)

Pipeline$get_step()

Get step of pipeline

Usage
Pipeline$get_step(step)
Arguments
step

string name of step

Returns

data.table row containing the step.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z)
p$run()
add1 <- p$get_step("add1")
print(add1)
add1[["params"]]
add1[["fun"]]
try()
try(p$get_step("foo")) # error: step 'foo' does not exist

Pipeline$get_step_names()

Get step names of pipeline

Usage
Pipeline$get_step_names()
Returns

character vector of step names

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_names()

Pipeline$get_step_number()

Get step number

Usage
Pipeline$get_step_number(step)
Arguments
step

string name of step

Returns

the step number in the pipeline

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_number("f2")

Pipeline$has_step()

Check if pipeline has given step

Usage
Pipeline$has_step(step)
Arguments
step

string name of step

Returns

logical whether step exists

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$has_step("f2")
p$has_step("foo")

Pipeline$insert_after()

Insert step after a certain step

Usage
Pipeline$insert_after(afterStep, step, ...)
Arguments
afterStep

string name of step after which to insert

step

string name of step to insert

...

further arguments passed to add method of the pipeline

Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_after("f1", "f3", \(x = ~f1) x)
p

Pipeline$insert_before()

Insert step before a certain step

Usage
Pipeline$insert_before(beforeStep, step, ...)
Arguments
beforeStep

string name of step before which to insert

step

string name of step to insert

...

further arguments passed to add method of the pipeline

Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_before("f2", "f3", \(x = ~f1) x)
p

Pipeline$length()

Length of the pipeline aka number of pipeline steps.

Usage
Pipeline$length()
Returns

numeric length of pipeline.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$length()

Pipeline$lock_step()

Locking a step means that both its parameters and its output (given it has output) are locked such that neither setting new pipeline parameters nor future pipeline runs can change the current parameter and output content.

Usage
Pipeline$lock_step(step)
Arguments
step

string name of step

Returns

the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$run()
p$get_out("add1")
p$get_out("add2")
p$lock_step("add1")

p$set_data(3)
p$set_params(list(x = 3))
p$run()
p$get_out("add1")
p$get_out("add2")

Pipeline$pop_step()

Drop last step from the pipeline.

Usage
Pipeline$pop_step()
Returns

string the name of the step that was removed

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p
p$pop_step() # "f2"
p

Pipeline$pop_steps_after()

Drop all steps after the given step.

Usage
Pipeline$pop_steps_after(step)
Arguments
step

string name of step

Returns

character vector of steps that were removed.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_after("f1")  # "f2", "f3"
p

Pipeline$pop_steps_from()

Drop all steps from and including the given step.

Usage
Pipeline$pop_steps_from(step)
Arguments
step

string name of step

Returns

character vector of steps that were removed.

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_from("f2")  # "f2", "f3"
p

Pipeline$print()

Print the pipeline as a table.

Usage
Pipeline$print(verbose = FALSE)
Arguments
verbose

logical if TRUE, print all columns of the pipeline, otherwise only the most relevant columns are displayed.

Returns

the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$print()

Pipeline$remove_step()

Remove certain step from the pipeline. If other steps depend on the step to be removed, an error is given and the removal is blocked, unless recursive was set to TRUE.

Usage
Pipeline$remove_step(step, recursive = FALSE)
Arguments
step

string the name of the step to be removed.

recursive

logical if TRUE the step is removed together with all its downstream dependencies.

Returns

the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = 1, y = ~add2) x * y)
p$remove_step("mult1")
p
try(p$remove_step("add1"))  # fails because "add2" depends on "add1"
p$remove_step("add1", recursive = TRUE)  # removes "add1" and "add2"
p

Pipeline$rename_step()

Safely rename a step in the pipeline. If new step name would result in a name clash, an error is given.

Usage
Pipeline$rename_step(from, to)
Arguments
from

string the name of the step to be renamed.

to

string the new name of the step.

Returns

the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p
try(p$rename_step("add1", "add2"))  # fails because "add2" exists
p$rename_step("add1", "first_add")  # Ok
p

Pipeline$replace_step()

Replaces an existing pipeline step.

Usage
Pipeline$replace_step(
  step,
  fun,
  params = list(),
  description = "",
  group = step,
  keepOut = FALSE
)
Arguments
step

string the name of the step to be replaced. Step must exist.

fun

string or function operation to be applied at the step. Both existing and lambda/anonymous functions can be used.

params

list list of parameters to overwrite default parameters of existing functions.

description

string optional description of the step

group

string grouping information (by default the same as the name of the step. Any output collected later (see function collect_out by default is put together by these group names. This, for example, comes in handy when the pipeline is copy-appended multiple times to keep the results of the same function/step at one place.

keepOut

logical if FALSE the output of the function will be cleaned at the end of the whole pipeline execution. This option is used to only keep the results that matter.

Returns

the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = 1, y = 2) x * y, keepOut = TRUE)
p$run()$collect_out()
p$replace_step("mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p$run()$collect_out()
try(p$replace_step("foo", \(x = 1) x))   # step 'foo' does not exist

Pipeline$reset()

Resets the pipeline to the state before it was run. This means that all output is removed and the state of all steps is reset to 'New'.

Usage
Pipeline$reset()
Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$run()
p
p$reset()
p

Pipeline$run()

Run all new and/or outdated pipeline steps.

Usage
Pipeline$run(
  force = FALSE,
  recursive = TRUE,
  cleanUnkept = FALSE,
  progress = NULL,
  showLog = TRUE
)
Arguments
force

logical if TRUE all steps are run regardless of whether they are outdated or not.

recursive

logical if TRUE and a step returns a new pipeline, the run of the current pipeline is aborted and the new pipeline is run recursively.

cleanUnkept

logical if TRUE all output that was not marked to be kept is removed after the pipeline run. This option can be useful if temporary results require a lot of memory.

progress

function this parameter can be used to provide a custom progress function of the form ⁠function(value, detail)⁠, which will show the progress of the pipeline run for each step, where value is the current step number and detail is the name of the step.

showLog

logical should the steps be logged during the pipeline run?

Returns

returns the Pipeline object invisibly

Examples
# Simple pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p$run()$collect_out()
p$set_params(list(z = 4))  # outdates steps add2 and final
p
p$run()$collect_out()
p$run(cleanUnkept = TRUE)  # clean up temporary results
p

# Recursive pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("new_pipe", \(x = ~add1) {
    pp <- Pipeline$new("new_pipe", data = x)
    pp$add("add1", \(x = ~data) x + 1)
    pp$add("add2", \(x = ~add1) x + 2, keepOut = TRUE)
    }
)
p$run(recursive = TRUE)$collect_out()

# Run pipeline with progress bar
p <- Pipeline$new("pipe", data = 1)
p$add("first step", \() Sys.sleep(1))
p$add("second step", \() Sys.sleep(1))
p$add("last step", \() Sys.sleep(1))
pb <- txtProgressBar(min = 1, max = p$length(), style = 3)
fprogress <- function(value, detail) {
   setTxtProgressBar(pb, value)
}
p$run(progress = fprogress, showLog = FALSE)

Pipeline$run_step()

Run given pipeline step possibly together with upstream and downstream dependencies.

Usage
Pipeline$run_step(
  step,
  upstream = TRUE,
  downstream = FALSE,
  cleanUnkept = FALSE
)
Arguments
step

string name of step

upstream

logical if TRUE, run all dependent upstream steps first.

downstream

logical if TRUE, run all depdendent downstream afterwards.

cleanUnkept

logical if TRUE all output that was not marked to be kept is removed after the pipeline run. This option can be useful if temporary results require a lot of memory.

Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run_step("add2")
p$run_step("add2", downstream = TRUE)
p$run_step("mult", upstream = TRUE)

Pipeline$set_data()

Set data in first step of pipeline.

Usage
Pipeline$set_data(data)
Arguments
data

initial data set

Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p$run()$collect_out()
p$set_data(3)
p$run()$collect_out()

Pipeline$set_data_split()

This function can be used to apply the pipeline repeatedly to various data sets. For this, the pipeline split-copies itself by the list of given data sets. Each sub-pipeline will have one of the data sets set as input data. The step names of the sub-pipelines will be the original step names plus the name of the data set.

Usage
Pipeline$set_data_split(
  dataList,
  toStep = character(),
  groupBySplit = TRUE,
  sep = "."
)
Arguments
dataList

list of data sets

toStep

string step name marking optional subset of the pipeline, at which the data split should be applied to.

groupBySplit

logical whether to set step groups according to data split.

sep

string separator to be used between step name and data set name when creating the new step names.

Returns

new combined Pipeline with each sub-pipeline having set one of the data sets.

Examples
# Split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
p$set_data_split(dataList)
p
p$run()$collect_out() |> str()

# Don't group output by split
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
p$set_data_split(dataList, groupBySplit = FALSE)
p
p$run()$collect_out() |> str()

# Split up to certain step
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1)
p$add("mult", \(x = ~data, y = ~add1) x * y)
p$add("average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
p
p$get_depends()[["average_result"]]

p$set_data_split(dataList, toStep = "mult")
p
p$get_depends()[["average_result"]]

p$run()$collect_out()

Pipeline$set_keep_out()

Change the keepOut flag at a given pipeline step, which determines whether the output of that step is collected when calling collect_out() after the pipeline was run.

Usage
Pipeline$set_keep_out(step, keepOut = TRUE)
Arguments
step

string name of step

keepOut

logical whether to keep output of step

Returns

the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run()$collect_out()
p$set_keep_out("add1", keepOut = FALSE)
p$set_keep_out("mult", keepOut = TRUE)
p$collect_out()

Pipeline$set_params()

Set parameters in the pipeline. If a parameter occurs in several steps, the parameter is set commonly in all steps. Trying to set parameters that don't exist in the pipeline is ignored, by default, with a warning.

Usage
Pipeline$set_params(params, warnUndefined = TRUE)
Arguments
params

list of parameters to be set

warnUndefined

logical whether to give a warning when trying to set a parameter that is not defined in the pipeline.

Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2) x + y)
p$add("add2", \(x = ~data, y = 3) x + y)
p$add("mult", \(x = 4, z = 5) x * z)
p$get_params()
p$set_params(list(x = 3, y = 3))
p$get_params()
p$set_params(list(x = 5, z = 3))
p$get_params()
suppressWarnings(
    p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined
)
p$set_params(list(foo = 3), warnUndefined = FALSE)

Pipeline$set_params_at_step()

Set unbound function parameters defined at given pipeline step where 'unbound' means parameters that are not linked to other steps.

Usage
Pipeline$set_params_at_step(step, params)
Arguments
step

string the name of the step

params

list of parameters to be set

Returns

returns the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2, z = 3) x + y)
p$set_params_at_step("add1", list(y = 5, z = 6))
p$get_params()
try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined

Pipeline$split()

Splits pipeline into its independent parts.

Usage
Pipeline$split()
Returns

list of Pipeline objects

Examples
# Example for two independent calculation paths
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = ~data) x)
p$add("f2", \(x = 1) x)
p$add("f3", \(x = ~f1) x)
p$add("f4", \(x = ~f2) x)
p$split()

# Example of split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipes <- p$set_data_split(dataList)$split()
pipes

Pipeline$unlock_step()

Unlock previously locked step. If step was not locked, the command is ignored.

Usage
Pipeline$unlock_step(step)
Arguments
step

string name of step

Returns

the Pipeline object invisibly

Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$lock_step("add1")
p$set_params(list(x = 3))
p$get_params()
p$unlock_step("add1")
p$set_params(list(x = 3))
p$get_params()

Pipeline$clone()

The objects of this class are cloneable with this method.

Usage
Pipeline$clone(deep = FALSE)
Arguments
deep

Whether to make a deep clone.

Author(s)

Roman Pahl

Examples


## ------------------------------------------------
## Method `Pipeline$new()`
## ------------------------------------------------

p <- Pipeline$new("myPipe", data = data.frame(x = 1:8))
p

# Passing custom logger
my_logger <- function(level, msg, ...) {
   cat(level, msg, "\n")
}
p <- Pipeline$new("myPipe", logger = my_logger)

## ------------------------------------------------
## Method `Pipeline$add()`
## ------------------------------------------------

# Add steps with lambda functions
p <- Pipeline$new("myPipe", data = 1)
p$add("s1", \(x = ~data) 2*x)  # use input data
p$add("s2", \(x = ~data, y = ~s1) x * y)
try(p$add("s2", \(z = 3) 3)) # error: step 's2' exists already
try(p$add("s3", \(z = ~foo) 3)) # dependency 'foo' not found
p

# Add step with existing function
p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4))
p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
p$run()$get_out("calc_mean")

# Step description
p <- Pipeline$new("myPipe", data = 1:10)
p$add("s1", \(x = ~data) 2*x, description = "multiply by 2")
print(p)
print(p, verbose = TRUE) # print all columns

# Group output
p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5))
p$add("prep_x", \(data = ~data) data$x, group = "prep")
p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep")
p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y)
p$run()$collect_out(all = TRUE)

## ------------------------------------------------
## Method `Pipeline$append()`
## ------------------------------------------------

# Append pipeline
p1 <- Pipeline$new("pipe1")
p1$add("step1", \(x = 1) x)
p2 <- Pipeline$new("pipe2")
p2$add("step2", \(y = 1) y)
p1$append(p2)

# Append pipeline with potential name clashes
p3 <- Pipeline$new("pipe3")
p3$add("step1", \(z = 1) z)
p1$append(p2)$append(p3)

# Use output of first pipeline as input for second pipeline
p1 <- Pipeline$new("pipe1", data = 8)
p2 <- Pipeline$new("pipe2")
p1$add("square", \(x = ~data) x^2)
p2$add("log2", \(x = ~data) log2(x))

p12 <- p1$append(p2, outAsIn = TRUE)
p12$run()$get_out("log2")
p12

# Custom name separator
p1$append(p2, sep = "___")

## ------------------------------------------------
## Method `Pipeline$append_to_step_names()`
## ------------------------------------------------

p <- Pipeline$new("pipe")
p$add("step1", \(x = 1) x)
p$add("step2", \(y = 1) y)
p$append_to_step_names("new")
p
p$append_to_step_names("foo", sep = "__")
p

## ------------------------------------------------
## Method `Pipeline$collect_out()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2)
p$add("step2", \(x = ~step1) x + 2, keepOut = TRUE)
p$run()
p$collect_out()
p$collect_out(all = TRUE) |> str()

# Grouped output
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2, group = "add")
p$add("step2", \(x = ~step1, y = 2) x + y, group = "add")
p$add("step3", \(x = ~data) x * 3, group = "mult")
p$add("step4", \(x = ~data, y = 2) x * y, group = "mult")
p
p$run()
p$collect_out(all = TRUE) |> str()

# Grouped by state
p$set_params(list(y = 5))
p
p$collect_out(groupBy = "state", all = TRUE) |> str()

## ------------------------------------------------
## Method `Pipeline$discard_steps()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~add1) x + 2)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p

p$discard_steps("mult")
p

# Re-add steps
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
# Discarding 'add1' does not work ...
try(p$discard_steps("add1"))

# ... unless we enforce to remove its downstream dependencies as well
p$discard_steps("add1", recursive = TRUE)   # this works
p

# Trying to discard non-existent steps is just ignored
p$discard_steps("non-existent")

## ------------------------------------------------
## Method `Pipeline$get_data()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$get_data()
p$set_data(3:4)
p$get_data()

## ------------------------------------------------
## Method `Pipeline$get_depends()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$get_depends()

## ------------------------------------------------
## Method `Pipeline$get_depends_down()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_down("add1")
p$get_depends_down("add1", recursive = FALSE)

## ------------------------------------------------
## Method `Pipeline$get_depends_up()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_up("mult4")
p$get_depends_up("mult4", recursive = FALSE)

## ------------------------------------------------
## Method `Pipeline$get_graph()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = ~add1, y = ~add2) x * y)
graph <- pipe_get_graph(p)
graph

if (require("visNetwork", quietly = TRUE)) {
    do.call(visNetwork, args = p$get_graph())
}

## ------------------------------------------------
## Method `Pipeline$get_out()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$run()
p$get_out("add1")
p$get_out("add2")

## ------------------------------------------------
## Method `Pipeline$get_params()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params() |> str()
p$get_params(ignoreHidden = FALSE) |> str()

## ------------------------------------------------
## Method `Pipeline$get_params_at_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params_at_step("add2")
p$get_params_at_step("add2", ignoreHidden = FALSE)
p$get_params_at_step("add3")

## ------------------------------------------------
## Method `Pipeline$get_params_unique()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique()
p$get_params_unique(ignoreHidden = FALSE)

## ------------------------------------------------
## Method `Pipeline$get_params_unique_json()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique_json()
p$get_params_unique_json(ignoreHidden = FALSE)

## ------------------------------------------------
## Method `Pipeline$get_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z)
p$run()
add1 <- p$get_step("add1")
print(add1)
add1[["params"]]
add1[["fun"]]
try()
try(p$get_step("foo")) # error: step 'foo' does not exist

## ------------------------------------------------
## Method `Pipeline$get_step_names()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_names()

## ------------------------------------------------
## Method `Pipeline$get_step_number()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_number("f2")

## ------------------------------------------------
## Method `Pipeline$has_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$has_step("f2")
p$has_step("foo")

## ------------------------------------------------
## Method `Pipeline$insert_after()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_after("f1", "f3", \(x = ~f1) x)
p

## ------------------------------------------------
## Method `Pipeline$insert_before()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_before("f2", "f3", \(x = ~f1) x)
p

## ------------------------------------------------
## Method `Pipeline$length()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$length()

## ------------------------------------------------
## Method `Pipeline$lock_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$run()
p$get_out("add1")
p$get_out("add2")
p$lock_step("add1")

p$set_data(3)
p$set_params(list(x = 3))
p$run()
p$get_out("add1")
p$get_out("add2")

## ------------------------------------------------
## Method `Pipeline$pop_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p
p$pop_step() # "f2"
p

## ------------------------------------------------
## Method `Pipeline$pop_steps_after()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_after("f1")  # "f2", "f3"
p

## ------------------------------------------------
## Method `Pipeline$pop_steps_from()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_from("f2")  # "f2", "f3"
p

## ------------------------------------------------
## Method `Pipeline$print()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$print()

## ------------------------------------------------
## Method `Pipeline$remove_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = 1, y = ~add2) x * y)
p$remove_step("mult1")
p
try(p$remove_step("add1"))  # fails because "add2" depends on "add1"
p$remove_step("add1", recursive = TRUE)  # removes "add1" and "add2"
p

## ------------------------------------------------
## Method `Pipeline$rename_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p
try(p$rename_step("add1", "add2"))  # fails because "add2" exists
p$rename_step("add1", "first_add")  # Ok
p

## ------------------------------------------------
## Method `Pipeline$replace_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = 1, y = 2) x * y, keepOut = TRUE)
p$run()$collect_out()
p$replace_step("mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p$run()$collect_out()
try(p$replace_step("foo", \(x = 1) x))   # step 'foo' does not exist

## ------------------------------------------------
## Method `Pipeline$reset()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$run()
p
p$reset()
p

## ------------------------------------------------
## Method `Pipeline$run()`
## ------------------------------------------------

# Simple pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p$run()$collect_out()
p$set_params(list(z = 4))  # outdates steps add2 and final
p
p$run()$collect_out()
p$run(cleanUnkept = TRUE)  # clean up temporary results
p

# Recursive pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("new_pipe", \(x = ~add1) {
    pp <- Pipeline$new("new_pipe", data = x)
    pp$add("add1", \(x = ~data) x + 1)
    pp$add("add2", \(x = ~add1) x + 2, keepOut = TRUE)
    }
)
p$run(recursive = TRUE)$collect_out()

# Run pipeline with progress bar
p <- Pipeline$new("pipe", data = 1)
p$add("first step", \() Sys.sleep(1))
p$add("second step", \() Sys.sleep(1))
p$add("last step", \() Sys.sleep(1))
pb <- txtProgressBar(min = 1, max = p$length(), style = 3)
fprogress <- function(value, detail) {
   setTxtProgressBar(pb, value)
}
p$run(progress = fprogress, showLog = FALSE)

## ------------------------------------------------
## Method `Pipeline$run_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run_step("add2")
p$run_step("add2", downstream = TRUE)
p$run_step("mult", upstream = TRUE)

## ------------------------------------------------
## Method `Pipeline$set_data()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p$run()$collect_out()
p$set_data(3)
p$run()$collect_out()

## ------------------------------------------------
## Method `Pipeline$set_data_split()`
## ------------------------------------------------

# Split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
p$set_data_split(dataList)
p
p$run()$collect_out() |> str()

# Don't group output by split
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
p$set_data_split(dataList, groupBySplit = FALSE)
p
p$run()$collect_out() |> str()

# Split up to certain step
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1)
p$add("mult", \(x = ~data, y = ~add1) x * y)
p$add("average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
p
p$get_depends()[["average_result"]]

p$set_data_split(dataList, toStep = "mult")
p
p$get_depends()[["average_result"]]

p$run()$collect_out()

## ------------------------------------------------
## Method `Pipeline$set_keep_out()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run()$collect_out()
p$set_keep_out("add1", keepOut = FALSE)
p$set_keep_out("mult", keepOut = TRUE)
p$collect_out()

## ------------------------------------------------
## Method `Pipeline$set_params()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2) x + y)
p$add("add2", \(x = ~data, y = 3) x + y)
p$add("mult", \(x = 4, z = 5) x * z)
p$get_params()
p$set_params(list(x = 3, y = 3))
p$get_params()
p$set_params(list(x = 5, z = 3))
p$get_params()
suppressWarnings(
    p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined
)
p$set_params(list(foo = 3), warnUndefined = FALSE)

## ------------------------------------------------
## Method `Pipeline$set_params_at_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2, z = 3) x + y)
p$set_params_at_step("add1", list(y = 5, z = 6))
p$get_params()
try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined

## ------------------------------------------------
## Method `Pipeline$split()`
## ------------------------------------------------

# Example for two independent calculation paths
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = ~data) x)
p$add("f2", \(x = 1) x)
p$add("f3", \(x = ~f1) x)
p$add("f4", \(x = ~f2) x)
p$split()

# Example of split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipes <- p$set_data_split(dataList)$split()
pipes

## ------------------------------------------------
## Method `Pipeline$unlock_step()`
## ------------------------------------------------

p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$lock_step("add1")
p$set_params(list(x = 3))
p$get_params()
p$unlock_step("add1")
p$set_params(list(x = 3))
p$get_params()

Extract or subset a pipeline

Description

Returns a new pipeline containing selected steps and all required upstream dependencies.

Extracts values from a pipeline using one or two indices. With a single string name, named fields such as "pipeline" or "name" are returned first; anything else returns the matching step-table column. With two indices (row, column), a single cell is extracted.

Usage

## S3 method for class 'pipeflow_pip'
x[i, ...]

## S3 method for class 'pipeflow_pip'
x[[i, j, ...]]

Arguments

x

A pipeflow pipeline object.

i

integer (row indices) or character vector (step names) of steps to select

...

not used

j

column names to select

Value

A new pipeflow pipeline object.

Extracted value(s), depending on i and j.

Examples

p <- pip_new() |>
  pip_add("load", \(n = 5) seq_len(n)) |>
  pip_add("square", \(x = ~load) x^2) |>
  pip_add("total", \(x = ~square) sum(x))

# Select by step name — upstream deps are pulled in automatically.
# Selecting only "total" still includes "load" and "square".
sub <- p["total"]
sub[["pipeline"]][["step"]] # "load", "square", "total"

# Select a subset of steps by name vector
p[c("load", "square")][["pipeline"]][["step"]] # "load", "square"

# Select by integer row index
p[1:2][["pipeline"]][["step"]] # "load", "square"
p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("fit", \(x = ~load) x + 1)

# Access internal objects by name
p[["pipeline"]] # the full step table
p[["name"]] # "pipe"

# Shorthand column access (equivalent to p[["pipeline"]][["step"]])
p[["step"]]

# Two-index form: p[[row, column]] extracts a single cell
p[["fit", "depends"]] # "load"
p[[2, "state"]] # state of the second step

Length of a pipeflow pipeline or view

Description

Length of a pipeflow pipeline or view

Usage

## S3 method for class 'pipeflow_pip'
length(x)

## S3 method for class 'pipeflow_view'
length(x)

Arguments

x

A pipeflow pipeline or view

Value

Number of steps as an integer.

Examples

p <- pip_new() |>
  pip_add("s1", \(x = 1) x) |>
  pip_add("s2", \(x = ~s1) x + 1) |>
  pip_add("s3", \(x = ~s2) x * 2)
length(p) # 3 — total steps in the pipeline

# A view reports only the number of selected (visible) steps
v <- pip_view(p, i = c("s2", "s3"))
length(v) # 2

Add a step

Description

Adds a named step to the pipeline. Each step is a function whose parameters either hold constant defaults or reference the output of a prior step using formula notation (~step_name). Dependencies are validated when the step is added.

Usage

pip_add(x, step, fun, tags = character(0), after = length(x), exec = "auto")

Arguments

x

A pipeflow pipeline object.

step

Unique step name.

fun

Function to execute for the step. Each function parameter must have a default value. Default values that are simple constants are resolved immediately. Default values that are formulas like ~other_step are treated as dependencies to those steps and resolved to the respective output values at runtime once the step is executed.

tags

Optional character vector of tags belonging to the step. Can also be adjusted later using ⁠[pip_tag()]⁠.

after

Optional position after which the new step should be inserted (defaults to last position). Can be a step name or an integer index. If set to 0, the new step will be inserted at the beginning of the pipeline.

exec

Execution mode for this step. One of "auto", "split", "reduce" or "plain". Using execution mode exec = split, the output of the step is marked as partitioned output. In this mode, any step that depends on the split step (directly or indirectly) will have its output automatically mapped partition-wise during step execution. The reduce mode expects partitioned input and passes it through without mapping, while plain mode only accepts non-partitioned input and always intends to execute a single call. In summary:

  • auto: map if partitioned input appears, otherwise single call

  • split: single call, then mark output as partitioned

  • reduce: single call, but only valid with partitioned input

  • plain: single call, only valid with non-partitioned input

Details

If after was specified, the new step will be inserted after the given step or position. Be aware that in contrast to adding a step at the end, inserting a step in the middle is a rather expensive operation as it requires re-wiring parts of the internal pipeline structure, especially if the new step is inserted at an early position.

Value

The updated pipeline, invisibly.

Examples

# --- Tags, and view filtering ---
p <- pip_new("analysis") |>
  pip_add("load", \(n = 5) seq_len(n), tags = c("io", "raw")) |>
  pip_add("clean", \(x = ~load) x * 2, tags = c("io", "process")) |>
  pip_add("fit", \(x = ~clean) sum(x), tags = c("model", "core", "daily")) |>
  pip_add("report", \(x = ~fit) paste("result:", x), tags = "report")

pip_run(p)
p

# Filter by tag using pip_view — keeps steps with any matching tag
pip_view(p, tags = "daily")
pip_view(p, tags = "core")
pip_view(p, tags = c("raw", "report"))

# --- Split / reduce execution modes ---
q <- pip_new("split-demo") |>
  pip_add("data", \(x = iris) x) |>
  pip_add("split", \(x = ~data) split(x, x$Species),
    exec = "split"
  ) |>
  pip_add("stats", \(x = ~split) summary(x)) |>
  pip_add("combine", \(x = ~stats) do.call(rbind, x),
    exec = "reduce"
  )

pip_run(q)
q[["stats", "out"]]   # partitioned list — one summary per species
q[["combine", "out"]] # combined table

Copy a step from another pipeline

Description

Copies one step from pipeline y into pipeline x, preserving its function, parameters, tags, and dependency links.

Usage

pip_add_from(x, y, step)

Arguments

x

Target pipeflow pipeline object.

y

Source pipeflow pipeline object.

step

Step name to copy from y.

Value

The updated target pipeline, invisibly.

Examples

# Build a source pipeline with reusable steps
src <- pip_new("source") |>
  pip_add("load", \(n = 3) seq_len(n)) |>
  pip_add("square", \(x = ~load) x^2)

# Copy steps into a new pipeline one at a time.
# The dependency of "square" on "load" is re-established automatically.
dst <- pip_new("target")
pip_add_from(dst, src, "load")
pip_add_from(dst, src, "square")
pip_run(dst)
pip_collect_out(dst)

Bind pipelines

Description

Bind two pipelines together by concatenating their steps. If both pipelines have steps with the same name, the step names of the second pipeline will be automatically adapted to avoid name clashes.

Usage

pip_bind(x, y)

Arguments

x

A pipeflow pipeline object.

y

A pipeflow pipeline object.

Value

A new pipeflow pipeline object representing the bound pipelines.

Examples

a <- pip_new("a") |>
  pip_add("prep", \(x = 1) x * 2) |>
  pip_add("fit", \(x = ~prep) x + 10)

# "prep" exists in both pipelines; the one from b gets a numeric suffix
b <- pip_new("b") |> pip_add("prep", \(x = 5) x * 3)

ab <- pip_bind(a, b)
ab[["step"]] # "prep", "fit", "prep2" (step name conflict auto-resolved)
ab

Clone a pipeline

Description

Creates an independent copy of the pipeline. Changes to the cloned pipeline do not affect the original pipeline, and vice versa.

Usage

pip_clone(x, name = NULL)

Arguments

x

A pipeflow pipeline object.

name

Optional name for the cloned pipeline. If NULL, the original name is used.

Value

A cloned pipeflow pipeline object.

Examples

p <- pip_new("original") |>
  pip_add("s1", \(x = 1) x) |>
  pip_add("s2", \(x = ~s1) x + 1)

# Clone produces a fully independent copy
cp <- pip_clone(p, name = "copy")
pip_add(cp, "s3", \(x = ~s2) x * 10)

# As a result, the clone has the new step ...
cp

# ... while the original is left unchanged
p

Collect step outputs

Description

Returns the outputs of all pipeline steps as a flat named list keyed by step name. Use pip_view() to narrow the selection before collecting, and compose calls if grouped output is needed.

Usage

pip_collect_out(x)

Arguments

x

A pipeflow pip or view.

Value

A named list of outputs, one element per step.

Examples

p <- pip_new() |>
  pip_add("load", \(x = 1) x, tags = "io") |>
  pip_add("clean", \(x = ~load) x + 1, tags = "io") |>
  pip_add("model", \(x = ~clean) x * 2, tags = "model")
pip_run(p)

# Flat named list with one entry per step
pip_collect_out(p)

# Combine with pip_view to collect output for specific tags
grouped <- list(
  io = pip_view(p, tags = "io") |> pip_collect_out(),
  model = pip_view(p, tags = "model") |> pip_collect_out()
)
grouped

Build pipeline graph data

Description

Builds graph data (nodes and edges) describing the pipeline's step structure, suitable for visualisation with visNetwork::visNetwork().

Usage

pip_get_graph(x, include_upstream = FALSE)

Arguments

x

A pipeflow pip or view.

include_upstream

Logical. Only relevant for views. If TRUE, add all upstream dependencies of selected steps.

Details

Node shapes reflect execution mode:

Value

A named list with two data.frames: nodes and edges.

Examples

p <- pip_new()
pip_add(p, "load", \(x = 1) x, tags = "io")
pip_add(p, "clean", \(x = ~load) x + 1, tags = "io")
pip_add(p, "fit", \(x = ~clean) x * 2, tags = "model")

graph <- pip_get_graph(p)
graph$nodes # data.frame: id, label, shape, color
graph$edges # data.frame: from, to, arrows

# For a view, include_upstream = TRUE adds upstream deps to the graph
v <- pip_view(p, i = "fit")
pip_get_graph(v, include_upstream = TRUE)

if (require("visNetwork", quietly = TRUE)) {
  do.call(what = visNetwork::visNetwork, args = graph)
}

Get independent parameters

Description

Returns the current default values of all tunable (non-dependency) parameters across the pipeline. These are the parameters that can be updated via pip_set_params(). Parameters wired to another step's output via ~step_name are excluded.

Usage

pip_get_params(x)

Arguments

x

A pipeflow pip or view

Value

Named list of tunable parameter values. If the same parameter name appears in multiple steps, the first occurrence in pipeline order is returned.

Examples

p <- pip_new() |>
  pip_add("load", \(n = 100, seed = 42) seq_len(n)) |>
  pip_add("model", \(x = ~load, lambda = 0.1) x * lambda)

# ~load is a dependency — only non-dependency params are returned
pip_get_params(p) # list(n = 100, seed = 42, lambda = 0.1)

# Useful as a guide for pip_set_params()
pip_set_params(p, params = list(n = 20, lambda = 0.5))
pip_run(p) |> pip_collect_out()

Check whether a step exists

Description

Check whether a step exists

Usage

pip_has_step(x, step)

Arguments

x

A pipeflow pip

step

A step name

Value

Logical indicating if the step exists

Examples

p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("fit", \(x = ~load) x + 1)

pip_has_step(p, "load") # TRUE
pip_has_step(p, "fit") # TRUE
pip_has_step(p, "predict") # FALSE — step not yet added

Lock selected steps against updates

Description

Locks all selected steps in the pipeline unless p is a view, in which case only steps covered by the view are locked.

Usage

pip_lock(p)

Arguments

p

A pipeflow pip or view.

Value

The updated pipeline or view, invisibly.

Examples

p <- pip_new() |>
  pip_add("load", \(x = 10) x) |>
  pip_add("fit", \(x = ~load) x * 2)
pip_run(p, lgr = NULL)

# Lock only "load" via a view so it won't be re-executed or overwritten
pip_lock(pip_view(p, i = "load"))
p[["pipeline"]][["locked"]] # TRUE, FALSE

# Locked steps are silently skipped during pip_run()
pip_run(p, lgr = NULL, force = TRUE)
p[["pipeline"]][["out"]][[1]] # still 10 — locked, not re-executed

pip_unlock(p)
p[["pipeline"]][["locked"]] # FALSE, FALSE

Create a pipeline

Description

Creates a new, empty pipeline. Add steps with pip_add() and execute them with pip_run().

Usage

pip_new(name = "pipe")

Arguments

name

Single name used for printing and for derived view names.

Value

A pipeflow pipeline object.

Examples

# Create a named pipeline
p <- pip_new("my_analysis")
p[["name"]] # "my_analysis"

# Build a simple pipeline and run it
pip_add(p, "load", \(n = 5) seq_len(n))
pip_add(p, "double", \(x = ~load) x * 2) # x depends on load's output
p
pip_run(p)
p[["out"]] # list of outputs, one per step

Remove a step

Description

If other steps depend on the step to be removed, an error is given and the removal is blocked, unless recursive was set to TRUE. In recursive mode, the selected step and all downstream dependent steps are removed together.

Usage

pip_remove(x, step, recursive = FALSE)

Arguments

x

A pipeflow pip

step

string the name of the step to be removed.

recursive

logical if TRUE the step is removed together with all its downstream dependencies.

Value

The updated pipeline, invisibly.

Examples

p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("transform", \(x = ~load) x * 2) |>
  pip_add("model", \(x = ~transform) x + 10)

# Removing a leaf step (nothing depends on it) works directly
pip_remove(p, "model")
p                        # "load", "transform"

# Trying to remove a step that others depend on raises an error:
# pip_remove(p, "load")  # Error!

# recursive = TRUE removes the step and all its downstream dependents
pip_remove(p, "load", recursive = TRUE)
p                        # pipeline is now empty

Rename a step

Description

Renames the selected step and updates dependency references in downstream steps.

Usage

pip_rename(x, from, to)

Arguments

x

A pipeflow pip

from

Existing step name

to

New step name

Value

The updated pipeline, invisibly.

Examples

p <- pip_new() |>
  pip_add("s1", \(x = 1) x) |>
  pip_add("s2", \(x = ~s1) x + 1)           # "s2" depends on "s1"

# Downstream dependency references are updated automatically
pip_rename(p, from = "s1", to = "load_data")
p

#' # Trying to rename to an existing step name raises an error:
try(pip_rename(p, "load_data", to = "s2"))  # step 's2' already exists!

Replace a step

Description

Replaces a step's function while keeping it in the same position in the pipeline. Downstream steps are automatically marked as outdated and will re-run on the next pip_run().

Usage

pip_replace(x, step, fun, tags = character(0))

Arguments

x

A pipeflow pipeline object.

step

Step name.

fun

Function to execute for the step.

tags

Optional character vector of tags belonging to the step. Can also be adjusted later using ⁠[pip_tag()]⁠.

Value

The updated pipeline, invisibly.

Examples

p <- pip_new() |>
    pip_add("load", \(n = 5) seq_len(n)) |>
    pip_add("double", \(x = ~load) x * 2)
pip_run(p)
p

# Replace "load" — downstream steps are automatically marked "outdated"
pip_replace(p, "load", \(n = 3) seq_len(n))
p

# Re-run to bring everything up to date
pip_run(p)
p

Run a pipeline

Description

Executes all pending steps in order. Steps already in state "done" are skipped unless force = TRUE.

Usage

pip_run(
  x,
  lgr = pipeflow_lgr,
  force = FALSE,
  progress = NULL,
  recursive = FALSE
)

Arguments

x

A pipeflow pip or view

lgr

A logging function of the form ⁠function(level, msg, ...)⁠. To suppress logging, you can set lgr = NULL.

force

Logical indicating if all steps should be forced to run, regardless of whether they are outdated or not.

progress

Optional callback of the form ⁠function(value, detail)⁠ called before each step.

recursive

If TRUE and a step returns a pipeline object, the current run is aborted and continues from the returned pipeline. Useful for dynamic or self-modifying pipelines.

Details

When x is a view, requested rows are run together with required upstream dependencies.

Value

The updated pipeline or view, invisibly.

See Also

vignette("v06-self-modify-pipeline", package = "pipeflow") for an advanced example of recursive/dynamic pipelines.

Examples

p <- pip_new() |>
  pip_add("load", \(n = 3) seq_len(n)) |>
  pip_add("square", \(x = ~load) x^2) |>
  pip_add("total", \(x = ~square) sum(x))

pip_run(p)
p

# Already-done steps are skipped on a second run
pip_run(p) # all steps skipped

# lgr = NULL suppresses log output
pip_run(p, lgr = NULL)

# force = TRUE re-executes every step regardless of state
pip_run(p, force = TRUE)

# Run only a subset of steps via a view;
# upstream dependencies are automatically included
v <- pip_view(p, i = "total")
pip_run(v)

Set independent parameters

Description

Updates the default values of tunable parameters across the pipeline. Affected steps and their downstream dependents are automatically marked as outdated.

Usage

pip_set_params(p, params = list())

Arguments

p

A pipeflow pip or view

params

Named list of parameters to set.

Details

Parameters of locked steps are never changed and their state remains unchanged.

Value

The updated pipeline or view, invisibly.

Examples

p <- pip_new() |>
  pip_add("load", \(n = 10) seq_len(n)) |>
  pip_add("scale", \(x = ~load, factor = 0.5) x * factor)

# See all adjustable parameters before running
pip_get_params(p) # list(n = 10, factor = 0.5)

# Updating params marks affected steps (and their dependents) outdated
pip_set_params(p, params = list(n = 5, factor = 2.0))
p

pip_run(p)
p

Add tags to selected steps

Description

Adds tags to existing tags for all steps in the pipeline unless p is a view, in which case tags are only added for steps covered by the view. Locked steps are skipped and not updated.

Usage

pip_tag(p, tags = character())

Arguments

p

A pipeflow pip or view.

tags

Character vector of tags to add for each selected step.

Value

The updated pipeline or view, invisibly.

Examples

p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("fit", \(x = ~load) x + 1)

# Tag every step in the pipeline at once
pip_tag(p, tags = c("daily", "core"))
p[["pipeline"]][["tags"]] # both steps have c("daily", "core")

# Add an extra tag to only one step via a view
v <- pip_view(p, i = "fit")
pip_tag(v, tags = "model")
p[["pipeline"]][["tags"]] # "fit" also has "model"

Unlock selected steps

Description

Unlocks all selected steps in the pipeline unless p is a view, in which case only steps covered by the view are unlocked.

Usage

pip_unlock(p)

Arguments

p

A pipeflow pip or view.

Value

The updated pipeline or view, invisibly.

Examples

p <- pip_new() |>
  pip_add("load", \(x = 1) x) |>
  pip_add("fit", \(x = ~load) x * 2)

# Lock all steps, then unlock to restore normal execution
pip_lock(p)
p[["pipeline"]][["locked"]] # TRUE, TRUE

pip_unlock(p)
p[["pipeline"]][["locked"]] # FALSE, FALSE

Remove tags from selected steps

Description

Removes tags from existing tags for all steps in the pipeline unless p is a view, in which case tags are only removed for steps covered by the view. Locked steps are skipped and not updated.

Usage

pip_untag(p, tags = character())

Arguments

p

A pipeflow pip or view.

tags

Character vector of tags to remove for each selected step.

Value

The updated pipeline or view, invisibly.

Examples

p <- pip_new() |>
  pip_add("load", \(x = 1) x, tags = c("daily", "core")) |>
  pip_add("fit", \(x = ~load) x + 1, tags = c("daily", "model"))

# Remove "daily" from all steps
pip_untag(p, tags = "daily")
# "load" retains "core"; "fit" retains "model"
p[["pipeline"]][["tags"]]

Create a pipeline view

Description

Creates a filtered view showing only a selected subset of steps. A view references the underlying pipeline without copying it, so operations like pip_run() and pip_set_params() applied to a view affect only the selected steps.

Usage

pip_view(
  x,
  i = integer(),
  filter = list(),
  tags = character(),
  fixed = TRUE,
  ...
)

Arguments

x

A pipeflow pipeline or view.

i

Optional row indices or step names to keep.

filter

A named list of filters to apply. Each element can be a character vector specifying the values to keep for the corresponding property or, if fixed is FALSE, a regular expression. See examples for usage.

tags

Tag filter (character). Keeps steps with any matching tag.

fixed

If TRUE, values in filter are treated as fixed strings, otherwise they are treated as regular expressions.

...

further args passed to grepl (only in effect when fixed is FALSE).

Value

A pipeflow_view object.

Examples


p <- pip_new()
pip_add(p, "load_raw", \(x = 1) x,
  tags = c("io", "core", "daily")
)
pip_add(p, "fit_model", \(x = 2) x + 1,
  tags = c("model")
)
pip_add(p, "eval_model", \(x = ~fit_model) x,
  tags = c("model", "daily", "report")
)

# Filter by a fixed column value (one or more states)
pip_view(p, filter = list(state = "new"))

# Combine filters: step pattern AND state
pip_view(p, filter = list(step = "model", state = "new"))

# Filter by tag — keeps steps that have *any* of the given tags
pip_view(p, tags = "daily")

# Combine explicit step selection with a filter (intersection)
pip_view(p,
  i      = c("load_raw", "fit_model"),
  filter = list(state = "new")
)

# Select by integer row indices
pip_view(p, i = c(1L, 2L), filter = list(state = "new"))

# Use a regex pattern to match step names
pip_view(p, filter = list(step = "_model$"), fixed = FALSE)

# Views are composable: create a view-of-view for progressive narrowing
v1 <- pip_view(p, tags = "daily")
print(v1) # load_raw, eval_model
v2 <- pip_view(v1, tags = "report")
print(v2) # eval_model only

Add pipeline step

Description

A pipeline consists of a series of steps, which usually are added one by one. Each step is made up of a function computing something once the pipeline is run. This function can be an existing R function (e.g. mean()) or an anonymous/lambda function specifically defined for the pipeline. One useful feature is that function parameters can refer to results of earlier pipeline steps using the syntax x = ~earlier_step_name - see the Examples for more details.

Usage

pipe_add(
  pip,
  step,
  fun,
  params = list(),
  description = "",
  group = step,
  keepOut = FALSE
)

Arguments

pip

Pipeline object

step

string the name of the step. Each step name must be unique.

fun

function or name of the function to be applied at the step. Both existing and anonymous/lambda functions can be used. All function parameters must have default values. If a parameter is missing a default value in the function signature, alternatively, it can be set via the params argument (see Examples section with mean() function).

params

list list of parameters to set or overwrite parameters of the passed function.

description

string optional description of the step

group

string output collected after pipeline execution (see pipe_collect_out() is grouped by the defined group names. By default, this is the name of the step, which comes in handy when the pipeline is copy-appended multiple times to keep the results of the same function/step grouped at one place.

keepOut

logical if FALSE (default) the output of the step is not collected when calling pipe_collect_out() after the pipeline run. This option is used to only keep the results that matter and skip intermediate results that are not needed. See also function pipe_collect_out() for more details.

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_add() instead.

Examples

# Add steps with lambda functions
p <- pipe_new("myPipe", data = 1)
pipe_add(p, "s1", \(x = ~data) 2 * x) # use input data
pipe_add(p, "s2", \(x = ~data, y = ~s1) x * y)
try(pipe_add(p, "s2", \(z = 3) 3)) # error: step 's2' exists already
try(pipe_add(p, "s3", \(z = ~foo) 3)) # dependency 'foo' not found
p

# Add step with existing function
p <- pipe_new("myPipe", data = c(1, 2, NA, 3, 4))
try(pipe_add(p, "calc_mean", mean)) # default value for x is missing
pipe_add(p, "calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
p |>
  pipe_run() |>
  pipe_get_out("calc_mean")

# Step description
p <- pipe_new("myPipe", data = 1:10)
pipe_add(p, "s1", \(x = ~data) 2 * x, description = "multiply by 2")
print(p, verbose = TRUE) # print all columns including description


# Group output
p <- pipe_new("myPipe", data = data.frame(x = 1:2, y = 3:4))
pipe_add(p, "prep_x", \(data = ~data) data$x, group = "prep")
pipe_add(p, "prep_y", \(data = ~data) (data$y)^2, group = "prep")
pipe_add(p, "sum", \(x = ~prep_x, y = ~prep_y) x + y)
p |>
  pipe_run() |>
  pipe_collect_out(all = TRUE)

Append two pipelines

Description

When appending, pipeflow takes care of potential name clashes with respect to step names and dependencies, that is, if needed, it will automatically adapt step names and dependencies to make sure they are unique in the merged pipeline.

Usage

pipe_append(pip, p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")

Arguments

pip

Pipeline object to be appended to.

p

Pipeline object to be appended.

outAsIn

logical if TRUE, output of first pipeline is used as input for the second pipeline.

tryAutofixNames

logical if TRUE, name clashes are tried to be automatically resolved by appending the 2nd pipeline's name. Only set to FALSE, if you know what you are doing.

sep

string separator used when auto-resolving step names

Value

returns new combined Pipeline object.

Lifecycle

Deprecated. Legacy API. Use pip_bind() instead.

Examples

# Append pipeline
p1 <- pipe_new("pipe1")
pipe_add(p1, "step1", \(x = 1) x)
p2 <- pipe_new("pipe2")
pipe_add(p2, "step2", \(y = 1) y)
p1 |> pipe_append(p2)

# Append pipeline with potential name clashes
p3 <- pipe_new("pipe3")
pipe_add(p3, "step1", \(z = 1) z)
p1 |>
  pipe_append(p2) |>
  pipe_append(p3)

# Use output of first pipeline as input for second pipeline
p1 <- pipe_new("pipe1", data = 8)
p2 <- pipe_new("pipe2")
pipe_add(p1, "square", \(x = ~data) x^2)
pipe_add(p2, "log2", \(x = ~data) log2(x))

p12 <- p1 |> pipe_append(p2, outAsIn = TRUE)
p12 |>
  pipe_run() |>
  pipe_get_out("log2")
p12

# Custom name separator for adapted step names
p1 |> pipe_append(p2, sep = "___")

Append string to all step names

Description

Appends string to all step names and takes care of updating step dependencies accordingly.

Usage

pipe_append_to_step_names(pip, postfix, sep = ".")

Arguments

pip

Pipeline object

postfix

string to be appended to each step name.

sep

string separator between step name and postfix.

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

p <- pipe_new("pipe")
pipe_add(p, "step1", \(x = 1) x)
pipe_add(p, "step2", \(y = 1) y)
pipe_append_to_step_names(p, "new")
p
pipe_append_to_step_names(p, "foo", sep = "__")
p

Clone pipeline

Description

Creates a copy of a pipeline object.

Usage

pipe_clone(pip, deep = FALSE)

Arguments

pip

Pipeline object

deep

logical whether to perform a deep copy

Value

returns the copied Pipeline object

Lifecycle

Deprecated. Legacy API. Use pip_clone() instead.

Examples

p1 <- pipe_new("pipe")
pipe_add(p1, "step1", \(x = 1) x)
p2 <- pipe_clone(p1)
pipe_add(p2, "step2", \(y = 1) y)
p1
p2

Collect output from entire pipeline

Description

Collects output afer pipeline run, by default, from all steps for which keepOut was set to TRUE when steps were added (see pipe_add()). The output is grouped by the group names (see group parameter in pipe_add()), which by default are set identical to the step names.

Usage

pipe_collect_out(pip, groupBy = "group", all = FALSE)

Arguments

pip

Pipeline object

groupBy

string column of pipeline by which to group the output.

all

logical if TRUE all output is collected regardless of the keepOut flag. This can be useful for debugging.

Value

list containing the output, named after the groups, which, by default, are the steps.

Lifecycle

Deprecated. Legacy API. Use pip_collect_out() instead.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "step1", \(x = ~data) x + 2)
pipe_add(p, "step2", \(x = ~step1) x + 2, keepOut = TRUE)
pipe_run(p)
pipe_collect_out(p)
pipe_collect_out(p, all = TRUE) |> str()

# Grouped output
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "step1", \(x = ~data) x + 2, group = "add")
pipe_add(p, "step2", \(x = ~step1, y = 2) x + y, group = "add")
pipe_add(p, "step3", \(x = ~data) x * 3, group = "mult")
pipe_add(p, "step4", \(x = ~data, y = 2) x * y, group = "mult")
p

pipe_run(p)
pipe_collect_out(p, all = TRUE) |> str()

# Grouped by state
pipe_set_params(p, list(y = 5))
p

pipe_collect_out(p, groupBy = "state", all = TRUE) |> str()

Discard steps from the pipeline

Description

Discard all steps that match a given pattern.

Usage

pipe_discard_steps(pip, pattern, recursive = FALSE, fixed = TRUE, ...)

Arguments

pip

Pipeline object

pattern

string containing a regular expression (or character string for fixed = TRUE) to be matched.

recursive

logical if TRUE the step is removed together with all its downstream dependencies.

fixed

logical If TRUE, pattern is a string to be matched as is. Overrides all conflicting arguments.

...

further arguments passed to grep().

Value

the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~add1) x + 2)
pipe_add(p, "mult3", \(x = ~add1) x * 3)
pipe_add(p, "mult4", \(x = ~add2) x * 4)
p

pipe_discard_steps(p, "mult")
p

# Re-add steps
pipe_add(p, "mult3", \(x = ~add1) x * 3)
pipe_add(p, "mult4", \(x = ~add2) x * 4)
p

# Discarding 'add1' does not work ...
try(pipe_discard_steps(p, "add1"))

# ... unless we enforce to remove its downstream dependencies as well
pipe_discard_steps(p, "add1", recursive = TRUE)
p

# Trying to discard non-existent steps is just ignored
pipe_discard_steps(p, "non-existent")

Get data

Description

Get the data set for the pipeline

Usage

pipe_get_data(pip)

Arguments

pip

Pipeline object

Value

the output defined in the data step, which by default is the first step of the pipeline

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_get_data(p)
pipe_set_data(p, 3:4)
pipe_get_data(p)

Get step dependencies

Description

Get step dependencies

Usage

pipe_get_depends(pip)

pipe_get_depends_down(pip, step, recursive = TRUE)

pipe_get_depends_up(pip, step, recursive = TRUE)

Arguments

pip

Pipeline object

step

string name of step

recursive

logical if TRUE, dependencies of dependencies are also returned.

Value

Methods

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

# pipe_get_depends
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y)
pipe_get_depends(p)

# pipe_get_depends_down
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y)
pipe_add(p, "mult3", \(x = ~add1) x * 3)
pipe_add(p, "mult4", \(x = ~add2) x * 4)
pipe_get_depends_down(p, "add1")
pipe_get_depends_down(p, "add1", recursive = FALSE)

# pipe_get_depends_up
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y)
pipe_add(p, "mult3", \(x = ~add1) x * 3)
pipe_add(p, "mult4", \(x = ~add2) x * 4)
pipe_get_depends_up(p, "mult4")
pipe_get_depends_up(p, "mult4", recursive = FALSE)

Pipeline graph

Description

Get the pipeline as a graph with nodes and edges.

Usage

pipe_get_graph(pip, groups = NULL)

Arguments

pip

Pipeline object

groups

character if not NULL, only steps belonging to the given groups are considered.

Value

list with two data frames, one for nodes and one for edges ready to be used with the visNetwork::visNetwork() function of the visNetwork package.

Lifecycle

Deprecated. Legacy API. Use pip_get_graph() instead.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = ~add1) x + y)
pipe_add(p, "mult1", \(x = ~add1, y = ~add2) x * y)
graph <- pipe_get_graph(p)
graph

if (require("visNetwork", quietly = TRUE)) {
  do.call(visNetwork, args = graph)
}

Get output of given step

Description

Get output of given step

Usage

pipe_get_out(pip, step)

Arguments

pip

Pipeline object

step

string name of step

Value

the output at the given step.

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

See Also

pipe_collect_out() to collect output of multiple steps.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y)
pipe_run(p)
pipe_get_out(p, "add1")
pipe_get_out(p, "add2")

Get pipeline parameters

Description

Retrieves unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps.

Usage

pipe_get_params(pip, ignoreHidden = TRUE)

pipe_get_params_at_step(pip, step, ignoreHidden = TRUE)

pipe_get_params_unique(pip, ignoreHidden = TRUE)

pipe_get_params_unique_json(pip, ignoreHidden = TRUE)

Arguments

pip

Pipeline object

ignoreHidden

logical if TRUE, hidden parameters (i.e. all paramater names starting with a dot) are ignored and thus not returned.

step

string name of step

Value

Lifecycle

Deprecated. Legacy API. Use pip_get_params() instead.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

# pipe_get_params
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z)
pipe_add(p, "add3", \() 1 + 2)
pipe_get_params(p, ) |> str()
pipe_get_params(p, ignoreHidden = FALSE) |> str()

# pipe_get_params_at_step
pipe_get_params_at_step(p, "add2")
pipe_get_params_at_step(p, "add2", ignoreHidden = FALSE)
pipe_get_params_at_step(p, "add3")

# pipe_get_params_unique
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z)
pipe_add(p, "mult1", \(x = 4, y = 5, .z = 6, b = ~add2) x * y * b)
pipe_get_params_unique(p)
pipe_get_params_unique(p, ignoreHidden = FALSE)

# get_params_unique_json
pipe_get_params_unique_json(p)
pipe_get_params_unique_json(p, ignoreHidden = FALSE)

Get step information

Description

Get step information

Usage

pipe_get_step(pip, step)

pipe_get_step_names(pip)

pipe_get_step_number(pip, step)

pipe_has_step(pip, step)

Arguments

pip

Pipeline object

step

string name of step

Value

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_has_step() instead.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = 2, z = ~add1) x + y + z)
pipe_run(p)

# pipe_get_step_names
pipe_get_step_names(p)

# get_step_number
pipe_get_step_number(p, "add1")
pipe_get_step_number(p, "add2")

# pipe_has_step
pipe_has_step(p, "add1")
pipe_has_step(p, "foo")

# pipe_get_step
add1 <- pipe_get_step(p, "add1")
add1

add1[["params"]]

add1[["fun"]]

try(p$get_step("foo")) # error: step 'foo' does not exist

Insert step

Description

Insert step

Usage

pipe_insert_after(pip, afterStep, step, ...)

pipe_insert_before(pip, beforeStep, step, ...)

Arguments

pip

Pipeline object

afterStep

string name of step after which to insert

step

string name of step to insert

...

further arguments passed to pipe_add()

beforeStep

string name of step before which to insert

Value

returns the Pipeline object invisibly

Methods

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

# pipe_insert_after
p <- pipe_new("pipe", data = 1)
pipe_add(p, "f1", \(x = 1) x)
pipe_add(p, "f2", \(x = ~f1) x)
pipe_insert_after(p, "f1", step = "after_f1", \(x = ~f1) x)
p

# insert_before
pipe_insert_before(p, "f2", step = "before_f2", \(x = ~f1) 2 * x)
p

Length of the pipeline

Description

Length of the pipeline

Usage

pipe_length(pip)

Arguments

pip

Pipeline object

Value

numeric length of pipeline, that is, the total number of steps

Lifecycle

Deprecated. Legacy API. Use length() instead.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "f1", \(x = 1) x)
pipe_add(p, "f2", \(y = 1) y)
p
pipe_length(p)

Lock steps

Description

Locking a step means that both its parameters and its output (given it has output) are locked such that neither setting new pipeline parameters nor future pipeline runs can change the current parameter and output content. To unlock a locked step, use pipe_unlock_step().

Usage

pipe_lock_step(pip, step)

pipe_unlock_step(pip, step)

Arguments

pip

Pipeline object

step

string name of step to lock or unlock

Value

the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_lock() instead.

Deprecated. Legacy API. Use pip_unlock() instead.

Examples

# pipe_lock_step
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = 1, data = ~data) x + data)
pipe_add(p, "add2", \(x = 1, data = ~data) x + data)
pipe_run(p)
pipe_get_out(p, "add1")
pipe_get_out(p, "add2")
pipe_lock_step(p, "add1")

pipe_set_data(p, 3)
pipe_set_params(p, list(x = 3))
pipe_run(p)
pipe_get_out(p, "add1")
pipe_get_out(p, "add2")

# pipe_unlock_step
pipe_unlock_step(p, "add1")
pipe_set_params(p, list(x = 3))
pipe_run(p)
pipe_get_out(p, "add1")

Create new pipeline

Description

A new pipeline is always initialized with one 'data' step, which basically is a function returning the data.

Usage

pipe_new(name, data = NULL, logger = NULL)

Arguments

name

the name of the Pipeline

data

optional data used at the start of the pipeline. The data also can be set later using the pipe_set_data() function.

logger

custom logger to be used for logging. If no logger is provided, the default logger is used, which should be sufficient for most use cases. If you do want to use your own custom log function, you need to provide a function that obeys the following form:

⁠function(level, msg, ...) { your custom logging code here }⁠

The level argument is a string and will be one of info, warn, or error. The msg argument is a string containing the message to be logged. The ... argument is a list of named parameters, which can be used to add additional information to the log message. Currently, this is only used to add the context in case of a step giving a warning or error.

Note that with the default logger, the log layout can be altered any time via set_log_layout().

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_new() instead.

Examples

data <- data.frame(x = 1:2, y = 3:4)
p <- pipe_new("myPipe", data = data)
p |>
  pipe_run() |>
  pipe_get_out("data")

# Setting data later
p <- pipe_new("myPipe")
pipe_get_data(p)

p <- pipe_set_data(p, data)
pipe_get_data(p)
p |>
  pipe_run() |>
  pipe_get_out("data")

# Initialize with custom logger
my_logger <- function(level, msg, ...) {
  cat(level, msg, "\n")
}
p <- pipe_new("myPipe", data = data, logger = my_logger)
p |>
  pipe_run() |>
  pipe_get_out("data")

Pop steps from the pipeline

Description

Use this function to drop steps from the end of the pipeline.

Usage

pipe_pop_step(pip)

pipe_pop_steps_after(pip, step)

pipe_pop_steps_from(pip, step)

Arguments

pip

Pipeline object

step

string name of step

Value

string the name of the step that was removed

Methods

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

# pipe_pop_step
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "f1", \(x = 1) x)
pipe_add(p, "f2", \(y = 1) y)
p
pipe_pop_step(p)
p

# pipe_pop_steps_after
pipe_add(p, "f2", \(y = 1) y)
pipe_add(p, "f3", \(z = 1) z)
p
pipe_pop_steps_after(p, "f1")
p

# pipe_pop_steps_from
pipe_add(p, "f2", \(y = 1) y)
pipe_add(p, "f3", \(z = 1) z)
p
pipe_pop_steps_from(p, "f1")
p

Print the pipeline as a table

Description

Print the pipeline as a table

Usage

pipe_print(pip, verbose = FALSE)

Arguments

pip

Pipeline object

verbose

logical if TRUE, print all columns of the pipeline, otherwise only the most relevant columns are displayed.

Value

the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use print() instead.

Examples

p <- pipe_new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
pipe_print(p)
pipe_print(p, verbose = TRUE)

# Also works with standard print function
print(p)
print(p, verbose = TRUE)

Remove certain step from the pipeline.

Description

Can be used to remove any given step. If other steps depend on the step to be removed, an error is given and the removal is blocked, unless recursive was set to TRUE.

Usage

pipe_remove_step(pip, step, recursive = FALSE)

Arguments

pip

Pipeline object

step

string the name of the step to be removed

recursive

logical if TRUE the step is removed together with all its downstream dependencies.

Value

the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_remove() instead.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = ~add1) x + y)
pipe_add(p, "mult1", \(x = 1, y = ~add2) x * y)
p

pipe_remove_step(p, "mult1")
p

try(pipe_remove_step(p, "add1"))
pipe_remove_step(p, "add1", recursive = TRUE)
p

Rename step

Description

Safely rename a step in the pipeline. If new step name would result in a name clash, an error is given.

Usage

pipe_rename_step(pip, from, to)

Arguments

pip

Pipeline object

from

string the name of the step to be renamed.

to

string the new name of the step.

Value

the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_rename() instead.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = ~add1) x + y)
p

try(pipe_rename_step(p, from = "add1", to = "add2"))

pipe_rename_step(p, from = "add1", to = "first_add")
p

Replace pipeline step

Description

Replaces an existing pipeline step.

Usage

pipe_replace_step(
  pip,
  step,
  fun,
  params = list(),
  description = "",
  group = step,
  keepOut = FALSE
)

Arguments

pip

Pipeline object

step

string the name of the step. Each step name must be unique.

fun

function or name of the function to be applied at the step. Both existing and anonymous/lambda functions can be used. All function parameters must have default values. If a parameter is missing a default value in the function signature, alternatively, it can be set via the params argument (see Examples section with mean() function).

params

list list of parameters to set or overwrite parameters of the passed function.

description

string optional description of the step

group

string output collected after pipeline execution (see function pipe_collect_out()) is grouped by the defined group names. By default, this is the name of the step, which comes in handy when the pipeline is copy-appended multiple times to keep the results of the same function/step grouped at one place.

keepOut

logical if FALSE (default) the output of the step is not collected when calling pipe_collect_out() after the pipeline run. This option is used to only keep the results that matter and skip intermediate results that are not needed. See also function pipe_collect_out() for more details.

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_replace() instead.

See Also

pipe_add()

Examples

p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "add2", \(x = ~data, y = 2) x + y)
pipe_add(p, "mult", \(x = 1, y = 2) x * y, keepOut = TRUE)
pipe_run(p) |> pipe_collect_out()
pipe_replace_step(p, "mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
pipe_run(p) |> pipe_collect_out()
try(pipe_replace_step(p, "foo", \(x = 1) x)) # step 'foo' does not exist

Reset pipeline

Description

Resets the pipeline to the state before it was run. This means that all output is removed and the state of all steps is reset to 'New'.

Usage

pipe_reset(pip)

Arguments

pip

Pipeline object

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "f1", \(x = 1) x)
pipe_add(p, "f2", \(y = 1) y)
pipe_run(p, )
p

pipe_reset(p)
p

Run pipeline

Description

Runs all new and/or outdated pipeline steps.

Usage

pipe_run(
  pip,
  force = FALSE,
  recursive = TRUE,
  cleanUnkept = FALSE,
  progress = NULL,
  showLog = TRUE
)

Arguments

pip

Pipeline object

force

logical if TRUE all steps are run regardless of whether they are outdated or not.

recursive

logical if TRUE and a step returns a new pipeline, the run of the current pipeline is aborted and the new pipeline is run recursively.

cleanUnkept

logical if TRUE all output that was not marked to be kept is removed after the pipeline run. This option can be useful if temporary results require a lot of memory.

progress

function this parameter can be used to provide a custom progress function of the form ⁠function(value, detail)⁠, which will show the progress of the pipeline run for each step, where value is the current step number and detail is the name of the step.

showLog

logical should the steps be logged during the pipeline run?

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_run() instead.

Examples

# Simple pipeline
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "add2", \(x = ~add1, z = 2) x + z)
pipe_add(p, "final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p |>
  pipe_run() |>
  pipe_collect_out()
pipe_set_params(p, list(z = 4))  # outdates steps add2 and final
p

p |> pipe_run() |> pipe_collect_out()

pipe_run(p, cleanUnkept = TRUE)
p

# Recursive pipeline (for advanced users)
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "new_pipe", \(x = ~add1) {
    p2 <- pipe_new("new_pipe", data = x)
    pipe_add(p2, "add1", \(x = ~data) x + 1)
    pipe_add(p2, "add2", \(x = ~add1) x + 2, keepOut = TRUE)
  }
)
p |> pipe_run() |> pipe_collect_out()

# Run pipeline with progress bar
p <- pipe_new("pipe", data = 1)
pipe_add(p, "first step", \() Sys.sleep(0.5))
pipe_add(p, "second step", \() Sys.sleep(0.5))
pipe_add(p, "last step", \() Sys.sleep(0.5))
pb <- txtProgressBar(min = 1, max = pipe_length(p), style = 3)
fprogress <- function(value, detail) {
   setTxtProgressBar(pb, value)
}
pipe_run(p, progress = fprogress, showLog = FALSE)

Run specific step

Description

Run given pipeline step possibly together with upstream and/or downstream dependencies.

Usage

pipe_run_step(
  pip,
  step,
  upstream = TRUE,
  downstream = FALSE,
  cleanUnkept = FALSE
)

Arguments

pip

Pipeline object

step

string name of step

upstream

logical if TRUE, run all dependent upstream steps first.

downstream

logical if TRUE, run all depdendent downstream afterwards.

cleanUnkept

logical if TRUE all output that was not marked to be kept is removed after the pipeline run. This option can be useful if temporary results require a lot of memory.

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_run() instead.

Examples

p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "add2", \(x = ~add1, z = 2) x + z)
pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y)
pipe_run_step(p, "add2")

pipe_run_step(p, "add2", downstream = TRUE)

pipe_run_step(p, "mult", upstream = TRUE)

Set data

Description

Set data in first step of pipeline.

Usage

pipe_set_data(pip, data)

Arguments

pip

Pipeline object

data

initial data set.

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p |>
  pipe_run() |>
  pipe_collect_out()

pipe_set_data(p, 3)
p |>
  pipe_run() |>
  pipe_collect_out()

Split-multiply pipeline by list of data sets

Description

This function can be used to apply the pipeline repeatedly to various data sets. For this, the pipeline split-copies itself by the list of given data sets. Each sub-pipeline will have one of the data sets set as input data. The step names of the sub-pipelines will be the original step names plus the name of the data set.

Usage

pipe_set_data_split(
  pip,
  dataList,
  toStep = character(),
  groupBySplit = TRUE,
  sep = "."
)

Arguments

pip

Pipeline object

dataList

list of data sets

toStep

string step name marking optional subset of the pipeline, to which the data split should be applied to.

groupBySplit

logical whether to set step groups according to data split.

sep

string separator to be used between step name and data set name when creating the new step names.

Value

new combined Pipeline with each sub-pipeline having set one of the data sets.

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

# Split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipe_set_data_split(p, dataList)
p

p |>
  pipe_run() |>
  pipe_collect_out() |>
  str()

# Don't group output by split
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipe_set_data_split(p, dataList, groupBySplit = FALSE)
p

p |>
  pipe_run() |>
  pipe_collect_out() |>
  str()

# Split up to certain step
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y)
pipe_add(p, "average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
p
pipe_get_depends(p)[["average_result"]]

pipe_set_data_split(p, dataList, toStep = "mult")
p
pipe_get_depends(p)[["average_result"]]

p |>
  pipe_run() |>
  pipe_collect_out() |>
  str()

Change output flag

Description

Change the keepOut flag at a given pipeline step, which determines whether the output of that step is collected when calling pipe_collect_out()⁠after the pipeline was run. See column⁠keepOut' when printing a pipeline to view the status.

Usage

pipe_set_keep_out(pip, step, keepOut = TRUE)

Arguments

pip

Pipeline object

step

string name of step

keepOut

logical whether to keep output of step

Value

the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
pipe_add(p, "add2", \(x = ~data, y = 2) x + y)
pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y)
p |>
  pipe_run() |>
  pipe_collect_out()

pipe_set_keep_out(p, "add1", keepOut = FALSE)
pipe_set_keep_out(p, "mult", keepOut = TRUE)
p |>
  pipe_run() |>
  pipe_collect_out()

Set pipeline parameters

Description

Set unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps. Trying to set parameters that don't exist in the pipeline is ignored, by default, with a warning.

Usage

pipe_set_params(pip, params, warnUndefined = TRUE)

Arguments

pip

Pipeline object

params

list of parameters to be set

warnUndefined

logical whether to give a warning when trying to set a parameter that is not defined in the pipeline.

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_set_params() instead.

Examples

p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 2) x + y)
pipe_add(p, "add2", \(x = ~data, y = 3) x + y)
pipe_add(p, "mult", \(x = 4, z = 5) x * z)
pipe_get_params(p)
pipe_set_params(p, params = list(x = 3, y = 3))
pipe_get_params(p)
pipe_set_params(p, params = list(x = 5, z = 3))
pipe_get_params(p)

suppressWarnings(
  pipe_set_params(p, list(foo = 3)) # gives warning as 'foo' is undefined
)
pipe_set_params(p, list(foo = 3), warnUndefined = FALSE)

Set parameters at step

Description

Set unbound function parameters defined at given pipeline step where 'unbound' means parameters that are not linked to other steps. If one or more parameters don't exist, an error is given.

Usage

pipe_set_params_at_step(pip, step, params)

Arguments

pip

Pipeline object

step

string the name of the step

params

list of parameters to be set

Value

returns the Pipeline object invisibly

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 2, z = 3) x + y)
pipe_set_params_at_step(p, step = "add1", params = list(y = 5, z = 6))
pipe_get_params(p)

try(
  pipe_set_params_at_step(p, step = "add1", params = list(foo = 3))
)

Split-up pipeline

Description

Splits pipeline into its independent parts. This can be useful, for example, to split-up the pipeline in order to run each part in parallel.

Usage

pipe_split(pip)

Arguments

pip

Pipeline object

Value

list of Pipeline objects

Lifecycle

Deprecated. Legacy API. Use pip_new() and related pip_* functions.

Examples

# Example for two independent calculation paths
p <- pipe_new("pipe", data = 1)
pipe_add(p, "f1", \(x = ~data) x)
pipe_add(p, "f2", \(x = 1) x)
pipe_add(p, "f3", \(x = ~f1) x)
pipe_add(p, "f4", \(x = ~f2) x)
pipe_split(p)

# Example of split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipes <- pipe_set_data_split(p, dataList) |> pipe_split()
pipes

Print pipeflow objects

Description

Print pipeflow objects

Usage

## S3 method for class 'pipeflow_pip'
print(
  x,
  rows = integer(),
  cols = getOption("pipeflow.print.cols", default = "core"),
  topn = getOption("pipeflow.print.topn", default = 5),
  nrows = getOption("pipeflow.print.nrows", default = 50),
  row.names = getOption("pipeflow.print.rownames", default = TRUE),
  class = getOption("pipeflow.print.class", default = FALSE),
  header = TRUE,
  ...
)

## S3 method for class 'pipeflow_view'
print(x, header = TRUE, ...)

Arguments

x

A pipeflow pipeline or view.

rows

Row indices to be printed. If empty, all rows are printed.

cols

The columns to be printed. Can be either one of core or all to print the core or all columns, respectively, or an explicit character vector of columns to be printed.

topn

The number of rows to be printed from the beginning and end of tables with more than nrows rows.

nrows

The number of rows printed before truncation is enforced.

row.names

If TRUE, row indices will be printed alongside x.

class

If TRUE, the resulting output will include above each column its storage class (or a self-evident abbreviation thereof).

header

If TRUE, a header with the pipeline name and number of steps will be printed.

...

Other arguments passed to print.data.table

Value

Invisibly returns x.

Examples

p <- pip_new("demo") |>
  pip_add("load", \(n = 5) seq_len(n), tags = c("io", "raw")) |>
  pip_add("square", \(x = ~load) x^2, tags = "compute") |>
  pip_add("total", \(x = ~square) sum(x), tags = "compute")

print(p) # core columns: step, depends, tags, out, state
print(p, cols = "all") # all non-hidden columns
print(p, rows = 2:3) # print only steps 2 and 3
p <- pip_new() |>
  pip_add("s1", \(x = 1) x, tags = "io") |>
  pip_add("s2", \(x = ~s1) x + 1, tags = "model")

# A view header shows how many steps are selected out of the total
v <- pip_view(p, tags = "model")
print(v) # "<pipeflow_view> pipe view (1 of 2 steps)"

Set pipeflow log layout

Description

This function provides an easy way to set the basic log layout of the pipeline logging. For a fine-grained control of the logger, which you can retrieve via lgr::get_logger("pipeflow"), see e.g. the logger_config function from the lgr package.

Usage

set_log_layout(layout = c("text", "json"))

Arguments

layout

Layout name, which at this point can be either 'text' or 'json'.

Value

invisibly returns a Logger object

Examples

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$run()

lg <- set_log_layout("json")
print(lg)

p$run()

set_log_layout("text")
p$run()