| Title: | Fast Interactive Data Analysis Pipelines |
| Version: | 0.3.0 |
| Maintainer: | Roman Pahl <roman.pahl@gmail.com> |
| Description: | A lightweight and intuitive framework for building interactive data analysis pipelines. You add R functions one by one, and 'pipeflow' wires them into a pipeline that stays consistent as you go. You can modify, remove, or insert steps at any stage and manage all parameters in one place. Execution is fast (C++-powered DAG), which suits interactive use and Shiny backends. |
| License: | MIT + file LICENSE |
| URL: | https://rpahl.github.io/pipeflow/, https://github.com/rpahl/pipeflow |
| BugReports: | https://github.com/rpahl/pipeflow/issues |
| Depends: | R (≥ 4.2.0) |
| Imports: | data.table, jsonlite, lgr, methods, R6, Rcpp (≥ 1.1.1), stats, utils |
| Suggests: | ggplot2, gridExtra, knitr, mockery, rmarkdown, targets, testthat, visNetwork |
| LinkingTo: | Rcpp |
| VignetteBuilder: | knitr |
| Config/roxygen2/version: | 8.0.0 |
| Config/testthat/edition: | 3 |
| Config/testthat/parallel: | true |
| Encoding: | UTF-8 |
| Language: | en-US |
| NeedsCompilation: | yes |
| Packaged: | 2026-06-14 20:27:21 UTC; Roman |
| Author: | Roman Pahl [aut, cre] |
| Repository: | CRAN |
| Date/Publication: | 2026-06-15 07:50:07 UTC |
pipeflow: Lightweight, General-Purpose Data Analysis Pipelines
Description
A lightweight yet powerful framework for building robust data analysis pipelines. With 'pipeflow', you initialize a pipeline with your dataset and construct workflows step by step by adding R functions. You can modify, remove, or insert steps and parameters at any stage, while 'pipeflow' ensures the pipeline's integrity. Overall, this package offers a beginner-friendly framework that simplifies and streamlines the development of data analysis pipelines by making them modular, intuitive, and adaptable.
Author(s)
Maintainer: Roman Pahl roman.pahl@gmail.com
Authors:
Roman Pahl roman.pahl@gmail.com
See Also
Useful links:
Report bugs at https://github.com/rpahl/pipeflow/issues
Pipeline Class
Description
This class implements an analysis pipeline. A pipeline consists of a sequence of analysis steps, which can be added one by one. Each added step may or may not depend on one or more previous steps. The pipeline keeps track of the dependencies among these steps and ensures that all dependencies are met when the pipeline is created, that is, before the pipeline is run. Once the pipeline is run, the output is stored in the pipeline along with each step and can be accessed later. Different pipelines can be bound together while preserving all dependencies within each pipeline.
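Steps refer to earlier steps through one-sided formulas in their default arguments, such as x = ~data (see the Pipeline$add() examples). As a rough illustration of how such references could be read from a function signature, the following sketch uses only base R. The helper find_step_refs is hypothetical and not part of pipeflow, it makes no claim about how pipeflow resolves dependencies internally, and it assumes that every argument has a default value.

```r
# Hypothetical helper (not part of pipeflow): list the step names that a
# function refers to via one-sided formulas in its default arguments.
# Assumes every argument has a default value.
find_step_refs <- function(fun) {
    defaults <- as.list(formals(fun))
    is_ref <- vapply(
        defaults,
        function(d) {
            is.call(d) && identical(d[[1]], as.name("~")) && length(d) == 2
        },
        logical(1)
    )
    # Keep the right-hand side of each formula as a string
    vapply(defaults[is_ref], function(d) deparse(d[[2]]), character(1))
}

refs <- find_step_refs(\(x = ~data, y = ~s1, z = 3) x * y + z)
refs
```

Here x and y are reported as references to the steps data and s1, while z is an ordinary parameter and is left out.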
Lifecycle
Deprecated. This is the legacy R6 interface; use pip_new() and the pip_* functions instead.
Public fields
name (string): name of the pipeline
pipeline (data.table): the pipeline where each row represents one step.
Methods
Public methods
Pipeline$new()
constructor
Usage
Pipeline$new(name, data = NULL, logger = NULL)
Arguments
name: the name of the Pipeline
data: optional data used at the start of the pipeline. The data can also be set later using the set_data function.
logger: custom logger to be used for logging. If no logger is provided, the default logger is used, which should be sufficient for most use cases. If you do want to use your own custom log function, you need to provide a function of the following form:
function(level, msg, ...) { your custom logging code here }
The level argument is a string and will be one of info, warn, or error. The msg argument is a string containing the message to be logged. The ... argument is a list of named parameters, which can be used to add additional information to the log message. Currently, this is only used to add the context in case of a step giving a warning or error.
Note that with the default logger, the log layout can be altered any time via set_log_layout().
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("myPipe", data = data.frame(x = 1:8))
p
# Passing custom logger
my_logger <- function(level, msg, ...) {
    cat(level, msg, "\n")
}
p <- Pipeline$new("myPipe", logger = my_logger)
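The logger above ignores the extra named arguments passed via `...`. A slightly richer variant could include them in the log line. The following is only a sketch: the timestamp format and the key=value rendering are illustrative choices, not pipeflow conventions, and the argument name context in the direct call is an assumption made for demonstration.

```r
# Custom logger of the documented form function(level, msg, ...).
# Extra named arguments are appended as key=value pairs (illustrative choice).
my_logger <- function(level, msg, ...) {
    context <- list(...)
    extra <- ""
    if (length(context) > 0) {
        values <- vapply(
            context,
            function(v) paste(format(v), collapse = " "),
            character(1)
        )
        pairs <- paste(names(context), values, sep = "=", collapse = ", ")
        extra <- paste0(" [", pairs, "]")
    }
    line <- sprintf(
        "%s %-5s %s%s",
        format(Sys.time(), "%H:%M:%S"), toupper(level), msg, extra
    )
    cat(line, "\n", sep = "")
    invisible(line)
}

# Direct call for demonstration; 'context' is an assumed argument name
res <- my_logger("warn", "step failed", context = "add1")

# It would be passed to a pipeline as in the example above:
# p <- Pipeline$new("myPipe", logger = my_logger)
```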
Pipeline$add()
Add pipeline step
Usage
Pipeline$add( step, fun, params = list(), description = "", group = step, keepOut = FALSE )
Arguments
step (string): the name of the step. Each step name must be unique.
fun (function): function or name of the function to be applied at the step. Both existing and anonymous/lambda functions can be used. All function parameters must have default values. If a parameter is missing a default value in the function signature, it can alternatively be set via the params argument (see Examples section with mean() function).
params (list): list of parameters to set or overwrite parameters of the passed function.
description (string): optional description of the step
group (string): output collected after pipeline execution (see function collect_out) is grouped by the defined group names. By default, this is the name of the step, which comes in handy when the pipeline is copy-appended multiple times to keep the results of the same function/step grouped at one place.
keepOut (logical): if FALSE (default) the output of the step is not collected when calling collect_out after the pipeline run. This option is used to only keep the results that matter and skip intermediate results that are not needed. See also function collect_out for more details.
Returns
returns the Pipeline object invisibly
Examples
# Add steps with lambda functions
p <- Pipeline$new("myPipe", data = 1)
p$add("s1", \(x = ~data) 2*x) # use input data
p$add("s2", \(x = ~data, y = ~s1) x * y)
try(p$add("s2", \(z = 3) 3)) # error: step 's2' exists already
try(p$add("s3", \(z = ~foo) 3)) # dependency 'foo' not found
p
# Add step with existing function
p <- Pipeline$new("myPipe", data = c(1, 2, NA, 3, 4))
p$add("calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
p$run()$get_out("calc_mean")
# Step description
p <- Pipeline$new("myPipe", data = 1:10)
p$add("s1", \(x = ~data) 2*x, description = "multiply by 2")
print(p)
print(p, verbose = TRUE) # print all columns
# Group output
p <- Pipeline$new("myPipe", data = data.frame(x = 1:5, y = 1:5))
p$add("prep_x", \(data = ~data) data$x, group = "prep")
p$add("prep_y", \(data = ~data) (data$y)^2, group = "prep")
p$add("sum", \(x = ~prep_x, y = ~prep_y) x + y)
p$run()$collect_out(all = TRUE)
Pipeline$append()
Append another pipeline
When appending, pipeflow takes care of potential name clashes with
respect to step names and dependencies, that is, if needed, it will
automatically adapt step names and dependencies to make sure they
are unique in the merged pipeline.
Usage
Pipeline$append(p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")
Arguments
p: Pipeline object to be appended.
outAsIn (logical): if TRUE, output of first pipeline is used as input for the second pipeline.
tryAutofixNames (logical): if TRUE, pipeflow tries to resolve name clashes automatically by appending the second pipeline's name. Only set to FALSE if you know what you are doing.
sep (string): separator used when auto-resolving step names
Returns
returns new combined Pipeline.
Examples
# Append pipeline
p1 <- Pipeline$new("pipe1")
p1$add("step1", \(x = 1) x)
p2 <- Pipeline$new("pipe2")
p2$add("step2", \(y = 1) y)
p1$append(p2)
# Append pipeline with potential name clashes
p3 <- Pipeline$new("pipe3")
p3$add("step1", \(z = 1) z)
p1$append(p2)$append(p3)
# Use output of first pipeline as input for second pipeline
p1 <- Pipeline$new("pipe1", data = 8)
p2 <- Pipeline$new("pipe2")
p1$add("square", \(x = ~data) x^2)
p2$add("log2", \(x = ~data) log2(x))
p12 <- p1$append(p2, outAsIn = TRUE)
p12$run()$get_out("log2")
p12
# Custom name separator
p1$append(p2, sep = "___")
Pipeline$append_to_step_names()
Appends string to all step names and takes care of updating step dependencies accordingly.
Usage
Pipeline$append_to_step_names(postfix, sep = ".")
Arguments
postfix (string): to be appended to each step name.
sep (string): separator between step name and postfix.
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe")
p$add("step1", \(x = 1) x)
p$add("step2", \(y = 1) y)
p$append_to_step_names("new")
p
p$append_to_step_names("foo", sep = "__")
p
Pipeline$collect_out()
Collect output after the pipeline run, by default from all
steps for which keepOut was set to TRUE. The output is grouped
by the group names (see the group parameter of function add),
which by default are identical to the step names.
Usage
Pipeline$collect_out(groupBy = "group", all = FALSE)
Arguments
groupBy (string): column of pipeline by which to group the output.
all (logical): if TRUE all output is collected regardless of the keepOut flag. This can be useful for debugging.
Returns
list containing the output, named after the groups, which,
by default, are the steps.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2)
p$add("step2", \(x = ~step1) x + 2, keepOut = TRUE)
p$run()
p$collect_out()
p$collect_out(all = TRUE) |> str()
# Grouped output
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2, group = "add")
p$add("step2", \(x = ~step1, y = 2) x + y, group = "add")
p$add("step3", \(x = ~data) x * 3, group = "mult")
p$add("step4", \(x = ~data, y = 2) x * y, group = "mult")
p
p$run()
p$collect_out(all = TRUE) |> str()
# Grouped by state
p$set_params(list(y = 5))
p
p$collect_out(groupBy = "state", all = TRUE) |> str()
Pipeline$discard_steps()
Discard all steps that match a given pattern.
Usage
Pipeline$discard_steps(pattern, recursive = FALSE, fixed = TRUE, ...)
Arguments
pattern (string): string containing a regular expression (or character string for fixed = TRUE) to be matched.
recursive (logical): if TRUE the step is removed together with all its downstream dependencies.
fixed (logical): if TRUE, pattern is a string to be matched as is. Overrides all conflicting arguments.
...: further arguments passed to grep().
Returns
the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~add1) x + 2)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
p$discard_steps("mult")
p
# Re-add steps
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
# Discarding 'add1' does not work ...
try(p$discard_steps("add1"))
# ... unless we enforce to remove its downstream dependencies as well
p$discard_steps("add1", recursive = TRUE) # this works
p
# Trying to discard non-existent steps is just ignored
p$discard_steps("non-existent")
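The examples above all rely on the default fixed = TRUE. Based on the argument description, a regular expression can be used by setting fixed = FALSE. The following sketch assumes that the pattern is matched against the step names as grep() would match them; the pattern itself is just an illustration.

```r
library(pipeflow)

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~add1) x + 2)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)

# With fixed = FALSE the pattern is treated as a regular expression;
# here it is meant to match the step names "mult3" and "mult4" only.
p$discard_steps("^mult[0-9]+$", fixed = FALSE)
remaining <- p$get_step_names()
remaining
```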
Pipeline$get_data()
Get data
Usage
Pipeline$get_data()
Returns
the output defined in the data step, which by default is
the first step of the pipeline
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$get_data()
p$set_data(3:4)
p$get_data()
Pipeline$get_depends()
Get all dependencies defined in the pipeline
Usage
Pipeline$get_depends()
Returns
named list of dependencies for each step
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$get_depends()
Pipeline$get_depends_down()
Get all downstream dependencies of given step, by default descending recursively.
Usage
Pipeline$get_depends_down(step, recursive = TRUE)
Arguments
step (string): name of step
recursive (logical): if TRUE, dependencies of dependencies are also returned.
Returns
list of downstream dependencies
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_down("add1")
p$get_depends_down("add1", recursive = FALSE)
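To make the difference between recursive = TRUE and recursive = FALSE concrete, here is a base R sketch of such a traversal. The helper downstream_of is hypothetical and not part of pipeflow, and the shape of deps (a named list mapping each step to the steps it depends on) is an assumption modelled on the description of get_depends(). The order and format of pipeflow's actual return value may differ.

```r
# Hypothetical helper (not part of pipeflow): find all steps that depend,
# directly or indirectly, on `step`.
downstream_of <- function(deps, step, recursive = TRUE) {
    # Steps that list `step` among their own dependencies
    direct <- names(deps)[vapply(deps, function(d) step %in% d, logical(1))]
    if (!recursive) {
        return(direct)
    }
    found <- direct
    for (s in direct) {
        found <- union(found, downstream_of(deps, s, recursive = TRUE))
    }
    found
}

# Dependencies mirroring the example pipeline above (assumed structure)
deps <- list(
    data  = character(0),
    add1  = "data",
    add2  = c("data", "add1"),
    mult3 = "add1",
    mult4 = "add2"
)

downstream_of(deps, "add1")                     # "add2" "mult3" "mult4"
downstream_of(deps, "add1", recursive = FALSE)  # "add2" "mult3"
```

The recursion terminates because the dependencies form a directed acyclic graph.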
Pipeline$get_depends_up()
Get all upstream dependencies of the given step, by default traversing them recursively.
Usage
Pipeline$get_depends_up(step, recursive = TRUE)
Arguments
step (string): name of step
recursive (logical): if TRUE, dependencies of dependencies are also returned.
Returns
list of upstream dependencies
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_up("mult4")
p$get_depends_up("mult4", recursive = FALSE)
Pipeline$get_graph()
Visualize the pipeline as a graph.
Usage
Pipeline$get_graph(groups = NULL)
Arguments
groups (character): if not NULL, only steps belonging to the given groups are considered.
Returns
two data frames, one for nodes and one for edges ready to be
used with the visNetwork package.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = ~add1, y = ~add2) x * y)
graph <- p$get_graph()
graph
if (require("visNetwork", quietly = TRUE)) {
    do.call(visNetwork, args = p$get_graph())
}
Pipeline$get_out()
Get output of given step
Usage
Pipeline$get_out(step)
Arguments
step (string): name of step
Returns
the output at the given step.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$run()
p$get_out("add1")
p$get_out("add2")
Pipeline$get_params()
Get all unbound function parameters defined in the pipeline, where 'unbound' means parameters that are not linked to other steps.
Usage
Pipeline$get_params(ignoreHidden = TRUE)
Arguments
ignoreHidden (logical): if TRUE, hidden parameters (i.e. all names starting with a dot) are ignored and thus not returned.
Returns
list of parameters, sorted and named by step. Steps with
no parameters are filtered out.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params() |> str()
p$get_params(ignoreHidden = FALSE) |> str()
Pipeline$get_params_at_step()
Get all unbound (i.e. not referring to other steps) parameters defined at the given step.
Usage
Pipeline$get_params_at_step(step, ignoreHidden = TRUE)
Arguments
step (string): name of step
ignoreHidden (logical): if TRUE, hidden parameters (i.e. all names starting with a dot) are ignored and thus not returned.
Returns
list of parameters defined at given step.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params_at_step("add2")
p$get_params_at_step("add2", ignoreHidden = FALSE)
p$get_params_at_step("add3")
Pipeline$get_params_unique()
Get all unbound (i.e. not referring to other steps)
parameters defined in the pipeline,
but list each parameter only once. The value of each parameter
is taken from the first step in which the parameter was defined.
This is particularly useful after the parameters were set using
the set_params function, which sets the same value
in all steps.
Usage
Pipeline$get_params_unique(ignoreHidden = TRUE)
Arguments
ignoreHidden (logical): if TRUE, hidden parameters (i.e. all names starting with a dot) are ignored and thus not returned.
Returns
list of unique parameters
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique()
p$get_params_unique(ignoreHidden = FALSE)
Pipeline$get_params_unique_json()
Get all unique function parameters in json format.
Usage
Pipeline$get_params_unique_json(ignoreHidden = TRUE)
Arguments
ignoreHidden (logical): if TRUE, hidden parameters (i.e. all names starting with a dot) are ignored and thus not returned.
Returns
flat unnamed JSON list of unique function parameters
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique_json()
p$get_params_unique_json(ignoreHidden = FALSE)
Pipeline$get_step()
Get step of pipeline
Usage
Pipeline$get_step(step)
Arguments
step (string): name of step
Returns
data.table row containing the step.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z)
p$run()
add1 <- p$get_step("add1")
print(add1)
add1[["params"]]
add1[["fun"]]
try(p$get_step("foo")) # error: step 'foo' does not exist
Pipeline$get_step_names()
Get step names of pipeline
Usage
Pipeline$get_step_names()
Returns
character vector of step names
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_names()
Pipeline$get_step_number()
Get step number
Usage
Pipeline$get_step_number(step)
Arguments
step (string): name of step
Returns
the step number in the pipeline
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_number("f2")
Pipeline$has_step()
Check if pipeline has given step
Usage
Pipeline$has_step(step)
Arguments
step (string): name of step
Returns
logical whether step exists
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$has_step("f2")
p$has_step("foo")
Pipeline$insert_after()
Insert step after a certain step
Usage
Pipeline$insert_after(afterStep, step, ...)
Arguments
afterStep (string): name of step after which to insert
step (string): name of step to insert
...: further arguments passed to the add method of the pipeline
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_after("f1", "f3", \(x = ~f1) x)
p
Pipeline$insert_before()
Insert step before a certain step
Usage
Pipeline$insert_before(beforeStep, step, ...)
Arguments
beforeStep (string): name of step before which to insert
step (string): name of step to insert
...: further arguments passed to the add method of the pipeline
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_before("f2", "f3", \(x = ~f1) x)
p
Pipeline$length()
Length of the pipeline aka number of pipeline steps.
Usage
Pipeline$length()
Returns
numeric length of pipeline.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$length()
Pipeline$lock_step()
Locking a step means that both its parameters and its output (given it has output) are locked such that neither setting new pipeline parameters nor future pipeline runs can change the current parameter and output content.
Usage
Pipeline$lock_step(step)
Arguments
step (string): name of step
Returns
the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$run()
p$get_out("add1")
p$get_out("add2")
p$lock_step("add1")
p$set_data(3)
p$set_params(list(x = 3))
p$run()
p$get_out("add1")
p$get_out("add2")
Pipeline$pop_step()
Drop last step from the pipeline.
Usage
Pipeline$pop_step()
Returns
string the name of the step that was removed
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p
p$pop_step() # "f2"
p
Pipeline$pop_steps_after()
Drop all steps after the given step.
Usage
Pipeline$pop_steps_after(step)
Arguments
step (string): name of step
Returns
character vector of steps that were removed.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_after("f1") # "f2", "f3"
p
Pipeline$pop_steps_from()
Drop all steps from and including the given step.
Usage
Pipeline$pop_steps_from(step)
Arguments
step (string): name of step
Returns
character vector of steps that were removed.
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_from("f2") # "f2", "f3"
p
Pipeline$print()
Print the pipeline as a table.
Usage
Pipeline$print(verbose = FALSE)
Arguments
verbose (logical): if TRUE, print all columns of the pipeline, otherwise only the most relevant columns are displayed.
Returns
the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$print()
Pipeline$remove_step()
Remove certain step from the pipeline.
If other steps depend on the step to be removed, an error is
given and the removal is blocked, unless recursive was set to
TRUE.
Usage
Pipeline$remove_step(step, recursive = FALSE)
Arguments
step (string): the name of the step to be removed.
recursive (logical): if TRUE the step is removed together with all its downstream dependencies.
Returns
the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = 1, y = ~add2) x * y)
p$remove_step("mult1")
p
try(p$remove_step("add1")) # fails because "add2" depends on "add1"
p$remove_step("add1", recursive = TRUE) # removes "add1" and "add2"
p
Pipeline$rename_step()
Safely rename a step in the pipeline. If the new step name would result in a name clash, an error is given.
Usage
Pipeline$rename_step(from, to)
Arguments
from (string): the name of the step to be renamed.
to (string): the new name of the step.
Returns
the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p
try(p$rename_step("add1", "add2")) # fails because "add2" exists
p$rename_step("add1", "first_add") # Ok
p
Pipeline$replace_step()
Replaces an existing pipeline step.
Usage
Pipeline$replace_step( step, fun, params = list(), description = "", group = step, keepOut = FALSE )
Arguments
step (string): the name of the step to be replaced. Step must exist.
fun (string or function): operation to be applied at the step. Both existing and lambda/anonymous functions can be used.
params (list): list of parameters to overwrite default parameters of existing functions.
description (string): optional description of the step
group (string): grouping information (by default the same as the name of the step). Any output collected later (see function collect_out) is by default put together by these group names. This, for example, comes in handy when the pipeline is copy-appended multiple times to keep the results of the same function/step at one place.
keepOut (logical): if FALSE the output of the function will be cleaned at the end of the whole pipeline execution. This option is used to only keep the results that matter.
Returns
the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = 1, y = 2) x * y, keepOut = TRUE)
p$run()$collect_out()
p$replace_step("mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p$run()$collect_out()
try(p$replace_step("foo", \(x = 1) x)) # step 'foo' does not exist
Pipeline$reset()
Resets the pipeline to the state before it was run. This means that all output is removed and the state of all steps is reset to 'New'.
Usage
Pipeline$reset()
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$run()
p
p$reset()
p
Pipeline$run()
Run all new and/or outdated pipeline steps.
Usage
Pipeline$run( force = FALSE, recursive = TRUE, cleanUnkept = FALSE, progress = NULL, showLog = TRUE )
Arguments
force (logical): if TRUE all steps are run regardless of whether they are outdated or not.
recursive (logical): if TRUE and a step returns a new pipeline, the run of the current pipeline is aborted and the new pipeline is run recursively.
cleanUnkept (logical): if TRUE all output that was not marked to be kept is removed after the pipeline run. This option can be useful if temporary results require a lot of memory.
progress (function): this parameter can be used to provide a custom progress function of the form function(value, detail), which will show the progress of the pipeline run for each step, where value is the current step number and detail is the name of the step.
showLog (logical): should the steps be logged during the pipeline run?
Returns
returns the Pipeline object invisibly
Examples
# Simple pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p$run()$collect_out()
p$set_params(list(z = 4)) # outdates steps add2 and final
p
p$run()$collect_out()
p$run(cleanUnkept = TRUE) # clean up temporary results
p
# Recursive pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("new_pipe", \(x = ~add1) {
        pp <- Pipeline$new("new_pipe", data = x)
        pp$add("add1", \(x = ~data) x + 1)
        pp$add("add2", \(x = ~add1) x + 2, keepOut = TRUE)
    }
)
p$run(recursive = TRUE)$collect_out()
# Run pipeline with progress bar
p <- Pipeline$new("pipe", data = 1)
p$add("first step", \() Sys.sleep(1))
p$add("second step", \() Sys.sleep(1))
p$add("last step", \() Sys.sleep(1))
pb <- txtProgressBar(min = 1, max = p$length(), style = 3)
fprogress <- function(value, detail) {
    setTxtProgressBar(pb, value)
}
p$run(progress = fprogress, showLog = FALSE)
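The progress bar example uses only the value argument. A progress function can also make use of detail, the step name. The following sketch prints one line per step; n_steps is a hypothetical variable introduced for this illustration, and with a real pipeline it could be taken from p$length().

```r
# Progress callback of the documented form function(value, detail):
# `value` is the current step number and `detail` the name of the step.
n_steps <- 3  # hypothetical; e.g. p$length() for a real pipeline

fprogress <- function(value, detail) {
    line <- sprintf("[%d/%d] %s", value, n_steps, detail)
    message(line)
    invisible(line)
}

# Direct call for demonstration
fprogress(2, "second step")

# With a pipeline it would be passed as in the example above:
# p$run(progress = fprogress, showLog = FALSE)
```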
Pipeline$run_step()
Run given pipeline step possibly together with upstream and downstream dependencies.
Usage
Pipeline$run_step( step, upstream = TRUE, downstream = FALSE, cleanUnkept = FALSE )
Arguments
step (string): name of step
upstream (logical): if TRUE, run all dependent upstream steps first.
downstream (logical): if TRUE, run all dependent downstream steps afterwards.
cleanUnkept (logical): if TRUE all output that was not marked to be kept is removed after the pipeline run. This option can be useful if temporary results require a lot of memory.
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run_step("add2")
p$run_step("add2", downstream = TRUE)
p$run_step("mult", upstream = TRUE)
Pipeline$set_data()
Set data in first step of pipeline.
Usage
Pipeline$set_data(data)
Arguments
data: initial data set
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p$run()$collect_out()
p$set_data(3)
p$run()$collect_out()
Pipeline$set_data_split()
This function can be used to apply the pipeline repeatedly to various data sets. For this, the pipeline split-copies itself by the list of given data sets. Each sub-pipeline will have one of the data sets set as input data. The step names of the sub-pipelines will be the original step names plus the name of the data set.
Usage
Pipeline$set_data_split( dataList, toStep = character(), groupBySplit = TRUE, sep = "." )
Arguments
dataList (list): list of data sets
toStep (string): step name marking an optional subset of the pipeline, up to which the data split should be applied.
groupBySplit (logical): whether to set step groups according to data split.
sep (string): separator to be used between step name and data set name when creating the new step names.
Returns
new combined Pipeline with each sub-pipeline having set
one of the data sets.
Examples
# Split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
p$set_data_split(dataList)
p
p$run()$collect_out() |> str()
# Don't group output by split
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
p$set_data_split(dataList, groupBySplit = FALSE)
p
p$run()$collect_out() |> str()
# Split up to certain step
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1)
p$add("mult", \(x = ~data, y = ~add1) x * y)
p$add("average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
p
p$get_depends()[["average_result"]]
p$set_data_split(dataList, toStep = "mult")
p
p$get_depends()[["average_result"]]
p$run()$collect_out()
Pipeline$set_keep_out()
Change the keepOut flag at a given pipeline step,
which determines whether the output of that step is collected
when calling collect_out() after the pipeline was run.
Usage
Pipeline$set_keep_out(step, keepOut = TRUE)
Arguments
step (string): name of step
keepOut (logical): whether to keep output of step
Returns
the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run()$collect_out()
p$set_keep_out("add1", keepOut = FALSE)
p$set_keep_out("mult", keepOut = TRUE)
p$collect_out()
Pipeline$set_params()
Set parameters in the pipeline. If a parameter occurs in several steps, the parameter is set commonly in all steps. Trying to set parameters that don't exist in the pipeline is ignored, by default, with a warning.
Usage
Pipeline$set_params(params, warnUndefined = TRUE)
Arguments
params (list): list of parameters to be set
warnUndefined (logical): whether to give a warning when trying to set a parameter that is not defined in the pipeline.
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2) x + y)
p$add("add2", \(x = ~data, y = 3) x + y)
p$add("mult", \(x = 4, z = 5) x * z)
p$get_params()
p$set_params(list(x = 3, y = 3))
p$get_params()
p$set_params(list(x = 5, z = 3))
p$get_params()
suppressWarnings(
    p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined
)
p$set_params(list(foo = 3), warnUndefined = FALSE)
Pipeline$set_params_at_step()
Set unbound function parameters defined at given pipeline step where 'unbound' means parameters that are not linked to other steps.
Usage
Pipeline$set_params_at_step(step, params)
Arguments
step (string): the name of the step
params (list): list of parameters to be set
Returns
returns the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2, z = 3) x + y)
p$set_params_at_step("add1", list(y = 5, z = 6))
p$get_params()
try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined
Pipeline$split()
Splits pipeline into its independent parts.
Usage
Pipeline$split()
Returns
list of Pipeline objects
Examples
# Example for two independent calculation paths
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = ~data) x)
p$add("f2", \(x = 1) x)
p$add("f3", \(x = ~f1) x)
p$add("f4", \(x = ~f2) x)
p$split()
# Example of split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipes <- p$set_data_split(dataList)$split()
pipes
Pipeline$unlock_step()
Unlock previously locked step. If step was not locked, the command is ignored.
Usage
Pipeline$unlock_step(step)
Arguments
step (string): name of step
Returns
the Pipeline object invisibly
Examples
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$lock_step("add1")
p$set_params(list(x = 3))
p$get_params()
p$unlock_step("add1")
p$set_params(list(x = 3))
p$get_params()
Pipeline$clone()
The objects of this class are cloneable with this method.
Usage
Pipeline$clone(deep = FALSE)
Arguments
deep: Whether to make a deep clone.
Author(s)
Roman Pahl
p$collect_out(all = TRUE) |> str()
# Grouped output
p <- Pipeline$new("pipe", data = 1:2)
p$add("step1", \(x = ~data) x + 2, group = "add")
p$add("step2", \(x = ~step1, y = 2) x + y, group = "add")
p$add("step3", \(x = ~data) x * 3, group = "mult")
p$add("step4", \(x = ~data, y = 2) x * y, group = "mult")
p
p$run()
p$collect_out(all = TRUE) |> str()
# Grouped by state
p$set_params(list(y = 5))
p
p$collect_out(groupBy = "state", all = TRUE) |> str()
## ------------------------------------------------
## Method `Pipeline$discard_steps()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~add1) x + 2)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
p$discard_steps("mult")
p
# Re-add steps
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p
# Discarding 'add1' does not work ...
try(p$discard_steps("add1"))
# ... unless we enforce to remove its downstream dependencies as well
p$discard_steps("add1", recursive = TRUE) # this works
p
# Trying to discard non-existent steps is just ignored
p$discard_steps("non-existent")
## ------------------------------------------------
## Method `Pipeline$get_data()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$get_data()
p$set_data(3:4)
p$get_data()
## ------------------------------------------------
## Method `Pipeline$get_depends()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$get_depends()
## ------------------------------------------------
## Method `Pipeline$get_depends_down()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_down("add1")
p$get_depends_down("add1", recursive = FALSE)
## ------------------------------------------------
## Method `Pipeline$get_depends_up()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$add("mult3", \(x = ~add1) x * 3)
p$add("mult4", \(x = ~add2) x * 4)
p$get_depends_up("mult4")
p$get_depends_up("mult4", recursive = FALSE)
## ------------------------------------------------
## Method `Pipeline$get_graph()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = ~add1, y = ~add2) x * y)
graph <- p$get_graph()
graph
if (require("visNetwork", quietly = TRUE)) {
do.call(visNetwork, args = p$get_graph())
}
## ------------------------------------------------
## Method `Pipeline$get_out()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(x = ~data) x + 1)
p$add("add2", \(x = ~data, y = ~add1) x + y)
p$run()
p$get_out("add1")
p$get_out("add2")
## ------------------------------------------------
## Method `Pipeline$get_params()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params() |> str()
p$get_params(ignoreHidden = FALSE) |> str()
## ------------------------------------------------
## Method `Pipeline$get_params_at_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("add3", \() 1 + 2)
p$get_params_at_step("add2")
p$get_params_at_step("add2", ignoreHidden = FALSE)
p$get_params_at_step("add3")
## ------------------------------------------------
## Method `Pipeline$get_params_unique()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique()
p$get_params_unique(ignoreHidden = FALSE)
## ------------------------------------------------
## Method `Pipeline$get_params_unique_json()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, .z = 3) x + y + .z)
p$add("mult1", \(x = 1, y = 2, .z = 3, b = ~add2) x * y * b)
p$get_params_unique_json()
p$get_params_unique_json(ignoreHidden = FALSE)
## ------------------------------------------------
## Method `Pipeline$get_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = 2, z = ~add1) x + y + z)
p$run()
add1 <- p$get_step("add1")
print(add1)
add1[["params"]]
add1[["fun"]]
try(p$get_step("foo")) # error: step 'foo' does not exist
## ------------------------------------------------
## Method `Pipeline$get_step_names()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_names()
## ------------------------------------------------
## Method `Pipeline$get_step_number()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$get_step_number("f2")
## ------------------------------------------------
## Method `Pipeline$has_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$has_step("f2")
p$has_step("foo")
## ------------------------------------------------
## Method `Pipeline$insert_after()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_after("f1", "f3", \(x = ~f1) x)
p
## ------------------------------------------------
## Method `Pipeline$insert_before()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = 1) x)
p$add("f2", \(x = ~f1) x)
p$insert_before("f2", "f3", \(x = ~f1) x)
p
## ------------------------------------------------
## Method `Pipeline$length()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$length()
## ------------------------------------------------
## Method `Pipeline$lock_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$run()
p$get_out("add1")
p$get_out("add2")
p$lock_step("add1")
p$set_data(3)
p$set_params(list(x = 3))
p$run()
p$get_out("add1")
p$get_out("add2")
## ------------------------------------------------
## Method `Pipeline$pop_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p
p$pop_step() # "f2"
p
## ------------------------------------------------
## Method `Pipeline$pop_steps_after()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_after("f1") # "f2", "f3"
p
## ------------------------------------------------
## Method `Pipeline$pop_steps_from()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$add("f3", \(z = 1) z)
p$pop_steps_from("f2") # "f2", "f3"
p
## ------------------------------------------------
## Method `Pipeline$print()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$print()
## ------------------------------------------------
## Method `Pipeline$remove_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p$add("mult1", \(x = 1, y = ~add2) x * y)
p$remove_step("mult1")
p
try(p$remove_step("add1")) # fails because "add2" depends on "add1"
p$remove_step("add1", recursive = TRUE) # removes "add1" and "add2"
p
## ------------------------------------------------
## Method `Pipeline$rename_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$add("add2", \(x = 1, y = ~add1) x + y)
p
try(p$rename_step("add1", "add2")) # fails because "add2" exists
p$rename_step("add1", "first_add") # Ok
p
## ------------------------------------------------
## Method `Pipeline$replace_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = 1, y = 2) x * y, keepOut = TRUE)
p$run()$collect_out()
p$replace_step("mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p$run()$collect_out()
try(p$replace_step("foo", \(x = 1) x)) # step 'foo' does not exist
## ------------------------------------------------
## Method `Pipeline$reset()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
p$run()
p
p$reset()
p
## ------------------------------------------------
## Method `Pipeline$run()`
## ------------------------------------------------
# Simple pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p$run()$collect_out()
p$set_params(list(z = 4)) # outdates steps add2 and final
p
p$run()$collect_out()
p$run(cleanUnkept = TRUE) # clean up temporary results
p
# Recursive pipeline
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("new_pipe", \(x = ~add1) {
pp <- Pipeline$new("new_pipe", data = x)
pp$add("add1", \(x = ~data) x + 1)
pp$add("add2", \(x = ~add1) x + 2, keepOut = TRUE)
}
)
p$run(recursive = TRUE)$collect_out()
# Run pipeline with progress bar
p <- Pipeline$new("pipe", data = 1)
p$add("first step", \() Sys.sleep(1))
p$add("second step", \() Sys.sleep(1))
p$add("last step", \() Sys.sleep(1))
pb <- txtProgressBar(min = 1, max = p$length(), style = 3)
fprogress <- function(value, detail) {
setTxtProgressBar(pb, value)
}
p$run(progress = fprogress, showLog = FALSE)
## ------------------------------------------------
## Method `Pipeline$run_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y)
p$add("add2", \(x = ~add1, z = 2) x + z)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run_step("add2")
p$run_step("add2", downstream = TRUE)
p$run_step("mult", upstream = TRUE)
## ------------------------------------------------
## Method `Pipeline$set_data()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p$run()$collect_out()
p$set_data(3)
p$run()$collect_out()
## ------------------------------------------------
## Method `Pipeline$set_data_split()`
## ------------------------------------------------
# Split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
p$set_data_split(dataList)
p
p$run()$collect_out() |> str()
# Don't group output by split
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
p$set_data_split(dataList, groupBySplit = FALSE)
p
p$run()$collect_out() |> str()
# Split up to certain step
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1)
p$add("mult", \(x = ~data, y = ~add1) x * y)
p$add("average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
p
p$get_depends()[["average_result"]]
p$set_data_split(dataList, toStep = "mult")
p
p$get_depends()[["average_result"]]
p$run()$collect_out()
## ------------------------------------------------
## Method `Pipeline$set_keep_out()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p$add("add2", \(x = ~data, y = 2) x + y)
p$add("mult", \(x = ~add1, y = ~add2) x * y)
p$run()$collect_out()
p$set_keep_out("add1", keepOut = FALSE)
p$set_keep_out("mult", keepOut = TRUE)
p$collect_out()
## ------------------------------------------------
## Method `Pipeline$set_params()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2) x + y)
p$add("add2", \(x = ~data, y = 3) x + y)
p$add("mult", \(x = 4, z = 5) x * z)
p$get_params()
p$set_params(list(x = 3, y = 3))
p$get_params()
p$set_params(list(x = 5, z = 3))
p$get_params()
suppressWarnings(
p$set_params(list(foo = 3)) # gives warning as 'foo' is undefined
)
p$set_params(list(foo = 3), warnUndefined = FALSE)
## ------------------------------------------------
## Method `Pipeline$set_params_at_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = ~data, y = 2, z = 3) x + y)
p$set_params_at_step("add1", list(y = 5, z = 6))
p$get_params()
try(p$set_params_at_step("add1", list(foo = 3))) # foo not defined
## ------------------------------------------------
## Method `Pipeline$split()`
## ------------------------------------------------
# Example for two independent calculation paths
p <- Pipeline$new("pipe", data = 1)
p$add("f1", \(x = ~data) x)
p$add("f2", \(x = 1) x)
p$add("f3", \(x = ~f1) x)
p$add("f4", \(x = ~f2) x)
p$split()
# Example of split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- Pipeline$new("pipe")
p$add("add1", \(x = ~data) x + 1, keepOut = TRUE)
p$add("mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipes <- p$set_data_split(dataList)$split()
pipes
## ------------------------------------------------
## Method `Pipeline$unlock_step()`
## ------------------------------------------------
p <- Pipeline$new("pipe", data = 1)
p$add("add1", \(x = 1, data = ~data) x + data)
p$add("add2", \(x = 1, data = ~data) x + data)
p$lock_step("add1")
p$set_params(list(x = 3))
p$get_params()
p$unlock_step("add1")
p$set_params(list(x = 3))
p$get_params()
Extract or subset a pipeline
Description
x[i] returns a new pipeline containing the selected steps and all required upstream dependencies.
x[[i]] and x[[i, j]] extract values from a pipeline using one or two indices.
With a single string name, named fields such as "pipeline" or "name"
are returned first; anything else returns the matching step-table column.
With two indices (row, column), a single cell is extracted.
Usage
## S3 method for class 'pipeflow_pip'
x[i, ...]
## S3 method for class 'pipeflow_pip'
x[[i, j, ...]]
Arguments
x |
A pipeflow pipeline object. |
i |
integer (row indices) or character vector (step names) of steps to select |
... |
not used |
j |
column names to select |
Value
For x[i]: a new pipeflow pipeline object.
For x[[i]] and x[[i, j]]: the extracted value(s), depending on i and j.
Examples
p <- pip_new() |>
pip_add("load", \(n = 5) seq_len(n)) |>
pip_add("square", \(x = ~load) x^2) |>
pip_add("total", \(x = ~square) sum(x))
# Select by step name — upstream deps are pulled in automatically.
# Selecting only "total" still includes "load" and "square".
sub <- p["total"]
sub[["pipeline"]][["step"]] # "load", "square", "total"
# Select a subset of steps by name vector
p[c("load", "square")][["pipeline"]][["step"]] # "load", "square"
# Select by integer row index
p[1:2][["pipeline"]][["step"]] # "load", "square"
p <- pip_new() |>
pip_add("load", \(x = 1) x) |>
pip_add("fit", \(x = ~load) x + 1)
# Access internal objects by name
p[["pipeline"]] # the full step table
p[["name"]] # "pipe"
# Shorthand column access (equivalent to p[["pipeline"]][["step"]])
p[["step"]]
# Two-index form: p[[row, column]] extracts a single cell
p[["fit", "depends"]] # "load"
p[[2, "state"]] # state of the second step
Length of a pipeflow pipeline or view
Description
Returns the number of steps in a pipeflow pipeline, or the number of selected steps in a view.
Usage
## S3 method for class 'pipeflow_pip'
length(x)
## S3 method for class 'pipeflow_view'
length(x)
Arguments
x |
A pipeflow pipeline or view |
Value
Number of steps as an integer.
Examples
p <- pip_new() |>
pip_add("s1", \(x = 1) x) |>
pip_add("s2", \(x = ~s1) x + 1) |>
pip_add("s3", \(x = ~s2) x * 2)
length(p) # 3 — total steps in the pipeline
# A view reports only the number of selected (visible) steps
v <- pip_view(p, i = c("s2", "s3"))
length(v) # 2
Add a step
Description
Adds a named step to the pipeline. Each step is a function whose parameters
either hold constant defaults or reference the output of a prior step using
formula notation (~step_name). Dependencies are validated when the step
is added.
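The distinction between constant defaults and formula defaults can be illustrated in base R without pipeflow. The following is a minimal sketch, not pipeflow's actual implementation: the helper classify_defaults() is hypothetical and only shows how formula defaults could be told apart from constants by inspecting formals().

```r
# Hypothetical helper (not part of pipeflow): split the defaults of a step
# function into dependencies (formula defaults) and constant parameters.
classify_defaults <- function(fun) {
    defaults <- as.list(formals(fun))

    # A default such as ~load is an unevaluated call to the `~` operator
    is_dep <- vapply(
        defaults,
        function(d) is.call(d) && identical(d[[1]], as.name("~")),
        logical(1)
    )

    list(
        # name of the referenced step for each dependency argument
        depends = vapply(defaults[is_dep], function(d) all.vars(d), character(1)),
        # constant defaults, evaluated to plain values
        params = lapply(defaults[!is_dep], eval)
    )
}

res <- classify_defaults(\(x = ~load, n = 5) x[seq_len(n)])
res$depends  # named character vector: x = "load"
res$params   # list(n = 5)
```

In this sketch, the argument x is recognized as a reference to a step named "load", while n is an ordinary tunable parameter.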
Usage
pip_add(x, step, fun, tags = character(0), after = length(x), exec = "auto")
Arguments
x |
A pipeflow pipeline object. |
step |
Unique step name. |
fun |
Function to execute for the step. Each function parameter must
have a default value. Default values that are simple constants are resolved
immediately. Default values that are formulas like |
tags |
Optional character vector of tags belonging to the step.
Can also be adjusted later using |
after |
Optional position after which the new step should be inserted (defaults to last position). Can be a step name or an integer index. If set to 0, the new step will be inserted at the beginning of the pipeline. |
exec |
Execution mode for this step. One of "auto", "split",
"reduce" or "plain".
Using execution mode
|
Details
If after was specified, the new step will be inserted after the given
step or position. Be aware that in contrast to adding a step at the end,
inserting a step in the middle is a rather expensive operation as it
requires re-wiring parts of the internal pipeline structure, especially
if the new step is inserted at an early position.
Value
The updated pipeline, invisibly.
Examples
# --- Tags and view filtering ---
p <- pip_new("analysis") |>
pip_add("load", \(n = 5) seq_len(n), tags = c("io", "raw")) |>
pip_add("clean", \(x = ~load) x * 2, tags = c("io", "process")) |>
pip_add("fit", \(x = ~clean) sum(x), tags = c("model", "core", "daily")) |>
pip_add("report", \(x = ~fit) paste("result:", x), tags = "report")
pip_run(p)
p
# Filter by tag using pip_view — keeps steps with any matching tag
pip_view(p, tags = "daily")
pip_view(p, tags = "core")
pip_view(p, tags = c("raw", "report"))
# --- Split / reduce execution modes ---
q <- pip_new("split-demo") |>
pip_add("data", \(x = iris) x) |>
pip_add("split", \(x = ~data) split(x, x$Species),
exec = "split"
) |>
pip_add("stats", \(x = ~split) summary(x)) |>
pip_add("combine", \(x = ~stats) do.call(rbind, x),
exec = "reduce"
)
pip_run(q)
q[["stats", "out"]] # partitioned list — one summary per species
q[["combine", "out"]] # combined table
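For comparison, the split / apply / reduce pattern used in the example above can be written in plain base R. This is only an analogy under the assumption, taken from the example comments, that a step downstream of a "split" step runs once per partition and that a "reduce" step receives the list of partition results. It does not show how pipeflow implements these modes, and colMeans() is used here only to keep the result numeric.

```r
# Base R analogy of split -> per-partition step -> reduce (no pipeflow needed)

# "split": one data.frame per species
parts <- split(iris, iris$Species)

# per-partition step: applied once to each partition
stats <- lapply(parts, function(d) colMeans(d[, 1:4]))

# "reduce": combine the partition results into one table
combined <- do.call(rbind, stats)

names(stats)   # "setosa" "versicolor" "virginica"
dim(combined)  # 3 rows (species) by 4 columns (measurements)
```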
Copy a step from another pipeline
Description
Copies one step from pipeline y into pipeline x, preserving its
function, parameters, tags, and dependency links.
Usage
pip_add_from(x, y, step)
Arguments
x |
Target pipeflow pipeline object. |
y |
Source pipeflow pipeline object. |
step |
Step name to copy from y. |
Value
The updated target pipeline, invisibly.
Examples
# Build a source pipeline with reusable steps
src <- pip_new("source") |>
pip_add("load", \(n = 3) seq_len(n)) |>
pip_add("square", \(x = ~load) x^2)
# Copy steps into a new pipeline one at a time.
# The dependency of "square" on "load" is re-established automatically.
dst <- pip_new("target")
pip_add_from(dst, src, "load")
pip_add_from(dst, src, "square")
pip_run(dst)
pip_collect_out(dst)
Bind pipelines
Description
Bind two pipelines together by concatenating their steps. If both pipelines have steps with the same name, the step names of the second pipeline will be automatically adapted to avoid name clashes.
Usage
pip_bind(x, y)
Arguments
x |
A pipeflow pipeline object. |
y |
A pipeflow pipeline object. |
Value
A new pipeflow pipeline object representing the bound pipelines.
Examples
a <- pip_new("a") |>
pip_add("prep", \(x = 1) x * 2) |>
pip_add("fit", \(x = ~prep) x + 10)
# "prep" exists in both pipelines; the one from b gets a numeric suffix
b <- pip_new("b") |> pip_add("prep", \(x = 5) x * 3)
ab <- pip_bind(a, b)
ab[["step"]] # "prep", "fit", "prep2" (step name conflict auto-resolved)
ab
Clone a pipeline
Description
Creates an independent copy of the pipeline. Changes to the cloned pipeline do not affect the original pipeline, and vice versa.
Usage
pip_clone(x, name = NULL)
Arguments
x |
A pipeflow pipeline object. |
name |
Optional name for the cloned pipeline. If |
Value
A cloned pipeflow pipeline object.
Examples
p <- pip_new("original") |>
pip_add("s1", \(x = 1) x) |>
pip_add("s2", \(x = ~s1) x + 1)
# Clone produces a fully independent copy
cp <- pip_clone(p, name = "copy")
pip_add(cp, "s3", \(x = ~s2) x * 10)
# As a result, the clone has the new step ...
cp
# ... while the original is left unchanged
p
Collect step outputs
Description
Returns the outputs of all pipeline steps as a flat named list keyed by
step name. Use pip_view() to narrow the selection before collecting,
and compose calls if grouped output is needed.
Usage
pip_collect_out(x)
Arguments
x |
A pipeflow pip or view. |
Value
A named list of outputs, one element per step.
Examples
p <- pip_new() |>
pip_add("load", \(x = 1) x, tags = "io") |>
pip_add("clean", \(x = ~load) x + 1, tags = "io") |>
pip_add("model", \(x = ~clean) x * 2, tags = "model")
pip_run(p)
# Flat named list with one entry per step
pip_collect_out(p)
# Combine with pip_view to collect output for specific tags
grouped <- list(
io = pip_view(p, tags = "io") |> pip_collect_out(),
model = pip_view(p, tags = "model") |> pip_collect_out()
)
grouped
Build pipeline graph data
Description
Builds graph data (nodes and edges) describing the pipeline's step
structure, suitable for visualisation with visNetwork::visNetwork().
Usage
pip_get_graph(x, include_upstream = FALSE)
Arguments
x |
A pipeflow pip or view. |
include_upstream |
Logical. Only relevant for views. If |
Details
Node shapes reflect execution mode:
- auto/plain: hexagon
- reduce: dot
- split: star
Value
A named list with two data.frames: nodes and edges.
Examples
p <- pip_new()
pip_add(p, "load", \(x = 1) x, tags = "io")
pip_add(p, "clean", \(x = ~load) x + 1, tags = "io")
pip_add(p, "fit", \(x = ~clean) x * 2, tags = "model")
graph <- pip_get_graph(p)
graph$nodes # data.frame: id, label, shape, color
graph$edges # data.frame: from, to, arrows
# For a view, include_upstream = TRUE adds upstream deps to the graph
v <- pip_view(p, i = "fit")
pip_get_graph(v, include_upstream = TRUE)
if (require("visNetwork", quietly = TRUE)) {
do.call(what = visNetwork::visNetwork, args = graph)
}
Get independent parameters
Description
Returns the current default values of all tunable (non-dependency)
parameters across the pipeline. These are the parameters that can be
updated via pip_set_params(). Parameters wired to another step's output
via ~step_name are excluded.
Usage
pip_get_params(x)
Arguments
x |
A pipeflow pip or view |
Value
Named list of tunable parameter values. If the same parameter name appears in multiple steps, the first occurrence in pipeline order is returned.
Examples
p <- pip_new() |>
pip_add("load", \(n = 100, seed = 42) seq_len(n)) |>
pip_add("model", \(x = ~load, lambda = 0.1) x * lambda)
# ~load is a dependency — only non-dependency params are returned
pip_get_params(p) # list(n = 100, seed = 42, lambda = 0.1)
# Useful as a guide for pip_set_params()
pip_set_params(p, params = list(n = 20, lambda = 0.5))
pip_run(p) |> pip_collect_out()
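The "first occurrence wins" rule for parameter names shared between steps can be sketched in base R. The per-step list step_params below is hypothetical illustration data, not a pipeflow structure, and the code is not how pipeflow resolves parameters internally.

```r
# Hypothetical per-step parameter lists in pipeline order;
# "seed" appears in both steps with different values.
step_params <- list(
    load  = list(n = 100, seed = 42),
    model = list(lambda = 0.1, seed = 7)
)

# Flatten in pipeline order, then keep only the first occurrence of each name
all_params <- do.call(c, unname(step_params))
unique_params <- all_params[!duplicated(names(all_params))]

unique_params  # list(n = 100, seed = 42, lambda = 0.1)
```

Here the value seed = 42 from the earlier step is reported, and seed = 7 from the later step is dropped.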
Check whether a step exists
Description
Checks whether a step with the given name exists in the pipeline.
Usage
pip_has_step(x, step)
Arguments
x |
A pipeflow pip |
step |
A step name |
Value
Logical indicating if the step exists
Examples
p <- pip_new() |>
pip_add("load", \(x = 1) x) |>
pip_add("fit", \(x = ~load) x + 1)
pip_has_step(p, "load") # TRUE
pip_has_step(p, "fit") # TRUE
pip_has_step(p, "predict") # FALSE — step not yet added
Lock selected steps against updates
Description
Locks all steps in the pipeline. If p is a view, only the steps
covered by the view are locked.
Usage
pip_lock(p)
Arguments
p |
A pipeflow pip or view. |
Value
The updated pipeline or view, invisibly.
Examples
p <- pip_new() |>
pip_add("load", \(x = 10) x) |>
pip_add("fit", \(x = ~load) x * 2)
pip_run(p, lgr = NULL)
# Lock only "load" via a view so it won't be re-executed or overwritten
pip_lock(pip_view(p, i = "load"))
p[["pipeline"]][["locked"]] # TRUE, FALSE
# Locked steps are silently skipped during pip_run()
pip_run(p, lgr = NULL, force = TRUE)
p[["pipeline"]][["out"]][[1]] # still 10 — locked, not re-executed
pip_unlock(p)
p[["pipeline"]][["locked"]] # FALSE, FALSE
Create a pipeline
Description
Creates a new, empty pipeline. Add steps with pip_add() and execute
them with pip_run().
Usage
pip_new(name = "pipe")
Arguments
name |
Single name used for printing and for derived view names. |
Value
A pipeflow pipeline object.
Examples
# Create a named pipeline
p <- pip_new("my_analysis")
p[["name"]] # "my_analysis"
# Build a simple pipeline and run it
pip_add(p, "load", \(n = 5) seq_len(n))
pip_add(p, "double", \(x = ~load) x * 2) # x depends on load's output
p
pip_run(p)
p[["out"]] # list of outputs, one per step
Remove a step
Description
Removes a step from the pipeline. If other steps depend on the step
to be removed, an error is raised and the removal is blocked unless
recursive is set to TRUE. In recursive mode, the selected step and all
downstream dependent steps are removed together.
Usage
pip_remove(x, step, recursive = FALSE)
Arguments
x |
A pipeflow pip |
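step |
Name of the step to remove. |
recursive |
Logical. If TRUE, the step and all downstream dependent steps are removed together. |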
Value
The updated pipeline, invisibly.
Examples
p <- pip_new() |>
pip_add("load", \(x = 1) x) |>
pip_add("transform", \(x = ~load) x * 2) |>
pip_add("model", \(x = ~transform) x + 10)
# Removing a leaf step (nothing depends on it) works directly
pip_remove(p, "model")
p # "load", "transform"
# Trying to remove a step that others depend on raises an error:
# pip_remove(p, "load") # Error!
# recursive = TRUE removes the step and all its downstream dependents
pip_remove(p, "load", recursive = TRUE)
p # pipeline is now empty
Rename a step
Description
Renames the selected step and updates dependency references in downstream steps.
Usage
pip_rename(x, from, to)
Arguments
x |
A pipeflow pip |
from |
Existing step name |
to |
New step name |
Value
The updated pipeline, invisibly.
Examples
p <- pip_new() |>
pip_add("s1", \(x = 1) x) |>
pip_add("s2", \(x = ~s1) x + 1) # "s2" depends on "s1"
# Downstream dependency references are updated automatically
pip_rename(p, from = "s1", to = "load_data")
p
# Trying to rename to an existing step name raises an error:
try(pip_rename(p, "load_data", to = "s2")) # step 's2' already exists!
Replace a step
Description
Replaces a step's function while keeping it in the same position in the
pipeline. Downstream steps are automatically marked as outdated and will
re-run on the next pip_run().
Usage
pip_replace(x, step, fun, tags = character(0))
Arguments
x |
A pipeflow pipeline object. |
step |
Step name. |
fun |
Function to execute for the step. |
tags |
Optional character vector of tags belonging to the step.
Can also be adjusted later using |
Value
The updated pipeline, invisibly.
Examples
p <- pip_new() |>
pip_add("load", \(n = 5) seq_len(n)) |>
pip_add("double", \(x = ~load) x * 2)
pip_run(p)
p
# Replace "load" — downstream steps are automatically marked "outdated"
pip_replace(p, "load", \(n = 3) seq_len(n))
p
# Re-run to bring everything up to date
pip_run(p)
p
Run a pipeline
Description
Executes all pending steps in order. Steps already in state "done" are
skipped unless force = TRUE.
Usage
pip_run(
x,
lgr = pipeflow_lgr,
force = FALSE,
progress = NULL,
recursive = FALSE
)
Arguments
x |
A pipeflow pip or view |
lgr |
A logging function of the form |
force |
Logical indicating if all steps should be forced to run, regardless of whether they are outdated or not. |
progress |
Optional callback of the form
|
recursive |
If |
Details
When x is a view, requested rows are run together with required
upstream dependencies.
Value
The updated pipeline or view, invisibly.
See Also
vignette("v06-self-modify-pipeline", package = "pipeflow")
for an advanced example of recursive/dynamic pipelines.
Examples
p <- pip_new() |>
pip_add("load", \(n = 3) seq_len(n)) |>
pip_add("square", \(x = ~load) x^2) |>
pip_add("total", \(x = ~square) sum(x))
pip_run(p)
p
# Already-done steps are skipped on a second run
pip_run(p) # all steps skipped
# lgr = NULL suppresses log output
pip_run(p, lgr = NULL)
# force = TRUE re-executes every step regardless of state
pip_run(p, force = TRUE)
# Run only a subset of steps via a view;
# upstream dependencies are automatically included
v <- pip_view(p, i = "total")
pip_run(v)
Set independent parameters
Description
Updates the default values of tunable parameters across the pipeline. Affected steps and their downstream dependents are automatically marked as outdated.
Usage
pip_set_params(p, params = list())
Arguments
p |
A pipeflow pip or view |
params |
Named list of parameters to set. |
Details
Parameters of locked steps are never changed and their state remains unchanged.
Value
The updated pipeline or view, invisibly.
Examples
p <- pip_new() |>
pip_add("load", \(n = 10) seq_len(n)) |>
pip_add("scale", \(x = ~load, factor = 0.5) x * factor)
# See all adjustable parameters before running
pip_get_params(p) # list(n = 10, factor = 0.5)
# Updating params marks affected steps (and their dependents) outdated
pip_set_params(p, params = list(n = 5, factor = 2.0))
p
pip_run(p)
p
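The propagation rule described above (an affected step and everything downstream of it becomes outdated) can be sketched in base R. This is only an illustration under stated assumptions, not pipeflow's implementation: the `state` vector, the `deps` list, the extra steps `report` and `other`, and the helper `mark_outdated()` are all hypothetical.

```r
# Hypothetical pipeline state: every step has already been run.
state <- c(load = "done", scale = "done", report = "done", other = "done")

# Hypothetical direct upstream dependencies of each step.
deps <- list(
  load   = character(),
  scale  = "load",
  report = "scale",
  other  = character()
)

# Mark the changed steps and all of their downstream dependents as outdated.
mark_outdated <- function(state, deps, changed) {
  visited <- character()
  frontier <- changed
  while (length(frontier) > 0) {
    visited <- union(visited, frontier)
    # A step is a child if any of its dependencies is in the frontier.
    is_child <- vapply(deps, function(d) any(d %in% frontier), logical(1))
    frontier <- setdiff(names(deps)[is_child], visited)
  }
  state[visited] <- "outdated"
  state
}

res <- mark_outdated(state, deps, changed = "load")
print(res)
```

In this sketch, changing a parameter of `load` outdates `load`, `scale` and `report`, while the unrelated step `other` keeps its state.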
Add tags to selected steps
Description
Adds tags to existing tags for all steps in the pipeline unless p is a
view, in which case tags are only added for steps covered by the view.
Locked steps are skipped and not updated.
Usage
pip_tag(p, tags = character())
Arguments
p |
A pipeflow pip or view. |
tags |
Character vector of tags to add for each selected step. |
Value
The updated pipeline or view, invisibly.
Examples
p <- pip_new() |>
pip_add("load", \(x = 1) x) |>
pip_add("fit", \(x = ~load) x + 1)
# Tag every step in the pipeline at once
pip_tag(p, tags = c("daily", "core"))
p[["pipeline"]][["tags"]] # both steps have c("daily", "core")
# Add an extra tag to only one step via a view
v <- pip_view(p, i = "fit")
pip_tag(v, tags = "model")
p[["pipeline"]][["tags"]] # "fit" also has "model"
Unlock selected steps
Description
Unlocks all selected steps in the pipeline unless p is a view, in which
case only steps covered by the view are unlocked.
Usage
pip_unlock(p)
Arguments
p |
A pipeflow pip or view. |
Value
The updated pipeline or view, invisibly.
Examples
p <- pip_new() |>
pip_add("load", \(x = 1) x) |>
pip_add("fit", \(x = ~load) x * 2)
# Lock all steps, then unlock to restore normal execution
pip_lock(p)
p[["pipeline"]][["locked"]] # TRUE, TRUE
pip_unlock(p)
p[["pipeline"]][["locked"]] # FALSE, FALSE
Remove tags from selected steps
Description
Removes tags from existing tags for all steps in the pipeline unless p
is a view, in which case tags are only removed for steps covered by the
view. Locked steps are skipped and not updated.
Usage
pip_untag(p, tags = character())
Arguments
p |
A pipeflow pip or view. |
tags |
Character vector of tags to remove for each selected step. |
Value
The updated pipeline or view, invisibly.
Examples
p <- pip_new() |>
pip_add("load", \(x = 1) x, tags = c("daily", "core")) |>
pip_add("fit", \(x = ~load) x + 1, tags = c("daily", "model"))
# Remove "daily" from all steps
pip_untag(p, tags = "daily")
# "load" retains "core"; "fit" retains "model"
p[["pipeline"]][["tags"]]
Create a pipeline view
Description
Creates a filtered view showing only a selected subset of steps.
A view references the underlying pipeline without copying it, so
operations like pip_run() and pip_set_params() applied to a view
affect only the selected steps.
Usage
pip_view(
x,
i = integer(),
filter = list(),
tags = character(),
fixed = TRUE,
...
)
Arguments
x |
A pipeflow pipeline or view. |
i |
Optional row indices or step names to keep. |
filter |
A named list of filters to apply. Each element can be a
character vector specifying the values to keep for the corresponding
property or, if |
tags |
Tag filter (character). Keeps steps with any matching tag. |
fixed |
If TRUE, values in |
... |
further args passed to |
Value
A pipeflow_view object.
Examples
p <- pip_new()
pip_add(p, "load_raw", \(x = 1) x,
tags = c("io", "core", "daily")
)
pip_add(p, "fit_model", \(x = 2) x + 1,
tags = c("model")
)
pip_add(p, "eval_model", \(x = ~fit_model) x,
tags = c("model", "daily", "report")
)
# Filter by a fixed column value (one or more states)
pip_view(p, filter = list(state = "new"))
# Combine filters: step pattern AND state
pip_view(p, filter = list(step = "model", state = "new"))
# Filter by tag — keeps steps that have *any* of the given tags
pip_view(p, tags = "daily")
# Combine explicit step selection with a filter (intersection)
pip_view(p,
i = c("load_raw", "fit_model"),
filter = list(state = "new")
)
# Select by integer row indices
pip_view(p, i = c(1L, 2L), filter = list(state = "new"))
# Use a regex pattern to match step names
pip_view(p, filter = list(step = "_model$"), fixed = FALSE)
# Views are composable: create a view-of-view for progressive narrowing
v1 <- pip_view(p, tags = "daily")
print(v1) # load_raw, eval_model
v2 <- pip_view(v1, tags = "report")
print(v2) # eval_model only
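The selection rules used in the examples above (any-tag matching, literal versus regex step patterns, and intersection of several filters) can be mimicked in base R. This is a sketch, not pipeflow's code: the helpers `match_tags()` and `match_step()` are hypothetical, and it assumes that `fixed` behaves like the `fixed` argument of base R's `grepl()`, which the regex example suggests but the text does not state.

```r
# Hypothetical step table mirroring the example pipeline above.
steps <- c("load_raw", "fit_model", "eval_model")
tags <- list(
  c("io", "core", "daily"),
  "model",
  c("model", "daily", "report")
)

# A step matches a tag filter if it carries any of the wanted tags.
match_tags <- function(tags, wanted) {
  vapply(tags, function(t) any(t %in% wanted), logical(1))
}

# Literal substring match (fixed = TRUE) versus regular expression.
match_step <- function(steps, pattern, fixed = TRUE) {
  grepl(pattern, steps, fixed = fixed)
}

daily  <- steps[match_tags(tags, "daily")]
models <- steps[match_step(steps, "_model$", fixed = FALSE)]
# Combining filters keeps only steps that satisfy all of them.
both   <- steps[match_tags(tags, "daily") & match_step(steps, "model")]
```

Narrowing a view with a second call, as in the view-of-view example, corresponds to intersecting the logical vectors in the same way.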
Add pipeline step
Description
A pipeline consists of a series of steps, which usually
are added one by one. Each step is made up of a function computing
something once the pipeline is run. This function can be an existing
R function (e.g. mean()) or an anonymous/lambda function specifically
defined for the pipeline. One useful feature is that function
parameters can refer to results of earlier pipeline steps using the
syntax x = ~earlier_step_name - see the Examples for more details.
Usage
pipe_add(
pip,
step,
fun,
params = list(),
description = "",
group = step,
keepOut = FALSE
)
Arguments
pip |
|
step |
|
fun |
|
params |
|
description |
|
group |
|
keepOut |
|
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_add() instead.
Examples
# Add steps with lambda functions
p <- pipe_new("myPipe", data = 1)
pipe_add(p, "s1", \(x = ~data) 2 * x) # use input data
pipe_add(p, "s2", \(x = ~data, y = ~s1) x * y)
try(pipe_add(p, "s2", \(z = 3) 3)) # error: step 's2' exists already
try(pipe_add(p, "s3", \(z = ~foo) 3)) # dependency 'foo' not found
p
# Add step with existing function
p <- pipe_new("myPipe", data = c(1, 2, NA, 3, 4))
try(pipe_add(p, "calc_mean", mean)) # default value for x is missing
pipe_add(p, "calc_mean", mean, params = list(x = ~data, na.rm = TRUE))
p |>
pipe_run() |>
pipe_get_out("calc_mean")
# Step description
p <- pipe_new("myPipe", data = 1:10)
pipe_add(p, "s1", \(x = ~data) 2 * x, description = "multiply by 2")
print(p, verbose = TRUE) # print all columns including description
# Group output
p <- pipe_new("myPipe", data = data.frame(x = 1:2, y = 3:4))
pipe_add(p, "prep_x", \(data = ~data) data$x, group = "prep")
pipe_add(p, "prep_y", \(data = ~data) (data$y)^2, group = "prep")
pipe_add(p, "sum", \(x = ~prep_x, y = ~prep_y) x + y)
p |>
pipe_run() |>
pipe_collect_out(all = TRUE)
Append two pipelines
Description
When appending, pipeflow takes care of potential name
clashes with respect to step names and dependencies, that is, if
needed, it will automatically adapt step names and dependencies to
make sure they are unique in the merged pipeline.
Usage
pipe_append(pip, p, outAsIn = FALSE, tryAutofixNames = TRUE, sep = ".")
Arguments
pip |
|
p |
|
outAsIn |
|
tryAutofixNames |
|
sep |
|
Value
returns new combined Pipeline object.
Lifecycle
Deprecated. Legacy API. Use pip_bind() instead.
Examples
# Append pipeline
p1 <- pipe_new("pipe1")
pipe_add(p1, "step1", \(x = 1) x)
p2 <- pipe_new("pipe2")
pipe_add(p2, "step2", \(y = 1) y)
p1 |> pipe_append(p2)
# Append pipeline with potential name clashes
p3 <- pipe_new("pipe3")
pipe_add(p3, "step1", \(z = 1) z)
p1 |>
pipe_append(p2) |>
pipe_append(p3)
# Use output of first pipeline as input for second pipeline
p1 <- pipe_new("pipe1", data = 8)
p2 <- pipe_new("pipe2")
pipe_add(p1, "square", \(x = ~data) x^2)
pipe_add(p2, "log2", \(x = ~data) log2(x))
p12 <- p1 |> pipe_append(p2, outAsIn = TRUE)
p12 |>
pipe_run() |>
pipe_get_out("log2")
p12
# Custom name separator for adapted step names
p1 |> pipe_append(p2, sep = "___")
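The renaming idea behind the clash handling can be sketched in base R. The exact scheme pipeflow applies is not spelled out here, so the sketch assumes that a clashing step name receives the name of the appended pipeline as a suffix, joined by `sep`, and that dependencies are rewritten to match. The helper `rename_clashes()` and the step `step9` are hypothetical.

```r
# Step names already present in the first pipeline (hypothetical).
existing <- c("data", "step1")

# Steps and direct dependencies of the pipeline to append (hypothetical).
steps <- c("data", "step1", "step9")
deps <- list(data = character(), step1 = "data", step9 = "step1")

# Rename clashing steps and rewrite the dependencies accordingly.
rename_clashes <- function(existing, steps, deps, suffix, sep = ".") {
  clash <- steps %in% existing
  new_names <- ifelse(clash, paste(steps, suffix, sep = sep), steps)
  lookup <- setNames(new_names, steps)          # old name -> new name
  new_deps <- lapply(deps, function(d) unname(lookup[d]))
  names(new_deps) <- unname(lookup[names(deps)])
  list(steps = new_names, deps = new_deps)
}

res <- rename_clashes(existing, steps, deps, suffix = "pipe3")
str(res)
```

Only the two clashing names are changed in this sketch; `step9` keeps its name but now depends on the renamed `step1.pipe3`.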
Append string to all step names
Description
Appends string to all step names and takes care of updating step dependencies accordingly.
Usage
pipe_append_to_step_names(pip, postfix, sep = ".")
Arguments
pip |
|
postfix |
|
sep |
|
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
p <- pipe_new("pipe")
pipe_add(p, "step1", \(x = 1) x)
pipe_add(p, "step2", \(y = 1) y)
pipe_append_to_step_names(p, "new")
p
pipe_append_to_step_names(p, "foo", sep = "__")
p
Clone pipeline
Description
Creates a copy of a pipeline object.
Usage
pipe_clone(pip, deep = FALSE)
Arguments
pip |
|
deep |
|
Value
returns the copied Pipeline object
Lifecycle
Deprecated. Legacy API. Use pip_clone() instead.
Examples
p1 <- pipe_new("pipe")
pipe_add(p1, "step1", \(x = 1) x)
p2 <- pipe_clone(p1)
pipe_add(p2, "step2", \(y = 1) y)
p1
p2
Collect output from entire pipeline
Description
Collects output after a pipeline run, by default from all
steps for which keepOut was set to TRUE when the steps were added
(see pipe_add()). The output is grouped by the group names (see the
group parameter in pipe_add()),
which by default are identical to the step names.
Usage
pipe_collect_out(pip, groupBy = "group", all = FALSE)
Arguments
pip |
|
groupBy |
|
all |
|
Value
list containing the output, named after the groups, which,
by default, are the steps.
Lifecycle
Deprecated. Legacy API. Use pip_collect_out() instead.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "step1", \(x = ~data) x + 2)
pipe_add(p, "step2", \(x = ~step1) x + 2, keepOut = TRUE)
pipe_run(p)
pipe_collect_out(p)
pipe_collect_out(p, all = TRUE) |> str()
# Grouped output
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "step1", \(x = ~data) x + 2, group = "add")
pipe_add(p, "step2", \(x = ~step1, y = 2) x + y, group = "add")
pipe_add(p, "step3", \(x = ~data) x * 3, group = "mult")
pipe_add(p, "step4", \(x = ~data, y = 2) x * y, group = "mult")
p
pipe_run(p)
pipe_collect_out(p, all = TRUE) |> str()
# Grouped by state
pipe_set_params(p, list(y = 5))
p
pipe_collect_out(p, groupBy = "state", all = TRUE) |> str()
Discard steps from the pipeline
Description
Discard all steps that match a given pattern.
Usage
pipe_discard_steps(pip, pattern, recursive = FALSE, fixed = TRUE, ...)
Arguments
pip |
|
pattern |
|
recursive |
|
fixed |
|
... |
further arguments passed to |
Value
the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~add1) x + 2)
pipe_add(p, "mult3", \(x = ~add1) x * 3)
pipe_add(p, "mult4", \(x = ~add2) x * 4)
p
pipe_discard_steps(p, "mult")
p
# Re-add steps
pipe_add(p, "mult3", \(x = ~add1) x * 3)
pipe_add(p, "mult4", \(x = ~add2) x * 4)
p
# Discarding 'add1' does not work ...
try(pipe_discard_steps(p, "add1"))
# ... unless we also force removal of its downstream dependencies
pipe_discard_steps(p, "add1", recursive = TRUE)
p
# Trying to discard non-existent steps is just ignored
pipe_discard_steps(p, "non-existent")
Get data
Description
Get the data set for the pipeline
Usage
pipe_get_data(pip)
Arguments
pip |
|
Value
the output defined in the data step, which by default is
the first step of the pipeline
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_get_data(p)
pipe_set_data(p, 3:4)
pipe_get_data(p)
Get step dependencies
Description
Get step dependencies
Usage
pipe_get_depends(pip)
pipe_get_depends_down(pip, step, recursive = TRUE)
pipe_get_depends_up(pip, step, recursive = TRUE)
Arguments
pip |
|
step |
|
recursive |
|
Value
- pipe_get_depends: named list of dependencies for each step
- pipe_get_depends_down: list of downstream dependencies
- pipe_get_depends_up: list of upstream dependencies
Methods
- pipe_get_depends: get all dependencies for all steps defined in the pipeline
- pipe_get_depends_down: get all downstream dependencies of a given step, by default descending recursively.
- pipe_get_depends_up: get all upstream dependencies of a given step, by default descending recursively.
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
# pipe_get_depends
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y)
pipe_get_depends(p)
# pipe_get_depends_down
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y)
pipe_add(p, "mult3", \(x = ~add1) x * 3)
pipe_add(p, "mult4", \(x = ~add2) x * 4)
pipe_get_depends_down(p, "add1")
pipe_get_depends_down(p, "add1", recursive = FALSE)
# pipe_get_depends_up
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y)
pipe_add(p, "mult3", \(x = ~add1) x * 3)
pipe_add(p, "mult4", \(x = ~add2) x * 4)
pipe_get_depends_up(p, "mult4")
pipe_get_depends_up(p, "mult4", recursive = FALSE)
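What recursive and non-recursive dependency lookup mean can be illustrated on a plain named list. This is a base R sketch, not the package's implementation: `deps` is a hypothetical list of direct upstream steps, shaped like the named list that pipe_get_depends is documented to return, and `depends_up()` and `depends_down()` are hypothetical helpers. The order and exact content of the real functions' results may differ.

```r
# Hypothetical direct upstream dependencies for each step.
deps <- list(
  data  = character(),
  add1  = "data",
  add2  = c("data", "add1"),
  mult3 = "add1",
  mult4 = "add2"
)

# Direct (recursive = FALSE) or transitive upstream steps of `step`.
depends_up <- function(deps, step, recursive = TRUE) {
  direct <- deps[[step]]
  if (!recursive || length(direct) == 0) return(direct)
  indirect <- unlist(lapply(direct, depends_up, deps = deps))
  unique(c(direct, indirect))
}

# Direct or transitive downstream steps of `step`.
depends_down <- function(deps, step, recursive = TRUE) {
  direct <- names(deps)[vapply(deps, function(d) step %in% d, logical(1))]
  if (!recursive || length(direct) == 0) return(direct)
  indirect <- unlist(lapply(direct, depends_down, deps = deps))
  unique(c(direct, indirect))
}

up_all    <- depends_up(deps, "mult4")
up_direct <- depends_up(deps, "mult4", recursive = FALSE)
down_all  <- depends_down(deps, "add1")
```

With `recursive = FALSE` only the immediate neighbours are returned; the recursive variants follow the chain until no further steps are found.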
Pipeline graph
Description
Get the pipeline as a graph with nodes and edges.
Usage
pipe_get_graph(pip, groups = NULL)
Arguments
pip |
|
groups |
|
Value
list with two data frames, one for nodes and one for edges
ready to be used with the visNetwork::visNetwork() function of the
visNetwork package.
Lifecycle
Deprecated. Legacy API. Use pip_get_graph() instead.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = ~add1) x + y)
pipe_add(p, "mult1", \(x = ~add1, y = ~add2) x * y)
graph <- pipe_get_graph(p)
graph
if (require("visNetwork", quietly = TRUE)) {
do.call(visNetwork, args = graph)
}
Get output of given step
Description
Get output of given step
Usage
pipe_get_out(pip, step)
Arguments
pip |
|
step |
|
Value
the output at the given step.
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
See Also
pipe_collect_out() to collect output of multiple steps.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "add2", \(x = ~data, y = ~add1) x + y)
pipe_run(p)
pipe_get_out(p, "add1")
pipe_get_out(p, "add2")
Get pipeline parameters
Description
Retrieves unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps.
Usage
pipe_get_params(pip, ignoreHidden = TRUE)
pipe_get_params_at_step(pip, step, ignoreHidden = TRUE)
pipe_get_params_unique(pip, ignoreHidden = TRUE)
pipe_get_params_unique_json(pip, ignoreHidden = TRUE)
Arguments
pip |
|
ignoreHidden |
|
step |
|
Value
- pipe_get_params: list of parameters, sorted and named by step; steps with no parameters are filtered out
- pipe_get_params_at_step: list of parameters at the given step
- pipe_get_params_unique: list of parameters where each parameter is listed only once. Each value is taken from the first step in which the parameter was defined.
- pipe_get_params_unique_json: flat unnamed JSON list of unique parameters
Lifecycle
Deprecated. Legacy API. Use pip_get_params() instead.
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
# pipe_get_params
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z)
pipe_add(p, "add3", \() 1 + 2)
pipe_get_params(p) |> str()
pipe_get_params(p, ignoreHidden = FALSE) |> str()
# pipe_get_params_at_step
pipe_get_params_at_step(p, "add2")
pipe_get_params_at_step(p, "add2", ignoreHidden = FALSE)
pipe_get_params_at_step(p, "add3")
# pipe_get_params_unique
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = 2, .z = 3) x + y + .z)
pipe_add(p, "mult1", \(x = 4, y = 5, .z = 6, b = ~add2) x * y * b)
pipe_get_params_unique(p)
pipe_get_params_unique(p, ignoreHidden = FALSE)
# pipe_get_params_unique_json
pipe_get_params_unique_json(p)
pipe_get_params_unique_json(p, ignoreHidden = FALSE)
Get step information
Description
Get step information
Usage
pipe_get_step(pip, step)
pipe_get_step_names(pip)
pipe_get_step_number(pip, step)
pipe_has_step(pip, step)
Arguments
pip |
|
step |
|
Value
- pipe_get_step: data.table row containing the step
- pipe_get_step_names: character vector of step names
- pipe_get_step_number: the step number in the pipeline
- pipe_has_step: whether the step exists
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Deprecated. Legacy API. Use pip_has_step() instead.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = 2, z = ~add1) x + y + z)
pipe_run(p)
# pipe_get_step_names
pipe_get_step_names(p)
# pipe_get_step_number
pipe_get_step_number(p, "add1")
pipe_get_step_number(p, "add2")
# pipe_has_step
pipe_has_step(p, "add1")
pipe_has_step(p, "foo")
# pipe_get_step
add1 <- pipe_get_step(p, "add1")
add1
add1[["params"]]
add1[["fun"]]
try(pipe_get_step(p, "foo")) # error: step 'foo' does not exist
Insert step
Description
Insert step
Usage
pipe_insert_after(pip, afterStep, step, ...)
pipe_insert_before(pip, beforeStep, step, ...)
Arguments
pip |
|
afterStep |
|
step |
|
... |
further arguments passed to |
beforeStep |
|
Value
returns the Pipeline object invisibly
Methods
- pipe_insert_after: insert step after a certain step of the pipeline
- pipe_insert_before: insert step before a certain step of the pipeline
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
# pipe_insert_after
p <- pipe_new("pipe", data = 1)
pipe_add(p, "f1", \(x = 1) x)
pipe_add(p, "f2", \(x = ~f1) x)
pipe_insert_after(p, "f1", step = "after_f1", \(x = ~f1) x)
p
# pipe_insert_before
pipe_insert_before(p, "f2", step = "before_f2", \(x = ~f1) 2 * x)
p
Length of the pipeline
Description
Length of the pipeline
Usage
pipe_length(pip)
Arguments
pip |
|
Value
numeric length of pipeline, that is, the total number of steps
Lifecycle
Deprecated. Legacy API. Use length() instead.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "f1", \(x = 1) x)
pipe_add(p, "f2", \(y = 1) y)
p
pipe_length(p)
Lock steps
Description
Locking a step means that both its parameters and its
output (given it has output) are locked such that neither
setting new pipeline parameters nor future pipeline runs can change
the current parameter and output content. To unlock a locked step,
use pipe_unlock_step().
Usage
pipe_lock_step(pip, step)
pipe_unlock_step(pip, step)
Arguments
pip |
|
step |
|
Value
the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_lock() instead.
Deprecated. Legacy API. Use pip_unlock() instead.
Examples
# pipe_lock_step
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = 1, data = ~data) x + data)
pipe_add(p, "add2", \(x = 1, data = ~data) x + data)
pipe_run(p)
pipe_get_out(p, "add1")
pipe_get_out(p, "add2")
pipe_lock_step(p, "add1")
pipe_set_data(p, 3)
pipe_set_params(p, list(x = 3))
pipe_run(p)
pipe_get_out(p, "add1")
pipe_get_out(p, "add2")
# pipe_unlock_step
pipe_unlock_step(p, "add1")
pipe_set_params(p, list(x = 3))
pipe_run(p)
pipe_get_out(p, "add1")
Create new pipeline
Description
A new pipeline is always initialized with one 'data' step, which basically is a function returning the data.
Usage
pipe_new(name, data = NULL, logger = NULL)
Arguments
name |
the name of the Pipeline |
data |
optional data used at the start of the pipeline. The
data also can be set later using the |
logger |
custom logger to be used for logging. If no logger is provided, the default logger is used, which should be sufficient for most use cases. If you do want to use your own custom log function, you need to provide a function that obeys the following form:
The Note that with the default logger, the log layout can be altered
any time via |
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_new() instead.
Examples
data <- data.frame(x = 1:2, y = 3:4)
p <- pipe_new("myPipe", data = data)
p |>
pipe_run() |>
pipe_get_out("data")
# Setting data later
p <- pipe_new("myPipe")
pipe_get_data(p)
p <- pipe_set_data(p, data)
pipe_get_data(p)
p |>
pipe_run() |>
pipe_get_out("data")
# Initialize with custom logger
my_logger <- function(level, msg, ...) {
cat(level, msg, "\n")
}
p <- pipe_new("myPipe", data = data, logger = my_logger)
p |>
pipe_run() |>
pipe_get_out("data")
Pop steps from the pipeline
Description
Use this function to drop steps from the end of the pipeline.
Usage
pipe_pop_step(pip)
pipe_pop_steps_after(pip, step)
pipe_pop_steps_from(pip, step)
Arguments
pip |
|
step |
|
Value
string the name of the step that was removed
Methods
- pipe_pop_step: drop the last step from the pipeline
- pipe_pop_steps_after: drop all steps after the given step
- pipe_pop_steps_from: drop all steps from and including the given step
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
# pipe_pop_step
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "f1", \(x = 1) x)
pipe_add(p, "f2", \(y = 1) y)
p
pipe_pop_step(p)
p
# pipe_pop_steps_after
pipe_add(p, "f2", \(y = 1) y)
pipe_add(p, "f3", \(z = 1) z)
p
pipe_pop_steps_after(p, "f1")
p
# pipe_pop_steps_from
pipe_add(p, "f2", \(y = 1) y)
pipe_add(p, "f3", \(z = 1) z)
p
pipe_pop_steps_from(p, "f1")
p
Print the pipeline as a table
Description
Print the pipeline as a table
Usage
pipe_print(pip, verbose = FALSE)
Arguments
pip |
|
verbose |
|
Value
the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use print() instead.
Examples
p <- pipe_new("pipe", data = 1:2)
p$add("f1", \(x = 1) x)
p$add("f2", \(y = 1) y)
pipe_print(p)
pipe_print(p, verbose = TRUE)
# Also works with standard print function
print(p)
print(p, verbose = TRUE)
Remove a certain step from the pipeline
Description
Can be used to remove any given step.
If other steps depend on the step to be removed, an error is
given and the removal is blocked, unless recursive was set to TRUE.
Usage
pipe_remove_step(pip, step, recursive = FALSE)
Arguments
pip |
|
step |
|
recursive |
|
Value
the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_remove() instead.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = ~add1) x + y)
pipe_add(p, "mult1", \(x = 1, y = ~add2) x * y)
p
pipe_remove_step(p, "mult1")
p
try(pipe_remove_step(p, "add1"))
pipe_remove_step(p, "add1", recursive = TRUE)
p
Rename step
Description
Safely rename a step in the pipeline. If new step name would result in a name clash, an error is given.
Usage
pipe_rename_step(pip, from, to)
Arguments
pip |
|
from |
|
to |
|
Value
the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_rename() instead.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "add1", \(data = ~data, x = 1) x + data)
pipe_add(p, "add2", \(x = 1, y = ~add1) x + y)
p
try(pipe_rename_step(p, from = "add1", to = "add2"))
pipe_rename_step(p, from = "add1", to = "first_add")
p
Replace pipeline step
Description
Replaces an existing pipeline step.
Usage
pipe_replace_step(
pip,
step,
fun,
params = list(),
description = "",
group = step,
keepOut = FALSE
)
Arguments
pip |
|
step |
|
fun |
|
params |
|
description |
|
group |
|
keepOut |
|
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_replace() instead.
Examples
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "add2", \(x = ~data, y = 2) x + y)
pipe_add(p, "mult", \(x = 1, y = 2) x * y, keepOut = TRUE)
pipe_run(p) |> pipe_collect_out()
pipe_replace_step(p, "mult", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
pipe_run(p) |> pipe_collect_out()
try(pipe_replace_step(p, "foo", \(x = 1) x)) # step 'foo' does not exist
Reset pipeline
Description
Resets the pipeline to the state before it was run. This means that all output is removed and the state of all steps is reset to 'New'.
Usage
pipe_reset(pip)
Arguments
pip |
|
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
p <- pipe_new("pipe", data = 1:2)
pipe_add(p, "f1", \(x = 1) x)
pipe_add(p, "f2", \(y = 1) y)
pipe_run(p)
p
pipe_reset(p)
p
Run pipeline
Description
Runs all new and/or outdated pipeline steps.
Usage
pipe_run(
pip,
force = FALSE,
recursive = TRUE,
cleanUnkept = FALSE,
progress = NULL,
showLog = TRUE
)
Arguments
pip |
|
force |
|
recursive |
|
cleanUnkept |
|
progress |
|
showLog |
|
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_run() instead.
Examples
# Simple pipeline
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "add2", \(x = ~add1, z = 2) x + z)
pipe_add(p, "final", \(x = ~add1, y = ~add2) x * y, keepOut = TRUE)
p |>
pipe_run() |>
pipe_collect_out()
pipe_set_params(p, list(z = 4)) # outdates steps add2 and final
p
p |> pipe_run() |> pipe_collect_out()
pipe_run(p, cleanUnkept = TRUE)
p
# Recursive pipeline (for advanced users)
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "new_pipe", \(x = ~add1) {
p2 <- pipe_new("new_pipe", data = x)
pipe_add(p2, "add1", \(x = ~data) x + 1)
pipe_add(p2, "add2", \(x = ~add1) x + 2, keepOut = TRUE)
}
)
p |> pipe_run() |> pipe_collect_out()
# Run pipeline with progress bar
p <- pipe_new("pipe", data = 1)
pipe_add(p, "first step", \() Sys.sleep(0.5))
pipe_add(p, "second step", \() Sys.sleep(0.5))
pipe_add(p, "last step", \() Sys.sleep(0.5))
pb <- txtProgressBar(min = 1, max = pipe_length(p), style = 3)
fprogress <- function(value, detail) {
setTxtProgressBar(pb, value)
}
pipe_run(p, progress = fprogress, showLog = FALSE)
Run specific step
Description
Run given pipeline step possibly together with upstream and/or downstream dependencies.
Usage
pipe_run_step(
pip,
step,
upstream = TRUE,
downstream = FALSE,
cleanUnkept = FALSE
)
Arguments
pip |
|
step |
|
upstream |
|
downstream |
|
cleanUnkept |
|
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_run() instead.
Examples
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y)
pipe_add(p, "add2", \(x = ~add1, z = 2) x + z)
pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y)
pipe_run_step(p, "add2")
pipe_run_step(p, "add2", downstream = TRUE)
pipe_run_step(p, "mult", upstream = TRUE)
Set data
Description
Set data in first step of pipeline.
Usage
pipe_set_data(pip, data)
Arguments
pip |
|
data |
initial data set. |
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
p |>
pipe_run() |>
pipe_collect_out()
pipe_set_data(p, 3)
p |>
pipe_run() |>
pipe_collect_out()
Split-multiply pipeline by list of data sets
Description
This function can be used to apply the pipeline repeatedly to various data sets. For this, the pipeline split-copies itself by the list of given data sets. Each sub-pipeline will have one of the data sets set as input data. The step names of the sub-pipelines will be the original step names plus the name of the data set.
Usage
pipe_set_data_split(
pip,
dataList,
toStep = character(),
groupBySplit = TRUE,
sep = "."
)
Arguments
pip |
|
dataList |
|
toStep |
|
groupBySplit |
|
sep |
|
Value
new combined Pipeline with each sub-pipeline having set
one of the data sets.
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
# Split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipe_set_data_split(p, dataList)
p
p |>
pipe_run() |>
pipe_collect_out() |>
str()
# Don't group output by split
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipe_set_data_split(p, dataList, groupBySplit = FALSE)
p
p |>
pipe_run() |>
pipe_collect_out() |>
str()
# Split up to certain step
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y)
pipe_add(p, "average_result", \(x = ~mult) mean(unlist(x)), keepOut = TRUE)
p
pipe_get_depends(p)[["average_result"]]
pipe_set_data_split(p, dataList, toStep = "mult")
p
pipe_get_depends(p)[["average_result"]]
p |>
pipe_run() |>
pipe_collect_out() |>
str()
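The naming rule stated in the description (original step name plus the name of the data set) can be sketched in base R. The helper `split_step_names()` is hypothetical, and the sketch assumes that every step, including the data step, is renamed and that the copies are ordered by data set; the package may order or name them differently.

```r
# Step names of a hypothetical pipeline and the names of the data sets.
steps <- c("data", "add1", "mult")
data_names <- c("a", "b", "c")

# One copy of every step name per data set, joined by `sep`.
split_step_names <- function(steps, data_names, sep = ".") {
  unlist(lapply(data_names, function(nm) paste(steps, nm, sep = sep)))
}

new_names <- split_step_names(steps, data_names)
print(new_names)
```

Passing a different `sep` changes only the joining string, for example `split_step_names(steps, data_names, sep = "_")`.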
Change output flag
Description
Change the keepOut flag at a given pipeline step,
which determines whether the output of that step is collected
when calling pipe_collect_out() after the pipeline was run. See column 'keepOut' when printing a pipeline to view the status.
Usage
pipe_set_keep_out(pip, step, keepOut = TRUE)
Arguments
pip |
|
step |
|
keepOut |
|
Value
the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 1) x + y, keepOut = TRUE)
pipe_add(p, "add2", \(x = ~data, y = 2) x + y)
pipe_add(p, "mult", \(x = ~add1, y = ~add2) x * y)
p |>
pipe_run() |>
pipe_collect_out()
pipe_set_keep_out(p, "add1", keepOut = FALSE)
pipe_set_keep_out(p, "mult", keepOut = TRUE)
p |>
pipe_run() |>
pipe_collect_out()
Set pipeline parameters
Description
Set unbound function parameters defined in the pipeline where 'unbound' means parameters that are not linked to other steps. Trying to set parameters that don't exist in the pipeline is ignored, by default, with a warning.
Usage
pipe_set_params(pip, params, warnUndefined = TRUE)
Arguments
pip |
|
params |
|
warnUndefined |
|
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_set_params() instead.
Examples
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 2) x + y)
pipe_add(p, "add2", \(x = ~data, y = 3) x + y)
pipe_add(p, "mult", \(x = 4, z = 5) x * z)
pipe_get_params(p)
pipe_set_params(p, params = list(x = 3, y = 3))
pipe_get_params(p)
pipe_set_params(p, params = list(x = 5, z = 3))
pipe_get_params(p)
suppressWarnings(
pipe_set_params(p, list(foo = 3)) # gives warning as 'foo' is undefined
)
pipe_set_params(p, list(foo = 3), warnUndefined = FALSE)
Set parameters at step
Description
Set unbound function parameters defined at given pipeline step where 'unbound' means parameters that are not linked to other steps. If one or more parameters don't exist, an error is given.
Usage
pipe_set_params_at_step(pip, step, params)
Arguments
pip |
|
step |
|
params |
|
Value
returns the Pipeline object invisibly
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
p <- pipe_new("pipe", data = 1)
pipe_add(p, "add1", \(x = ~data, y = 2, z = 3) x + y)
pipe_set_params_at_step(p, step = "add1", params = list(y = 5, z = 6))
pipe_get_params(p)
try(
pipe_set_params_at_step(p, step = "add1", params = list(foo = 3))
)
Split-up pipeline
Description
Splits the pipeline into its independent parts. This can be useful, for example, to split up the pipeline in order to run each part in parallel.
Usage
pipe_split(pip)
Arguments
pip |
|
Value
list of Pipeline objects
Lifecycle
Deprecated. Legacy API. Use pip_new() and
related pip_* functions.
Examples
# Example for two independent calculation paths
p <- pipe_new("pipe", data = 1)
pipe_add(p, "f1", \(x = ~data) x)
pipe_add(p, "f2", \(x = 1) x)
pipe_add(p, "f3", \(x = ~f1) x)
pipe_add(p, "f4", \(x = ~f2) x)
pipe_split(p)
# Example of split by three data sets
dataList <- list(a = 1, b = 2, c = 3)
p <- pipe_new("pipe")
pipe_add(p, "add1", \(x = ~data) x + 1, keepOut = TRUE)
pipe_add(p, "mult", \(x = ~data, y = ~add1) x * y, keepOut = TRUE)
pipes <- pipe_set_data_split(p, dataList) |> pipe_split()
pipes
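Finding the independent parts of a pipeline amounts to finding the connected components of its dependency graph. The following base R sketch shows that idea on a hypothetical dependency list; `split_independent()` is not part of pipeflow, and how the package treats the data step when splitting is not assumed here.

```r
# Hypothetical direct dependencies, mirroring the first example above.
deps <- list(
  data = character(),
  f1   = "data",
  f2   = character(),
  f3   = "f1",
  f4   = "f2"
)

# Group steps into connected components by propagating the smallest label.
split_independent <- function(deps) {
  steps <- names(deps)
  comp <- setNames(seq_along(steps), steps)  # each step starts on its own
  repeat {
    changed <- FALSE
    for (s in steps) {
      for (d in deps[[s]]) {
        lo <- min(comp[[s]], comp[[d]])
        if (comp[[s]] != lo || comp[[d]] != lo) {
          comp[[s]] <- lo
          comp[[d]] <- lo
          changed <- TRUE
        }
      }
    }
    if (!changed) break
  }
  unname(split(steps, comp))
}

parts <- split_independent(deps)
str(parts)
```

Each element of `parts` could then be processed separately, for example by handing the corresponding sub-pipelines to parallel workers.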
Print pipeflow objects
Description
Print pipeflow objects
Usage
## S3 method for class 'pipeflow_pip'
print(
  x,
  rows = integer(),
  cols = getOption("pipeflow.print.cols", default = "core"),
  topn = getOption("pipeflow.print.topn", default = 5),
  nrows = getOption("pipeflow.print.nrows", default = 50),
  row.names = getOption("pipeflow.print.rownames", default = TRUE),
  class = getOption("pipeflow.print.class", default = FALSE),
  header = TRUE,
  ...
)
## S3 method for class 'pipeflow_view'
print(x, header = TRUE, ...)
Arguments
x |
A pipeflow pipeline or view. |
rows |
Row indices to be printed. If empty, all rows are printed. |
cols |
The columns to be printed, for example "core" (the default) or "all". |
topn |
The number of rows to be printed from the beginning and end of tables with more than nrows rows. |
nrows |
The number of rows printed before truncation is enforced. |
row.names |
If TRUE, row indices will be printed alongside x. |
class |
If TRUE, the resulting output will include above each column its storage class (or a self-evident abbreviation thereof). |
header |
If TRUE, a header with the pipeline name and number of steps will be printed. |
... |
Other arguments passed to |
Value
Invisibly returns x.
Examples
p <- pip_new("demo") |>
  pip_add("load", \(n = 5) seq_len(n), tags = c("io", "raw")) |>
  pip_add("square", \(x = ~load) x^2, tags = "compute") |>
  pip_add("total", \(x = ~square) sum(x), tags = "compute")
print(p) # core columns: step, depends, tags, out, state
print(p, cols = "all") # all non-hidden columns
print(p, rows = 2:3) # print only steps 2 and 3
p <- pip_new() |>
  pip_add("s1", \(x = 1) x, tags = "io") |>
  pip_add("s2", \(x = ~s1) x + 1, tags = "model")
# A view header shows how many steps are selected out of the total
v <- pip_view(p, tags = "model")
print(v) # "<pipeflow_view> pipe view (1 of 2 steps)"
Set pipeflow log layout
Description
This function provides an easy way to set the basic layout of the pipeline logging. For fine-grained control of the logger, which you can retrieve via lgr::get_logger("pipeflow"), see e.g. the logger_config function from the lgr package.
Usage
set_log_layout(layout = c("text", "json"))
Arguments
layout |
Layout name, which at this point can be either 'text' or 'json'. |
Value
invisibly returns a Logger object
Examples
p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$run()
lg <- set_log_layout("json")
print(lg)
p$run()
set_log_layout("text")
p$run()
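For the fine-grained control mentioned in the description, the following sketch retrieves the logger via lgr::get_logger("pipeflow") and temporarily raises its threshold using the lgr Logger method set_threshold(). It assumes that pipeflow reports step progress at the "info" level, so those messages would be filtered out while the threshold is "warn".

```r
library(pipeflow)

# Retrieve the logger used by pipeflow (logger name as given in the description)
lg <- lgr::get_logger("pipeflow")

# Remember the current threshold, then log only warnings and above
old_threshold <- lg$threshold
lg$set_threshold("warn")

p <- Pipeline$new("pipe", data = 1:2)
p$add("add1", \(data = ~data, x = 1) x + data)
p$run()  # assumption: step messages are logged at "info" and are now filtered

# Restore the previous threshold
lg$set_threshold(old_threshold)
```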