2 Basic features
2.1 Multiple processes and publication of workflow outputs
Let’s now step up the complexity a bit and write a workflow containing two processes that communicate with each other.
Add the following process to your main.nf file.
process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
    path my_file

    output:
    path "${my_file.simpleName}.duplicated.txt"

    script:
    """
    cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
    """
}
And modify your workflow block as follows
workflow {
    say_hello()
    duplicate_lines( say_hello.out )
}
If you want to see how your main.nf should look at this stage, open this hidden section
Code
nextflow.enable.dsl = 2

// this is a comment

process say_hello { // comments can be written anywhere

    output:
    path "hello_world.txt"

    script:
    """
    echo "This is the EBI predoc course" > hello_world.txt
    """
}

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
    path my_file

    output:
    path "${my_file.simpleName}.duplicated.txt"

    script:
    """
    cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
    """
}

workflow {
    say_hello()
    duplicate_lines( say_hello.out )
}
Now run the workflow again with nextflow run main.nf.
This time your workflow will not print anything, since we omitted the view() operator.
However, if you read the output file content with this command
cat ../nextflow_output/duplicate_lines/hello_world.duplicated.txt
You should see the following output in your terminal
This is the EBI predoc course
This is the EBI predoc course
As you can see, the content of the hello_world.txt file that we created before has been duplicated so that it appears twice, and it has been placed in a new file, ../nextflow_output/duplicate_lines/hello_world.duplicated.txt.
Let’s analyse what happened this time:
- We created another process called duplicate_lines, which duplicates a given file thanks to its script: block containing the command cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
- The process duplicate_lines declares an input: block. The input block is similar to the output block that we saw before in that it can use the path qualifier to declare the input to be a file. However, what comes after path does not need to be a real filename, it is just a variable name (note that it is not quoted). Nextflow replaces that variable with the real name of the file given in input.
- In the script and output blocks we can refer to the inputs by specifying the variable $my_file. Note that we need to use the same name used in the input declaration.
  - It is possible to enclose the variable name in curly braces to demark it from other text, so ${my_file} is equivalent to $my_file.
  - We applied the operator simpleName to the variable $my_file by writing ${my_file.simpleName}. This removes the path and the extension from my_file, so that we can use it to name our output file (if $my_file contains the value "/some/path/hello_world.txt", then ${my_file.simpleName} contains only hello_world).
- We used a new directive in the process duplicate_lines, called publishDir. This specifies the folder where the output of the process should be placed at the end of the execution. This is usually done to put the final outputs of your workflow in a meaningful directory structure. In our case, publishDir "../nextflow_output/duplicate_lines" places ${my_file.simpleName}.duplicated.txt in the folder ../nextflow_output/duplicate_lines
- In the workflow block we called the process duplicate_lines with the channel say_hello.out as an argument. So the output of the process say_hello is used as an input for the process duplicate_lines. Note that the number of arguments provided to a process must match the number of arguments declared in its input block.
2.2 Creating channels and specifying pipeline parameters
The workflow that we wrote works as expected, but it is not very useful since we cannot dynamically specify its input files. We will now learn how to use pipeline parameters to specify an external input file for the whole workflow, and how to use channel factories to feed inputs to our processes.
First, create a text file called ../nextflow_inputs/a_file.txt
containing the string "This is the file content"
mkdir ../nextflow_inputs
echo "This is the file content" > ../nextflow_inputs/a_file.txt
Modify the workflow as follows
workflow {
    Channel.fromPath( params.input_file ).set{ input_ch }
    duplicate_lines( input_ch )
}
If you want to see how your main.nf should look at this stage, open this hidden section
Code
nextflow.enable.dsl = 2

// this is a comment

process say_hello { // comments can be written anywhere

    output:
    path "hello_world.txt"

    script:
    """
    echo "This is the EBI predoc course" > hello_world.txt
    """
}

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
    path my_file

    output:
    path "${my_file.simpleName}.duplicated.txt"

    script:
    """
    cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
    """
}

workflow {
    Channel.fromPath( params.input_file ).set{ input_ch }
    duplicate_lines( input_ch )
}
Now open the file nextflow.config
and add the following
params {
    input_file = "../nextflow_inputs/a_file.txt"
}
If you want to see how your nextflow.config should look at this stage, open this hidden section
Code
// you can also put comments in nextflow.config
workDir = "../nextflow_workdir"

params {
    input_file = "../nextflow_inputs/a_file.txt"
}
Now run the workflow again with nextflow run main.nf.
This time if you examine the folder ../nextflow_output/duplicate_lines
you will find a file called a_file.duplicated.txt
. Let’s see its content.
cat ../nextflow_output/duplicate_lines/a_file.duplicated.txt
This should print
This is the file content
This is the file content
So the content of the file that we created before, a_file.txt, was duplicated and placed in the file a_file.duplicated.txt in the folder ../nextflow_output/duplicate_lines.
Let’s examine what happened:
- In nextflow.config, we declared a params block. Any variable written inside this block, like input_file = "../nextflow_inputs/a_file.txt", is accessible in the workflow by writing params.<variable_name>, so params.input_file in this case.
- In main.nf, we wrote Channel.fromPath( params.input_file ). This uses the channel factory Channel.fromPath to create a new channel using the content of params.input_file. The Channel.fromPath factory interprets its argument to be the path to a file (see also the sketch after this list).
- After Channel.fromPath( params.input_file ) we added set{ input_ch } (note the curly braces: it is a closure, we will explore later what this means). This assigns the name input_ch to the newly created channel.
- We then used the channel input_ch as an input for the process duplicate_lines in the statement duplicate_lines( input_ch ). Since the channel input_ch contains the variable params.input_file, which we declared to contain the value ../nextflow_inputs/a_file.txt, this file is given in input to duplicate_lines.
- duplicate_lines performed its function as before, putting its output in ../nextflow_output/duplicate_lines
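As a small robustness tip, Channel.fromPath also accepts a checkIfExists option that makes the run fail immediately with a clear error message when the configured path does not exist. A minimal sketch of the workflow block using it (optional, not needed for this exercise):

workflow {
    // fail early with a clear error if params.input_file points to a missing file
    Channel.fromPath( params.input_file, checkIfExists: true ).set{ input_ch }
    duplicate_lines( input_ch )
}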
Instead of hard-coding parameters in nextflow.config
, it is also possible to pass them on the command line. So for example we could have omitted the params
block in nextflow.config
and run
nextflow run main.nf --input_file ../nextflow_inputs/a_file.txt
This produces the same result. Note that pipeline parameters need to be specified with two leading dashes (--my_parameter).
This differentiates them from command-line options for Nextflow itself, such as -resume, which use a single dash.
2.3 Multiple input files
We are now able to process a single file with our pipeline, but what if we have many files that we need to process in the same way? One approach for this is to use a glob pattern as an input.
Let’s create a bunch of files to use as input
echo "This is content of file 1" > ../nextflow_inputs/set_of_files_1.txt
echo "This is content of file 2" > ../nextflow_inputs/set_of_files_2.txt
echo "This is content of file 3" > ../nextflow_inputs/set_of_files_3.txt
echo "This is content of file 4" > ../nextflow_inputs/set_of_files_4.txt
echo "This is content of file 5" > ../nextflow_inputs/set_of_files_5.txt
echo "This is content of file 6" > ../nextflow_inputs/set_of_files_6.txt
echo "This is content of file 7" > ../nextflow_inputs/set_of_files_7.txt
echo "This is content of file 8" > ../nextflow_inputs/set_of_files_8.txt
Now we just need to modify input_file
in nextflow.config
params {
    input_file = "../nextflow_inputs/set_of_files_*.txt"
}
If you want to see how your nextflow.config should look at this stage, open this hidden section
Code
// you can also put comments in nextflow.config
workDir = "../nextflow_workdir"

params {
    input_file = "../nextflow_inputs/set_of_files_*.txt"
}
Now run the workflow again with nextflow run main.nf.
This time if you examine the output folder with ls ../nextflow_output/duplicate_lines
you will find a set of files
a_file.duplicated.txt
set_of_files_1.duplicated.txt
set_of_files_2.duplicated.txt
set_of_files_3.duplicated.txt
set_of_files_4.duplicated.txt
set_of_files_5.duplicated.txt
set_of_files_6.duplicated.txt
set_of_files_7.duplicated.txt
set_of_files_8.duplicated.txt
Like before, each of them will contain the duplicated version of the original files.
Let’s examine what happened:
- We changed params.input_file to contain a glob pattern. This is expanded by Nextflow to yield a list of matching files (see the sketch after this list).
- The matching files are fed one by one via the channel input_ch to the process duplicate_lines.
- The process duplicate_lines operates independently but in parallel on all the inputs, producing the output files. Each task is executed in its own private directory.
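If you want to convince yourself that the glob is expanded by Nextflow itself (and not by your shell), you can temporarily print the channel contents; a minimal sketch:

workflow {
    // prints one path per matching file, one channel element at a time
    Channel.fromPath( params.input_file ).view()
}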
2.4 Using a sample sheet and value channels
Using glob patterns for specifying samples is useful and quick, but what if we want to specify input files that live in many different directories with very different names? And if we want some files to be processed in pairs with other specific files, or with specific parameters? For such use cases, a sample sheet is the easiest solution and it is the recommended way to specify workflow inputs.
A sample sheet is just a CSV file with one row per file to be processed, and with each column specifying either a file or a parameter.
Create a sample sheet called ../nextflow_inputs/samplesheet.csv
by running the following command.
echo "sample,content" > ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_1.txt,csv_content_1" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_2.txt,csv_content_2" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_3.txt,csv_content_3" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_4.txt,csv_content_4" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_5.txt,csv_content_5" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_6.txt,csv_content_6" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_7.txt,csv_content_7" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_8.txt,csv_content_8" >> ../nextflow_inputs/samplesheet.csv
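The resulting sample sheet is a plain CSV file with a header line followed by one row per input file; it looks roughly like this (the absolute paths will differ on your machine, and only the first two rows are shown):

sample,content
/absolute/path/to/nextflow_inputs/set_of_files_1.txt,csv_content_1
/absolute/path/to/nextflow_inputs/set_of_files_2.txt,csv_content_2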
Now create a process called append_to_file
process append_to_file {
    publishDir "../nextflow_output/append_to_file"

    input:
    path my_file
    val my_val

    output:
    path "${my_file.simpleName}.appended.txt"

    script:
    """
    cat $my_file > ${my_file.simpleName}.appended.txt
    echo "$my_val" >> ${my_file.simpleName}.appended.txt
    """
}
And modify the workflow as follows
workflow {
    Channel.fromPath( params.samplesheet )
        .splitCsv( header: true )
        .set{ input_ch }

    input_ch.map{ it["sample"] }.set{ sample_ch }
    input_ch.map{ it["content"] }.set{ content_ch }

    duplicate_lines( sample_ch )
    append_to_file( duplicate_lines.out, content_ch )
}
If you want to see how your main.nf file should look at this stage, open this hidden section
Code
nextflow.enable.dsl = 2

// this is a comment

process say_hello { // comments can be written anywhere

    output:
    path "hello_world.txt"

    script:
    """
    echo "This is the EBI predoc course" > hello_world.txt
    """
}

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
    path my_file

    output:
    path "${my_file.simpleName}.duplicated.txt"

    script:
    """
    cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
    """
}

process append_to_file {
    publishDir "../nextflow_output/append_to_file"

    input:
    path my_file
    val my_val

    output:
    path "${my_file.simpleName}.appended.txt"

    script:
    """
    cat $my_file > ${my_file.simpleName}.appended.txt
    echo "$my_val" >> ${my_file.simpleName}.appended.txt
    """
}

workflow {
    Channel.fromPath( params.samplesheet )
        .splitCsv( header: true )
        .set{ input_ch }

    input_ch.map{ it["sample"] }.set{ sample_ch }
    input_ch.map{ it["content"] }.set{ content_ch }

    duplicate_lines( sample_ch )
    append_to_file( duplicate_lines.out, content_ch )
}
Now we need to modify the params
block in your nextflow.config
params {
    samplesheet = "../nextflow_inputs/samplesheet.csv"
}
If you want to see how your nextflow.config should look at this stage, open this hidden section
Code
// you can also put comments in nextflow.config
workDir = "../nextflow_workdir"

params {
    samplesheet = "../nextflow_inputs/samplesheet.csv"
}
Now run the workflow again with nextflow run main.nf.
This time if you examine the output folder with ls ../nextflow_output/append_to_file
you will find a set of files
set_of_files_1.appended.txt
set_of_files_2.appended.txt
set_of_files_3.appended.txt
set_of_files_4.appended.txt
set_of_files_5.appended.txt
set_of_files_6.appended.txt
set_of_files_7.appended.txt
set_of_files_8.appended.txt
Taking as an example the first one, set_of_files_1.appended.txt, we would expect it to contain
This is content of file 1
This is content of file 1
csv_content_1
However, most probably it will contain something like
This is content of file 1
This is content of file 1
csv_content_3
So the first two lines have the correct number but the last line shows the wrong element! What’s more, the element shown on the last line can be different on each run (try!). We said before that channels are guaranteed to emit elements in the same order that they are received, so we should have the correct matches between files and values. What happened?
What happened is that channels are indeed guaranteed to preserve order, but processes and some operators are not because of their asynchronous and parallel nature. In our case, the process duplicate_lines
receives the files in the correct order, but it is not guaranteed to emit them in the same order. This is because all the files are processed in parallel.
So for example, set_of_files_5.txt could be processed and emitted by duplicate_lines some milliseconds earlier than set_of_files_1.txt, altering the order.
A better approach when we want multiple channels to be used by one process with some predictable matching is to join channels using matching keys. We will explore it in the next section. For now, let’s examine what happened in this version of our workflow:
- We created the process append_to_file, which takes in input a file (path my_file) and a value (val my_val). This process copies the content of my_file to ${my_file.simpleName}.appended.txt and then appends to the newly created file the content of my_val
- We defined the parameter samplesheet in nextflow.config
- We modified the workflow so that we use params.samplesheet to create a channel with the channel factory Channel.fromPath
- We applied the operator splitCsv to this channel containing the sample sheet, using the option header: true. This operator reads the file contained in the channel assuming it to be a CSV file with a header line. Then it produces in output another channel containing, for each row in the sample sheet, a Groovy map (what in Python would be called a dictionary). This map contains a key for each element in the CSV header, and a value corresponding to the value of that column in the current line of the CSV file.
- We applied the map operator (map{ it["sample"] }) to the resulting channel (see the toy example after this list)
  - The map operator is followed by a closure ({ it["sample"] })
  - You can think of a closure as an unnamed function (like a lambda function in Python)
  - The map operator calls this unnamed function for each element in the channel and creates another channel containing the respective function outputs
  - Closures understand the implicit variable it, which represents whatever input is given to the function
  - If an explicit return statement is not present in the closure, the last evaluated expression is returned by the closure (it["sample"] in this case)
- Since splitCsv created a dictionary for each item in the sample sheet, map{ it["sample"] } replaces that dictionary (it in the closure) with the value contained in it under the key "sample"
- input_ch is used again in input_ch.map{ it["content"] }.set{ content_ch } to extract the value of the column content for each sample in the sample sheet
  - Channels can be used many times, and each time all the elements contained in them are processed. So both input_ch.map{ it["content"] }.set{ content_ch } and input_ch.map{ it["sample"] }.set{ sample_ch } process every element in input_ch
- We created the channels sample_ch and content_ch using the set operator. They contain respectively the sample files and the content values related to them.
  - The order of processing in a channel is guaranteed, so we can be sure that each sample will be paired with the correct content value
- We fed the channel sample_ch to the process duplicate_lines, and we fed the output of the process duplicate_lines to the process append_to_file together with the values in the channel content_ch
  - This is the step that messed up the pairing of the channels, since the process duplicate_lines is not guaranteed to produce in output elements in the same order in which it receives them in input
- append_to_file appended the value of content_ch to the output of duplicate_lines, creating the final result
Note that we introduced a new qualifier: val
. The val
qualifier specifies a value, differently from the path
qualifier that specifies a file. If I write an expression like val variable_name
, then the variable variable_name
can contain something like a string or a number, depending on what is fed to the process input.
It is also possible to manually create a value channel using the channel factory Channel.value.
Take as an example the following

workflow {
    Channel.value( "my_value" ).view()
}

This would produce a value channel containing the single value "my_value", which is then printed to the terminal by the view operator. Note that Channel.value takes exactly one element; if instead you want a channel that emits several values in order (for example 1, 2, and 3), you can use the factory Channel.of( 1, 2, 3 ).
2.5 The tuple input qualifier and joining channels
In the previous section, we saw how processes are not guaranteed to emit outputs in the same order in which inputs are received.
This caused unexpected input pairs to occur in downstream processes of our workflow.
Here we will see how we can enforce the correct pairing of different channels by joining them using matching keys.
We will also introduce the tuple
input qualifier.
Let’s first create a sample sheet very similar to the one we used before, but with the added column id
to uniquely identify each line.
echo "id,sample,content" > ../nextflow_inputs/samplesheet.csv
echo "1,$(pwd)/../nextflow_inputs/set_of_files_1.txt,csv_content_1" >> ../nextflow_inputs/samplesheet.csv
echo "2,$(pwd)/../nextflow_inputs/set_of_files_2.txt,csv_content_2" >> ../nextflow_inputs/samplesheet.csv
echo "3,$(pwd)/../nextflow_inputs/set_of_files_3.txt,csv_content_3" >> ../nextflow_inputs/samplesheet.csv
echo "4,$(pwd)/../nextflow_inputs/set_of_files_4.txt,csv_content_4" >> ../nextflow_inputs/samplesheet.csv
echo "5,$(pwd)/../nextflow_inputs/set_of_files_5.txt,csv_content_5" >> ../nextflow_inputs/samplesheet.csv
echo "6,$(pwd)/../nextflow_inputs/set_of_files_6.txt,csv_content_6" >> ../nextflow_inputs/samplesheet.csv
echo "7,$(pwd)/../nextflow_inputs/set_of_files_7.txt,csv_content_7" >> ../nextflow_inputs/samplesheet.csv
echo "8,$(pwd)/../nextflow_inputs/set_of_files_8.txt,csv_content_8" >> ../nextflow_inputs/samplesheet.csv
Let’s now modify the workflow definition as follows
workflow {
    Channel.fromPath( params.samplesheet )
        .splitCsv( header: true )
        .set{ input_ch }

    input_ch.map{ [ it["id"], it["sample"] ] }.set{ sample_ch }

    duplicate_lines( sample_ch )

    input_ch.map{ [ it["id"], it["content"] ] }
        .join( duplicate_lines.out, by: 0 )
        .set{ append_to_file_in_ch }

    append_to_file( append_to_file_in_ch )
}
And let’s modify the definition of the processes duplicate_lines
and append_to_file
as follows
process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
    tuple(
        val(id),
        path(my_file)
    )

    output:
    tuple(
        val(id),
        path("${my_file.simpleName}.duplicated.txt")
    )

    script:
    """
    cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
    """
}

process append_to_file {
    publishDir "../nextflow_output/append_to_file"

    input:
    tuple(
        val(id),
        val(my_val),
        path(my_file)
    )

    output:
    path "${my_file.simpleName}.appended.txt"

    script:
    """
    cat $my_file > ${my_file.simpleName}.appended.txt
    echo "$my_val" >> ${my_file.simpleName}.appended.txt
    """
}
If you want to see how your main.nf file should look at this stage, open this hidden section
Code
nextflow.enable.dsl = 2

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
    tuple(
        val(id),
        path(my_file)
    )

    output:
    tuple(
        val(id),
        path("${my_file.simpleName}.duplicated.txt")
    )

    script:
    """
    cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
    """
}

process append_to_file {
    publishDir "../nextflow_output/append_to_file"

    input:
    tuple(
        val(id),
        val(my_val),
        path(my_file)
    )

    output:
    path "${my_file.simpleName}.appended.txt"

    script:
    """
    cat $my_file > ${my_file.simpleName}.appended.txt
    echo "$my_val" >> ${my_file.simpleName}.appended.txt
    """
}

workflow {
    Channel.fromPath( params.samplesheet )
        .splitCsv( header: true )
        .set{ input_ch }

    input_ch.map{ [ it["id"], it["sample"] ] }.set{ sample_ch }

    duplicate_lines( sample_ch )

    input_ch.map{ [ it["id"], it["content"] ] }
        .join( duplicate_lines.out, by: 0 )
        .set{ append_to_file_in_ch }

    append_to_file( append_to_file_in_ch )
}
Now run the workflow again with nextflow run main.nf.
As before, if you examine the output folder with ls ../nextflow_output/append_to_file
you will find a set of files
set_of_files_1.appended.txt
set_of_files_2.appended.txt
set_of_files_3.appended.txt
set_of_files_4.appended.txt
set_of_files_5.appended.txt
set_of_files_6.appended.txt
set_of_files_7.appended.txt
set_of_files_8.appended.txt
This time, if you check their content you will see that the last line is correctly paired with the first two.
Here, as an example, the content of the file set_of_files_1.appended.txt is shown.
This is content of file 1
This is content of file 1
csv_content_1
Let’s examine how we achieved this result:
- We modified the sample sheet so as to include a unique ID for each line
- We modified all the processes to expect in input not a single element but tuples of two or three elements in a single channel
  - A tuple is the non-modifiable equivalent of a list
  - When a channel emits lists of elements, the input qualifier tuple should be used
  - The input qualifiers in the process definition for the tuple elements must match the order and cardinality of the lists emitted by the channel used as input for the process
- The input channel for the process duplicate_lines was modified so as to emit lists of two elements: the content of the id column and the content of the sample column (input_ch.map{ [ it["id"], it["sample"] ] })
- For the process duplicate_lines, the input block has been modified to expect tuples of two elements
  - The first element is expected to be of type val and it is not used in the process (but it is passed to the output unchanged)
  - The second element is expected to be of type path and will contain the file to be duplicated
- The output of the process duplicate_lines has also been modified to be of type tuple
  - This tuple produced in output will contain a first element of type val, consisting of the same id variable received as input
  - The second element of the tuple is the output file produced by the process, with a qualifier of type path
- The map operator is used again on the channel input_ch to extract the content of the columns id and content, as lists of two elements (input_ch.map{ [ it["id"], it["content"] ] })
- The channel operator join is used to join together the output of the process duplicate_lines with the channel resulting from the previous operation (join(duplicate_lines.out, by: 0)); a toy example is shown after this list
  - The join operator joins two channels together using a matching key, whose position is specified with the by keyword
  - In our case, we used by: 0 as an argument to the join operator, meaning that we want to match the keys in the first position in the respective channels
  - The items of the two channels are matched in such a way that items with the same value in the first element of the lists (corresponding to the content of the column id of the sample sheet in both cases) are joined together
  - The resulting channel after the joining operation contains lists of three elements: the matching key itself, the remainder of the list emitted by the first channel after removal of the matching key (one element), and finally the remainder of the list emitted by the second channel after removal of the matching key (one element)
  - The resulting channel is named append_to_file_in_ch (set{ append_to_file_in_ch })
- The channel append_to_file_in_ch is used as an input to the process append_to_file
  - The process append_to_file declares in input a tuple of three items, matching the cardinality of the items produced by the channel append_to_file_in_ch
  - The output of the process duplicate_lines and the content of the column content in the sample sheet will now be matched correctly since they were joined using the matching key id
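To see join in action in isolation, here is a toy example with two hand-made channels (the keys and values are invented for illustration); items are matched on the first element regardless of the order in which they arrive:

workflow {
    files_ch    = Channel.of( [ "1", "file_A" ], [ "2", "file_B" ] )
    contents_ch = Channel.of( [ "2", "content_B" ], [ "1", "content_A" ] )

    // emits [1, file_A, content_A] and [2, file_B, content_B]
    files_ch.join( contents_ch, by: 0 ).view()
}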
2.6 Software dependencies
We can do many things with just shell scripts, but for real-world scenarios, we would probably need to use some additional library or tool. Software dependencies for a process can be specified in Nextflow in two main ways: using software containers or virtual environments.
A software container is like a shipping container: it is a monolithic block of code and data that contains everything needed to run a specific application, including the operating system (but not the kernel, the host kernel is used). The most popular container engine (the software that is used to actually run a container) is Docker. In high-performance clusters, for security reasons, Docker is often discouraged or forbidden. A popular alternative container engine used in bioinformatics is Singularity. The good news is that Singularity is also able to run containers created with Docker, so we can consider them equivalent for our purposes.
For popular tools, a container has most probably already been created by someone and put in a public container registry like DockerHub. In this case, we can provide Nextflow with just a link to the container and it will do the rest. If we wanted to use a custom container instead, we would need to write an appropriate description file for it (called a “Dockerfile”), use it to actually build the container, and then upload the container to DockerHub. This last use case will not be treated in this workshop, and we will limit ourselves to the use of pre-built containers.
On the other hand, virtual environments are a more lightweight and modifiable alternative to containers. They rely on the host operating system but they allow the definition of additional software. Nextflow is compatible with Conda environments, and can also use the faster Mamba implementation of Conda. It is worth noting that while Conda and Mamba are typically thought of as package managers for Python, they are actually able to install also R packages, the R interpreter itself, stand-alone tools, and even GPU drivers.
Differently from containers, environments do not need to be pre-built to be used by Nextflow. It is possible to just define the software that we need, and Nextflow will take care of building an appropriate environment with it.
Let’s now write a workflow that makes use of virtual environments.
First download a CSV file containing a set of countries, their population, and their area by running the following
wget https://raw.githubusercontent.com/saulpierotti-ebi/saulpierotti-ebi.github.io/main/assets/population_area_by_country.csv
Now modify your main.nf
file and make it look like this
nextflow.enable.dsl = 2

process calculate_population_density {
    conda "python=3.10.6 pandas=1.5.0"

    input:
    path csv_file

    output:
    path "${csv_file.simpleName}.with_density.csv"

    script:
    """
    #!/usr/bin/env python3

    import pandas

    df = pandas.read_csv("${csv_file}")
    df["density"] = df["population"] / df["area"]
    df.to_csv("${csv_file.simpleName}.with_density.csv")
    """
}

process plot_population_by_area {
    publishDir "../nextflow_output/plots"
    conda "r-base=4.1.3 r-tidyverse=1.3.2"

    input:
    path csv_file

    output:
    path "${csv_file.simpleName}.pdf"

    script:
    """
    #!/usr/bin/env Rscript

    library("tidyverse")

    df <- read_csv("${csv_file}")
    ggplot(df, aes(x=area, y=population, label=country, size=density)) +
        geom_text()
    ggsave("${csv_file.simpleName}.pdf")
    """
}

workflow {
    input_ch = Channel.fromPath( params.input )
    calculate_population_density( input_ch )
    plot_population_by_area( calculate_population_density.out )
}
Modify your nextflow.config
by adding the following
conda {
    // use the faster mamba solver instead of conda
    useMamba = true
}
If you want to see how your nextflow.config should look at this stage, open this hidden section
Code
// you can also put comments in nextflow.config
workDir = "../nextflow_workdir"

params {
    samplesheet = "../nextflow_inputs/samplesheet.csv"
}

conda {
    // use the faster mamba solver instead of conda
    useMamba = true
}
Now run the workflow as follows
nextflow run main.nf --input population_area_by_country.csv
This time the workflow will take significantly longer to run since Nextflow needs to create two mamba environments.
However, this will affect only the first run of the workflow, since once an environment is created it can be used multiple times.
Now open the file ../nextflow_output/plots/population_area_by_country.pdf.
It will contain a plot of the population of different countries versus their area, with the text size proportional to their population density.
Let’s explore what happened:
- We told Nextflow to use Mamba instead of Conda when resolving the conda directive (with conda { useMamba = true } in nextflow.config)
- As before, we used a command-line flag to pass an input file to the workflow (--input population_area_by_country.csv)
- We used population_area_by_country.csv as an input for the process calculate_population_density, which uses Python code to calculate the population density of a country from its area and total population
- The output of calculate_population_density is fed to the process plot_population_by_area, which uses R code to produce the plot
The conda
directive used in these processes defines the software dependencies needed (in this case Python and pandas for the first process and R and tidyverse for the second process).
Software versions can be specified with an equal sign after the software name.
Packages are sourced from the Anaconda package repositories.
Another thing to note here: the script directive normally interprets the code written in it as bash code, but if a shebang (#!) is included in the first line of the script, then the interpreter specified in the shebang is used to run the script section (so #!/usr/bin/env python3 tells Nextflow to interpret the script section as Python code).
A different approach to reach the same goal is to write a separate Python or R script, place it in the bin
folder, and then execute it with a bash command in the script directive.
So for example you can create a file called bin/calculate_population_density.py
with the following content
#!/usr/bin/env python3

import pandas
import sys

df = pandas.read_csv(sys.argv[1])
df["density"] = df["population"] / df["area"]
df.to_csv(sys.argv[2])
Make it executable with
chmod +x bin/calculate_population_density.py
And change the calculate_population_density
process as follows
process calculate_population_density {
    conda "python=3.10.6 pandas=1.5.0"

    input:
    path csv_file

    output:
    path "${csv_file.simpleName}.with_density.csv"

    script:
    """
    calculate_population_density.py $csv_file ${csv_file.simpleName}.with_density.csv
    """
}
Running the following will yield the same result as before
nextflow run main.nf --input population_area_by_country.csv
If we wanted to use containers instead of virtual environments to specify the software dependencies of the process calculate_population_density, we could have just replaced the line

conda "python=3.10.6 pandas=1.5.0"

with the line

container "docker://amancevice/pandas"

And then we would need to either set docker { enabled = true } or singularity { enabled = true } in our nextflow.config file, or specify the flag -with-docker or -with-singularity on the command line.
A similar approach would also work for the process plot_population_by_area, using for example the container docker://rocker/tidyverse.
Note that containers in the format docker://username/container are pulled by Nextflow from DockerHub.
They can be used either with Docker or Singularity as a container engine.
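For reference, enabling a container engine in nextflow.config could look like the following minimal sketch (enable only the engine that is actually installed on your system):

// enable Singularity; use docker { enabled = true } instead if you rely on Docker
singularity {
    enabled = true
}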
2.7 Resource allocation
Resource allocation is very useful when our workflow is run on a high-performance cluster or in the cloud. It is possible to specify the amount of RAM, the number of CPU cores, and the running time required by a process. These values are then used by Nextflow to reserve appropriate resources, such as for submitting a job to the cluster with the desired requirements.
Since we do not have a cluster environment available for this workshop, we will not try to implement an actual workflow using resource requirements (we could specify them also for a local workflow, but we would not see any effect).
An example process specifying resource requirements looks like this
process calculate_population_density {
    // this can be in GB, MB, KB, TB, ...
    memory "2 GB"
    // this can be in second(s), minute(s), hour(s), day(s)
    // it is also possible to use abbreviations such as s, m, h, d
    time "5 days"
    // just an integer (note that this is not a string)
    cpus 4

    input:
    path csv_file

    output:
    path "${csv_file.simpleName}.with_density.csv"

    script:
    """
    calculate_population_density.py $csv_file ${csv_file.simpleName}.with_density.csv
    """
}
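On a cluster, these directives are translated into actual scheduler requests only if you also tell Nextflow which executor to use; a minimal sketch of the corresponding nextflow.config setting (assuming a SLURM scheduler, adapt it to your own cluster):

// in nextflow.config: submit every task as a SLURM job, so that memory, time,
// and cpus are turned into scheduler resource requests
process {
    executor = "slurm"
}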
2.8 The resume feature
One of the most powerful aspects of Nextflow is the resume feature.
This feature can be activated by adding the following to the nextflow.config
file
resume = true
Alternatively, it is also possible to run the Nextflow command with the -resume
flag.
We saw before that each task is executed by Nextflow in its own directory with a long alphanumeric name. This name is not random, but it is a 128-bit hash number. This number is calculated from the following values:
- Input files
- Command line string
- Container ID
- Conda environment
- Environment modules
- Any executed scripts in the bin directory
So, any time one of these values changes for a process, the hash value, and hence the working directory, changes.
If a working directory with the same hash value as that of the task to be executed is found by Nextflow, and this folder contains all the required outputs declared in the output
block of the process, and it contains a file called .exitcode
containing a value of zero (this is created automatically by Nextflow to save the exit code of a task execution), then the task execution is skipped and the output files present in the directory are returned as if they were produced by the current task.
This feature allows resuming the execution of a previous failed run of a workflow from the last successful task, potentially saving a lot of computing time. A more in-depth explanation of the resume feature is available here.
To test the resume feature by ourselves, let’s modify our main.nf
file as follows
nextflow.enable.dsl = 2

process dumb_process {
    publishDir "../nextflow_output"

    output:
    path "out.txt"

    script:
    """
    sleep 120
    echo "Success!" > out.txt
    """
}

workflow {
    dumb_process()
}
Now you can run the workflow with nextflow run main.nf.
Our workflow just runs the process dumb_process
, which does not take any input and declares a single output file, out.txt
.
This execution will take 2 minutes, since the script block of the process dumb_process contains the command sleep 120, which just waits for 2 minutes (120 seconds) before writing the line “Success!” to the file out.txt.
After the execution is complete, we can see that the line “Success!” is present in the output file by running
cat ../nextflow_output/out.txt
Now, to test the resume feature, run the workflow again
nextflow run main.nf -resume
In this case, the execution will be much quicker since dumb_process is not actually executed.
Instead, the cached results obtained before are returned.
2.9 nf-core pipelines
Many of the workflows that we need in our work as bioinformaticians are always the same. Mapping sequencing reads to a reference genome and then performing variant calling is an example of this.
For such common use cases, very efficient Nextflow pipelines are already available, so there is no need to reinvent the wheel.
Community-curated pipelines can be found on the nf-core repository.
So for example for mapping reads to a reference and then performing variant calling we would use the nf-core/sarek
pipeline.
These pipelines follow a standardised format where inputs are declared by the user in a sample sheet.
You can read each pipeline’s documentation in nf-core to learn more about the parameters available and the expected format of the sample sheet.
I cannot make you run an actual nf-core pipeline now, since there is no simple, self-contained pipeline that can be run without additional data such as sequencing files. However, once you have some files to process and you have written an appropriate sample sheet, running the pipeline is very easy, for example:
nextflow run nf-core/sarek
Note that Nextflow is able to run workflows from GitHub, and nf-core/sarek
in the command above is just a pointer to this GitHub repository.
Nextflow takes care of cloning the repository and running the main.nf
file.
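A more complete invocation would typically add a container profile and the pipeline-specific parameters described in its documentation; for example (the file and folder names below are just placeholders):

nextflow run nf-core/sarek -profile singularity --input samplesheet.csv --outdir results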
If you want to actually test running a pipeline written by someone else, run the following hello world pipeline:
nextflow run nextflow-io/hello
This will just print “Hello World” in different languages to the terminal. You can explore the code for this workflow here.
2.10 Labels
The label
directive is used in Nextflow to tag processes to which we want to apply specific configurations.
So for example going back to the process calculate_population_density
process calculate_population_density {
    conda "python=3.10.6 pandas=1.5.0"

    input:
    path csv_file

    output:
    path "${csv_file.simpleName}.with_density.csv"

    script:
    """
    calculate_population_density.py $csv_file ${csv_file.simpleName}.with_density.csv
    """
}
Instead of hard-coding the conda
directive into the process definition, we could have done as follows
process calculate_population_density {
    label "pandas"

    input:
    path csv_file

    output:
    path "${csv_file.simpleName}.with_density.csv"

    script:
    """
    calculate_population_density.py $csv_file ${csv_file.simpleName}.with_density.csv
    """
}
And then in the nextflow.config
we could have written
process {
    withLabel: pandas {
        conda = "python=3.10.6 pandas=1.5.0"
    }
}
This may seem of little benefit for a single process, but if we have many processes with similar requirements, using the label
directive allows us to specify such requirements only once and in a central location.
So if for example we have several processes that need Python and pandas as software requirements, and we specified the requirements using the label directive, we can change the version of pandas used just by modifying what we wrote in the nextflow.config file, instead of having to modify the definition of many processes.
Note that the label directive can be used to specify many different configurations, not only software requirements.
For example, we could also use it for memory or time requirements.
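For example, a single label could centralise both software and resource requirements for a group of processes; a minimal sketch in nextflow.config (the label name big_pandas_job is invented for illustration and is not used elsewhere in this tutorial):

process {
    withLabel: big_pandas_job {
        conda  = "python=3.10.6 pandas=1.5.0"
        memory = "16 GB"
        cpus   = 4
    }
}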