Get started

library(flowr)
setup()

This will copy the flowr helper script to ~/bin. Please make sure that this folder is in your $PATH variable. For more details refer to setup’s help section.

Running flowr from the terminal will fetch you the following:

Usage: flowr function [arguments]

status          Detailed status of a flow(s).
rerun           rerun a previously failed flow
kill            Kill the flow, upon providing working directory
fetch_pipes     Checking what modules and pipelines are available; flowr fetch_pipes

Please use 'flowr -h function' to obtain further information about the usage of a specific function.

Toy example

Consider, a simple example where we have three instances of linux’s sleep command. After its completion three tmp files are created with some random data. Then, a merging step follows, combining the tmp files into one big file. Next, we use du to calculate the size of the merged file.

To create this flow in flowr, we need the actual commands to run; and a set of instructions regarding how to stitch the individual steps into a coherent pipeline.

Here is a table with the commands we would like to run ( or flow mat ).

samplename jobname cmd
sample1 sleep sleep 10 && sleep 2;echo ‘hello’
sample1 sleep sleep 11 && sleep 8;echo ‘hello’
sample1 sleep sleep 11 && sleep 17;echo ‘hello’
sample1 create_tmp head -c 100000 /dev/urandom > sample1_tmp_1
sample1 create_tmp head -c 100000 /dev/urandom > sample1_tmp_2
sample1 create_tmp head -c 100000 /dev/urandom > sample1_tmp_3
sample1 merge cat sample1_tmp_1 sample1_tmp_2 sample1_tmp_3 > sample1_merged
sample1 size du -sh sample1_merged; echo ‘MY shell:’ $SHELL

Further, we use an additional file specifying the relationship between the steps, and also other resource requirements: flow_def.

jobname sub_type prev_jobs dep_type queue memory_reserved walltime cpu_reserved platform jobid
sleep scatter none none short 2000 1:00 1 local 1
create_tmp scatter sleep serial short 2000 1:00 1 local 2
merge serial create_tmp gather short 2000 1:00 1 local 3
size serial merge serial short 2000 1:00 1 local 4

Stitch it

We use the two files described above and stitch them to create a flow object (which contains all the information we need for cluster submission).

ex = file.path(system.file(package = "flowr"), "pipelines")
flowmat = as.flowmat(file.path(ex, "sleep_pipe.tsv"))
flowdef = as.flowdef(file.path(ex, "sleep_pipe.def"))

fobj <- to_flow(x = flowmat, 
                 def = flowdef,
                 flowname = "example1", ## give it a name
                 platform = "lsf")      ## override platform mentioned in flow def

Refer to to_flow’s help section for more details.

Plot it

We can use plot_flow to quickly visualize the flow; this really helps when developing complex workflows.

plot_flow(fobj)     # ?plot_flow for more information
plot_flow(flowdef) # plot_flow works on flow definition as well

Flow chart describing process for example 1

Refer to plot_flow’s help section for more details.

Dry Run

submit_flow(fobj)
Test Successful!
You may check this folder for consistency. Also you may re-run submit with execute=TRUE
 ~/flowr/sleep_pipe-20150520-15-18-27-5mSd32G0

Submit it

Once, we have a flow we can submit it to the cluster using submit_flow

submit_flow(fobj, execute = TRUE)
Flow has been submitted. Track it from terminal using:
flowr status x=~/flowr/type1-20150520-15-18-46-sySOzZnE

Refer to submit_flow’s help section for more details.

Check its status

Next, you may use status to monitor the status of a flow.

flowr status x=~/flowr/runs/sleep_pipe-20150520*

|          | total| started| completed| exit_status|    status|
|:---------|-----:|-------:|---------:|-----------:|---------:|
|001.sleep |    10|      10|        10|           0| completed|
|002.tmp   |    10|      10|        10|           0| completed|
|003.merge |     1|       1|         1|           0| completed|
|004.size  |     1|       1|         1|           0| completed|

Notice, how we skipped specifying the complete path. Status would try to use the basename and show status of any folder it can match. If there are multiple matched, status would show a summary of each.

Alternatively, to check a summarized status of several flows, use the parent folder. In this case the parent folder has 3 flows, and here is the summary:

flowr status x=~/flowr/runs

Showing status of: ~/flowr/runs
|          | total| started| completed| exit_status|    status|
|:---------|-----:|-------:|---------:|-----------:|---------:|
|001.sleep |    30|      30|        10|           0|processing|
|002.tmp   |    30|      30|        10|           0|processing|
|003.merge |     3|       3|         1|           0|   pending|
|004.size  |     3|       3|         1|           0|   pending|

Refer to status’s help section for more details.

Kill it

Incase something goes wrong, one may use to kill command to terminate all the relating jobs of a single flow OR multiple flows.

kill one flow:

flowr kill_flow x=flow_wd
flowr kill x='~/flowr/runs/sleep_pipe'
found multiple wds:
  ~/flowr/runs/sleep_pipe-20150825-16-24-04-0Lv1PbpI
  ~/flowr/runs/sleep_pipe-20150825-17-47-52-5vFIkrMD
Really kill all of them ? kill again with force=TRUE

To kill multiple flow, set force=TRUE:

kill x='~/flowr/runs/sleep_pipe*' force = TRUE

Refer to kill’s help section for more details.

Re-run a flow

flowr also enables you to re-run a pipeline in case of hardware or software failures.

  • hardware failure: no change to the pipeline is required, simply rerun it: rerun x=flow_wd start_from=<intermediate step>
  • software failure: either a change to flowmat or flowdef has been made: rerun x=flow_wdmat = new_flowmat def = new_flowdef start_from=<intermediate step>

Refer to rerun’s help section for more details.

Input files

An easy and quick way to build a workflow is to create a set of two tab delimited files. First is a table with commands to run (for each step of the pipeline), while second has details regarding how the modules are stitched together. In the rest of this document we would refer to them as flow_mat and flow_def respectively (as introduced in the previous sections).

Let us read in examples of both these files to understand their structure.

ex = file.path(system.file(package = "flowr"), "pipelines")
flow_mat = as.flowmat(file.path(ex, "sleep_pipe.tsv"))
flow_def = as.flowdef(file.path(ex, "sleep_pipe.def"))

1. Flow matrix

Essentially, this is a tab delimited file with three columns:

  • samplename: A grouping column. The table is split using this column and each subset is treated as an individual flow. Thus we may have one flowmat for a series of samples, and the whole set would be submitted as a batch.
    • If all the commands are for a single sample, one can just repeat a dummy name like sample1 all throughout.
  • jobname: This corresponds to the name of the step. This should match exactly with the jobname column in flow_def table described below.
  • cmd: A shell command to run. One can get quite creative here. These could be multiple shell commands separated by a ; or &&, more on this here. Though to keep this clean you may just wrap a multi-line command into a script and just source the bash script from here.

Here is an example flow_mat for the flowr described above.

samplename jobname cmd
sample1 sleep sleep 10 && sleep 2;echo ‘hello’
sample1 sleep sleep 11 && sleep 8;echo ‘hello’
sample1 sleep sleep 11 && sleep 17;echo ‘hello’
sample1 create_tmp head -c 100000 /dev/urandom > sample1_tmp_1
sample1 create_tmp head -c 100000 /dev/urandom > sample1_tmp_2
sample1 create_tmp head -c 100000 /dev/urandom > sample1_tmp_3
sample1 merge cat sample1_tmp_1 sample1_tmp_2 sample1_tmp_3 > sample1_merged
sample1 size du -sh sample1_merged; echo ‘MY shell:’ $SHELL

2. Flow definition

It is a tab separated file, with a minimum of 4 columns:

  • jobname: Name of the step
  • sub_type: Short for submission type, refers to, how should multiple commands of this step be submitted. Possible values are serial or scatter.
  • prev_job: Short for previous job, this would be jobname of the previous job. This can be NA/./none if this is a independent/initial step, and no previous step is required for this to start.
  • dep_type: Short for dependency type, refers to the relationship of this job with the one defined in prev_job. This can take values none, gather, serial or burst.

These would be explained in detail, below.

Apart from the above described variables, several others defining the resource requirements of each step are also available. These give great amount of flexibility to the user in choosing CPU, wall time, memory and queue for each step (and are passed along to the HPCC platform).

  • cpu_reserved
  • memory_reserved
  • nodes
  • walltime
  • queue

Most cluster platforms accept these resource arguments. Essentially a file like this is used as a template, and variables defined in curly braces ( ex. {{{CPU}}} ) are filled up using the flow definition file.

Here is an example of a typical flow_def file.

jobname sub_type prev_jobs dep_type queue memory_reserved walltime cpu_reserved platform jobid
sleep scatter none none short 2000 1:00 1 local 1
create_tmp scatter sleep serial short 2000 1:00 1 local 2
merge serial create_tmp gather short 2000 1:00 1 local 3
size serial merge serial short 2000 1:00 1 local 4

Example:

Let us use an example flow, to understand submission and dependency types.

Consider three steps A, B and C, where A has 10 commands from A1 to A10, similarly B has 10 commands B1 through B10 and C has a single command, C1. Consider another step D (with D1-D3), which comes after C.

step:       A   ----> B  -----> C -----> D
# of cmds  10        10         1        3

Submission types

This refers to the sub_type column in flow definition.

Dependency types

This refers to the dep_type column in flow definition.

Relationships

Using the above submission and dependency types one can create several types of relationships between former and later jobs. Here are a few examples of relationships one may typically use.

One to One (serial)

                A1 --------> B1
                A2 --------> B1
                .. --------> ..
               A10 --------> B10
 dependency submission  dependency submission   
    none     scatter      serial     scatter
                 relationship
                  ONE-to-ONE

Relationship between steps A and B is best defined as serial. Step A (A1 through A10) is submitted as scatter. Further, \(i^th\) jobs of B depends on \(i^th\) jobs of A. i.e. B1 requires A1 to complete; B2 requires A2 and so on. Also, we note that defining dependency as serial, makes sure that B does not wait for all elements of A to complete.

Many to One (gather)

                B1 ----\ 
                B2 -----\
                ..        -----> C1
                B9 ------/
                B10-----/
 dependency submission  dependency submission   
    serial     scatter    gather     serial
                 relationship
                  MANY-to-ONE

Since C is a single command which requires all steps of B to complete, intuitively it needs to gather pieces of data generated by B. In this case dep_type would be gather and sub_type type would be serial since it is a single command.

One to Many (Burst)

                     /-----> D1
                C1 --------> D2
                     \-----> D3
 dependency submission  dependency submission   
    gather   serial       burst     scatter
                 relationship
                  ONE-to-MANY

Further, D is a set of three commands (D1-D3), which need to wait for a single process (C1) to complete. They would be submitted as scatter after waiting on C in a burst type dependency.

In essence, an example flow_def would look like as follows (with additional resource requirements not shown for brevity):

ex2def = as.flowdef(file.path(ex, "abcd.def"))
ex2mat = as.flowmat(file.path(ex, "abcd.tsv"))
kable(ex2def[, 1:4])
jobname sub_type prev_jobs dep_type
A scatter none none
B scatter A serial
C serial B gather
D scatter C burst
plot_flow(ex2def)

Cluster Support

As of now we have tested this on the following clusters:

Platform command status queue.type
LSF 7 bsub Beta lsf
LSF 9.1 bsub Stable lsf
Torque qsub Stable torque
Moab msub Stable moab
SGE qsub Beta sge
SLURM sbatch alpha slurm

For more details, refer to the configuration section