2 Basic features

2.1 Multiple processes and publication of workflow outputs

Let’s now step up the complexity a bit and write a workflow containing two processes that communicate with each other. Add the following process to your main.nf file.

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
        path my_file
    output:
        path "${my_file.simpleName}.duplicated.txt"
    script:
        """
        cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
        """
}

And modify your workflow block as follows

workflow {
    say_hello()
    duplicate_lines( say_hello.out )
}

If you want to see what your main.nf should look like at this stage, open this hidden section

Code
nextflow.enable.dsl = 2

// this is a comment
process say_hello {
    // comments can be written anywhere
    output:
        path "hello_world.txt"
    script:
        """
        echo "This is the EBI predoc course" > hello_world.txt
        """
}

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
        path my_file
    output:
        path "${my_file.simpleName}.duplicated.txt"
    script:
        """
        cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
        """
}

workflow {
    say_hello()
    duplicate_lines( say_hello.out )
}

Now run the workflow again with nextflow run main.nf. This time your workflow will not print anything, since we omitted the view() operator. However, if you read the content of the output file with this command

cat ../nextflow_output/duplicate_lines/hello_world.duplicated.txt

You should see the following output in your terminal

This is the EBI predoc course
This is the EBI predoc course

So, as you can see, the content of the hello_world.txt file that we created before has been duplicated so that it appears twice, and it has been placed in a new file, ../nextflow_output/duplicate_lines/hello_world.duplicated.txt.

Let’s analyse what happened this time:

  • We created another process called duplicate_lines which duplicates a given file thanks to its script: block containing the command cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
  • The process duplicate_lines declares an input: block. The input block is similar to the output block that we saw before in that it can use the path qualifier to declare the input to be a file. However, what comes after path does not need to be a real filename; it is just a variable name (note that it is not quoted). Nextflow replaces that variable with the real name of the file given as input.
  • In the script and output blocks we can refer to the inputs by specifying the variable $my_file. Note that we need to use the same name used in the input declaration.
    • It is possible to enclose the variable name in curly braces to set it apart from surrounding text. So ${my_file} is equivalent to $my_file.
    • We applied the file attribute simpleName to the variable $my_file by writing ${my_file.simpleName}. This removes the path and the extension(s) from my_file, so that we can use it to name our output file (if $my_file contains the value "/some/path/hello_world.txt", then ${my_file.simpleName} contains only hello_world). A short sketch after this list shows this and a few related file attributes.
  • We used a new directive in the process duplicate_lines, called publishDir. This specifies the folder where the output of the process should be placed at the end of the execution. This is usually done to put the final outputs of your workflow in a meaningful directory structure. In our case, publishDir "../nextflow_output/duplicate_lines" places ${my_file.simpleName}.duplicated.txt in the folder ../nextflow_output/duplicate_lines
  • In the workflow block we called the process duplicate_lines with the channel say_hello.out as an argument. So the output of the process say_hello is used as an input for the process duplicate_lines. Note that the number of arguments provided to a process must match the number of arguments declared in its input block.
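
As an aside, simpleName is only one of several file name attributes that Nextflow exposes. The following is a small standalone sketch (not part of the course workflow; the path is made up) that you can drop into a scratch main.nf to see what each attribute returns.

workflow {
    def f = file( "/some/path/hello_world.duplicated.txt" )
    println "name:       ${f.name}"        // hello_world.duplicated.txt
    println "simpleName: ${f.simpleName}"  // hello_world (everything up to the first dot)
    println "baseName:   ${f.baseName}"    // hello_world.duplicated (only the last extension removed)
    println "extension:  ${f.extension}"   // txt
}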

2.2 Creating channels and specifying pipeline parameters

The workflow that we wrote works as expected, but it is not very useful since we cannot dynamically specify its input files. We will now learn how to use pipeline parameters to specify an external input file for the whole workflow, and how to use channel factories to feed inputs to our processes.

First, create a text file called ../nextflow_inputs/a_file.txt containing the string "This is the file content"

mkdir ../nextflow_inputs
echo "This is the file content" > ../nextflow_inputs/a_file.txt

Modify the workflow as follows

workflow {
    Channel.fromPath( params.input_file ).set{ input_ch }
    duplicate_lines( input_ch )
}

If you want to see what your main.nf should look like at this stage, open this hidden section

Code
nextflow.enable.dsl = 2

// this is a comment
process say_hello {
    // comments can be written anywhere
    output:
        path "hello_world.txt"
    script:
        """
        echo "This is the EBI predoc course" > hello_world.txt
        """
}

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
        path my_file
    output:
        path "${my_file.simpleName}.duplicated.txt"
    script:
        """
        cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
        """
}

workflow {
    Channel.fromPath( params.input_file ).set{ input_ch }
    duplicate_lines( input_ch )
}

Now open the file nextflow.config and add the following

params {
  input_file = "../nextflow_inputs/a_file.txt"
}

If you want to see what your nextflow.config should look like at this stage, open this hidden section

Code
// you can also put comments in nextflow.config
workDir = "../nextflow_workdir"

params {
  input_file = "../nextflow_inputs/a_file.txt"
}

Now run the workflow again with nextflow run main.nf. This time, if you examine the folder ../nextflow_output/duplicate_lines, you will find a file called a_file.duplicated.txt. Let’s see its content.

cat ../nextflow_output/duplicate_lines/a_file.duplicated.txt

This should print

This is the file content
This is the file content

So the content of the file that we created before, a_file.txt, was duplicated and placed in the file a_file.duplicated.txt in the folder ../nextflow_output/duplicate_lines. Let’s examine what happened:

  • In nextflow.config, we declared a params block. Any variable written inside this block, like input_file = "../nextflow_inputs/a_file.txt", is accessible in the workflow by writing params.<variable_name>, so params.input_file in this case.
  • In main.nf, we wrote Channel.fromPath( params.input_file ). This uses the channel factory Channel.fromPath to create a new channel using the content of params.input_file. The Channel.fromPath factory interprets its argument to be the path to a file.
  • After Channel.fromPath( params.input_file ) we added set{ input_ch } (note the curly braces: it is a closure, we will explore later what this means). This assigns the name input_ch to the newly created channel
  • We then used the channel input_ch as an input for the process duplicate_lines in the statement duplicate_lines( input_ch ). Since the channel input_ch contains the variable params.input_file, which we declared to contain the value ../nextflow_inputs/a_file.txt, this file is given as input to duplicate_lines
  • duplicate_lines performed its function as before, putting its output in ../nextflow_output/duplicate_lines

Instead of hard-coding parameters in nextflow.config, it is also possible to pass them on the command line. So for example we could have omitted the params block in nextflow.config and run

nextflow run main.nf --input_file ../nextflow_inputs/a_file.txt

This produces the same result. Note that pipeline parameters need to be specified with two leading dashes (--my_parameter). This differentiates them from command-line options for Nextflow itself, such as -resume, which use a single dash.
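
It is also possible to declare a default value for a parameter directly in main.nf. The following is just a sketch of the idea (not required for the exercise): defaults written in the script have the lowest precedence, so they are overridden by nextflow.config, which is in turn overridden by the command line.

// default used only if the parameter is not set in nextflow.config or on the command line
params.input_file = "../nextflow_inputs/a_file.txt"

workflow {
    Channel.fromPath( params.input_file ).set{ input_ch }
    duplicate_lines( input_ch )
}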

2.3 Multiple input files

We are now able to process a single file with our pipeline, but what if we have many files that we need to process in the same way? One approach for this is to use a glob pattern as an input.

Let’s create a bunch of files to use as input

echo "This is content of file 1" > ../nextflow_inputs/set_of_files_1.txt
echo "This is content of file 2" > ../nextflow_inputs/set_of_files_2.txt
echo "This is content of file 3" > ../nextflow_inputs/set_of_files_3.txt
echo "This is content of file 4" > ../nextflow_inputs/set_of_files_4.txt
echo "This is content of file 5" > ../nextflow_inputs/set_of_files_5.txt
echo "This is content of file 6" > ../nextflow_inputs/set_of_files_6.txt
echo "This is content of file 7" > ../nextflow_inputs/set_of_files_7.txt
echo "This is content of file 8" > ../nextflow_inputs/set_of_files_8.txt

Now we just need to modify input_file in nextflow.config

params {
  input_file = "../nextflow_inputs/set_of_files_*.txt"
}

If you want to see what your nextflow.config should look like at this stage, open this hidden section

Code
// you can also put comments in nextflow.config
workDir = "../nextflow_workdir"

params {
  input_file = "../nextflow_inputs/set_of_files_*.txt"
}

Now run the workflow again with nextflow run main.nf. This time, if you examine the output folder with ls ../nextflow_output/duplicate_lines, you will find a set of files

a_file.duplicated.txt
set_of_files_1.duplicated.txt
set_of_files_2.duplicated.txt
set_of_files_3.duplicated.txt
set_of_files_4.duplicated.txt
set_of_files_5.duplicated.txt
set_of_files_6.duplicated.txt
set_of_files_7.duplicated.txt
set_of_files_8.duplicated.txt

Like before, each of them contains the duplicated content of the corresponding original file.

Let’s examine what happened:

  • We changed params.input_file to contain a glob pattern. This is expanded by Nextflow to yield a list of matching files (see also the note after this list about quoting glob patterns on the command line).
  • The matching files are fed one by one via the channel input_ch to the process duplicate_lines
  • The process duplicate_lines operates independently but in parallel on all the inputs, producing the output files. Each task is executed in its own private directory.
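
A practical note if you prefer to pass the glob pattern on the command line instead of in nextflow.config: wrap it in quotes, so that your shell does not expand it before Nextflow sees it, along these lines

nextflow run main.nf --input_file "../nextflow_inputs/set_of_files_*.txt"

Without the quotes the shell would expand the pattern itself and, typically, only the first matching file would end up in params.input_file.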

2.4 Using a sample sheet and value channels

Using glob patterns for specifying samples is useful and quick, but what if we want to specify input files that live in many different directories with very different names? And if we want some files to be processed in pairs with other specific files, or with specific parameters? For such use cases, a sample sheet is the easiest solution and it is the recommended way to specify workflow inputs.

A sample sheet is just a CSV file with one row per file to be processed, and with each column specifying either a file or a parameter. Create a sample sheet called ../nextflow_inputs/samplesheet.csv by running the following command.

echo "sample,content" > ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_1.txt,csv_content_1" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_2.txt,csv_content_2" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_3.txt,csv_content_3" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_4.txt,csv_content_4" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_5.txt,csv_content_5" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_6.txt,csv_content_6" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_7.txt,csv_content_7" >> ../nextflow_inputs/samplesheet.csv
echo "$(pwd)/../nextflow_inputs/set_of_files_8.txt,csv_content_8" >> ../nextflow_inputs/samplesheet.csv

Now create a process called append_to_file

process append_to_file {
    publishDir "../nextflow_output/append_to_file"

    input:
        path my_file
        val my_val
    output:
        path "${my_file.simpleName}.appended.txt"
    script:
        """
        cat $my_file > ${my_file.simpleName}.appended.txt
        echo "$my_val" >> ${my_file.simpleName}.appended.txt
        """
}

And modify the workflow as follows

workflow {
    Channel.fromPath( params.samplesheet )
      .splitCsv( header: true )
      .set{ input_ch }

    input_ch.map{ it["sample"] }.set{ sample_ch }
    input_ch.map{ it["content"] }.set{ content_ch }

    duplicate_lines( sample_ch )
    append_to_file( duplicate_lines.out, content_ch )
}

If you want to see what your main.nf file should look like at this stage, open this hidden section

Code
nextflow.enable.dsl = 2

// this is a comment
process say_hello {
    // comments can be written anywhere
    output:
        path "hello_world.txt"
    script:
        """
        echo "This is the EBI predoc course" > hello_world.txt
        """
}

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
        path my_file
    output:
        path "${my_file.simpleName}.duplicated.txt"
    script:
        """
        cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
        """
}

process append_to_file {
    publishDir "../nextflow_output/append_to_file"

    input:
        path my_file
        val my_val
    output:
        path "${my_file.simpleName}.appended.txt"
    script:
        """
        cat $my_file > ${my_file.simpleName}.appended.txt
        echo "$my_val" >> ${my_file.simpleName}.appended.txt
        """
}

workflow {
    Channel.fromPath( params.samplesheet )
      .splitCsv( header: true )
      .set{ input_ch }

    input_ch.map{ it["sample"] }.set{ sample_ch }
    input_ch.map{ it["content"] }.set{ content_ch }

    duplicate_lines( sample_ch )
    append_to_file( duplicate_lines.out, content_ch )
}

Now we need to modify the params block in your nextflow.config

params {
  samplesheet = "../nextflow_inputs/samplesheet.csv"
}

If you want to see what your nextflow.config should look like at this stage, open this hidden section

Code
// you can also put comments in nextflow.config
workDir = "../nextflow_workdir"

params {
  samplesheet = "../nextflow_inputs/samplesheet.csv"
}

Now run the workflow again with nextflow run main.nf. This time, if you examine the output folder with ls ../nextflow_output/append_to_file, you will find a set of files

set_of_files_1.appended.txt
set_of_files_2.appended.txt
set_of_files_3.appended.txt
set_of_files_4.appended.txt
set_of_files_5.appended.txt
set_of_files_6.appended.txt
set_of_files_7.appended.txt
set_of_files_8.appended.txt

Taking as an example the first one, set_of_files_1.appended.txt, we would expect it to contain

This is content of file 1
This is content of file 1
csv_content_1

However, most probably it will contain something like

This is content of file 1
This is content of file 1
csv_content_3

So the first two lines carry the correct file number, but the last line shows the wrong element! What’s more, the element shown on the last line can be different on each run (try!). We said before that channels are guaranteed to emit elements in the same order in which they are received, so we should have the correct matches between files and values. What happened?

What happened is that channels are indeed guaranteed to preserve order, but processes and some operators are not, because of their asynchronous and parallel nature. In our case, the process duplicate_lines receives the files in the correct order, but it is not guaranteed to emit them in the same order, because all the files are processed in parallel. So, for example, set_of_files_5.duplicated.txt could be produced and emitted a few milliseconds earlier than set_of_files_1.duplicated.txt, altering the order.

A better approach when we want multiple channels to be used by one process with some predictable matching is to join channels using matching keys. We will explore it in the next section. For now, let’s examine what happened in this version of our workflow:

  • We created the process append_to_file, which takes as input a file (path my_file) and a value (val my_val). This process copies the content of my_file to ${my_file.simpleName}.appended.txt and then appends the content of my_val to the newly created file
  • We defined the parameter samplesheet in nextflow.config
  • We modified the workflow so that we use params.samplesheet to create a channel with the channel factory Channel.fromPath
  • We applied the operator splitCsv to this channel containing the sample sheet, using the option header: true. This operator reads the file contained in the channel, assuming it to be a CSV file with a header line. It then produces another channel containing, for each row in the sample sheet, a Groovy map (what in Python would be called a dictionary). This map has a key for each column in the CSV header, with the corresponding value taken from the current line of the CSV file (a small debugging sketch after this list shows how to inspect these intermediate channels with view)
  • We apply the map operator (map{ it["sample"] }) to the resulting channel
    • The map operator is followed by a closure ({ it["sample"] })
    • You can think of a closure as an unnamed function (like a lambda function in Python)
    • The map operator calls this unnamed function for each element in the channel and creates another channel containing the respective function outputs
    • Closures understand the implicit variable it, which represents whatever input is given to the function
    • If an explicit return statement is not present in the closure, the last evaluated expression is returned by the closure ( it["sample"] in this case)
  • Since splitCsv created a map for each row in the sample sheet, map{ it["sample"] } replaces that map (it in the closure) with the value stored in it under the key "sample"
  • input_ch is used again in input_ch.map{ it["content"] }.set{ content_ch } to extract the value of the column content for each sample in the sample sheet
    • Channels can be used many times, and each time every element they contain is processed. So both input_ch.map{ it["content"] }.set{ content_ch } and input_ch.map{ it["sample"] }.set{ sample_ch } process every element in input_ch
  • We created the channels sample_ch and content_ch using the set operator. They contain respectively the sample files and the content values related to them.
    • The order of emission in a channel is guaranteed, so at this stage each sample file is still aligned with its content value
  • We fed the channel sample_ch to the process duplicate_lines, and we fed the output of the process duplicate_lines to the process append_to_file together with the values in the channel content_ch
    • This is the step that messed up the pairing of the channels, since the process duplicate_lines is not guaranteed to emit its outputs in the same order in which it receives its inputs
  • append_to_file appended the value of content_ch to the output of duplicate_lines, creating the final result
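
If you want to see with your own eyes what the channels contain at each stage, the view operator can be chained between the other operators. The following is just a debugging sketch (it assumes params.samplesheet points to the sample sheet created above); the comments show the kind of element printed at each step.

workflow {
    Channel.fromPath( params.samplesheet )
        .splitCsv( header: true )
        .view()                    // e.g. [sample:/some/path/set_of_files_1.txt, content:csv_content_1]
        .map{ it["content"] }
        .view()                    // e.g. csv_content_1
}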

Note that we introduced a new qualifier: val. The val qualifier specifies a value, as opposed to the path qualifier, which specifies a file. If I write an expression like val variable_name, then the variable variable_name can contain something like a string or a number, depending on what is fed to the process input. It is also possible to manually create a value channel using the channel factory Channel.value. Take as an example the following

workflow {
  Channel.value( 1 ).view()
}

This produces a value channel holding the single value 1, which is then printed to the terminal by the view operator. A value channel always contains exactly one value and can be read any number of times. If instead you want a channel that emits several values in order (1, 2, and 3, for example), use the queue channel factory Channel.of( 1, 2, 3 ).

2.5 The tuple input qualifier and joining channels

In the previous section, we saw how processes are not guaranteed to emit outputs in the same order in which inputs are received. This caused unexpected input pairs to occur in downstream processes of our workflow. Here we will see how we can enforce the correct pairing of different channels by joining them using matching keys. We will also introduce the tuple input qualifier.

Let’s first create a sample sheet very similar to the one we used before, but with the added column id to uniquely identify each line.

echo "id,sample,content" > ../nextflow_inputs/samplesheet.csv
echo "1,$(pwd)/../nextflow_inputs/set_of_files_1.txt,csv_content_1" >> ../nextflow_inputs/samplesheet.csv
echo "2,$(pwd)/../nextflow_inputs/set_of_files_2.txt,csv_content_2" >> ../nextflow_inputs/samplesheet.csv
echo "3,$(pwd)/../nextflow_inputs/set_of_files_3.txt,csv_content_3" >> ../nextflow_inputs/samplesheet.csv
echo "4,$(pwd)/../nextflow_inputs/set_of_files_4.txt,csv_content_4" >> ../nextflow_inputs/samplesheet.csv
echo "5,$(pwd)/../nextflow_inputs/set_of_files_5.txt,csv_content_5" >> ../nextflow_inputs/samplesheet.csv
echo "6,$(pwd)/../nextflow_inputs/set_of_files_6.txt,csv_content_6" >> ../nextflow_inputs/samplesheet.csv
echo "7,$(pwd)/../nextflow_inputs/set_of_files_7.txt,csv_content_7" >> ../nextflow_inputs/samplesheet.csv
echo "8,$(pwd)/../nextflow_inputs/set_of_files_8.txt,csv_content_8" >> ../nextflow_inputs/samplesheet.csv

Let’s now modify the workflow definition as follows

workflow {
    Channel.fromPath( params.samplesheet )
      .splitCsv( header: true )
      .set{ input_ch }

    input_ch.map{ [ it["id"], it["sample"] ] }.set{ sample_ch }

    duplicate_lines( sample_ch )

    input_ch.map{ [ it["id"], it["content"] ] }
      .join(duplicate_lines.out, by: 0)
      .set{ append_to_file_in_ch }

    append_to_file( append_to_file_in_ch )
}

And let’s modify the definition of the processes duplicate_lines and append_to_file as follows

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
        tuple(
            val(id),
            path(my_file)
        )
    output:
        tuple(
          val(id),
          path("${my_file.simpleName}.duplicated.txt")
        )
    script:
        """
        cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
        """
}

process append_to_file {
    publishDir "../nextflow_output/append_to_file"

    input:
        tuple(
            val(id),
            val(my_val),
            path(my_file)
        )
    output:
        path "${my_file.simpleName}.appended.txt"
    script:
        """
        cat $my_file > ${my_file.simpleName}.appended.txt
        echo "$my_val" >> ${my_file.simpleName}.appended.txt
        """
}

If you want to see what your main.nf file should look like at this stage, open this hidden section

Code
nextflow.enable.dsl = 2

process duplicate_lines {
    publishDir "../nextflow_output/duplicate_lines"

    input:
        tuple(
            val(id),
            path(my_file)
        )
    output:
        tuple(
          val(id),
          path("${my_file.simpleName}.duplicated.txt")
        )
    script:
        """
        cat $my_file $my_file > ${my_file.simpleName}.duplicated.txt
        """
}

process append_to_file {
    publishDir "../nextflow_output/append_to_file"

    input:
        tuple(
            val(id),
            val(my_val),
            path(my_file)
        )
    output:
        path "${my_file.simpleName}.appended.txt"
    script:
        """
        cat $my_file > ${my_file.simpleName}.appended.txt
        echo "$my_val" >> ${my_file.simpleName}.appended.txt
        """
}

workflow {
    Channel.fromPath( params.samplesheet )
      .splitCsv( header: true )
      .set{ input_ch }

    input_ch.map{ [ it["id"], it["sample"] ] }.set{ sample_ch }

    duplicate_lines( sample_ch )

    input_ch.map{ [ it["id"], it["content"] ] }
      .join(duplicate_lines.out, by: 0)
      .set{ append_to_file_in_ch }

    append_to_file( append_to_file_in_ch )
}

Now run the workflow again with nextflow run main.nf. As before, if you examine the output folder with ls ../nextflow_output/append_to_file, you will find a set of files

set_of_files_1.appended.txt
set_of_files_2.appended.txt
set_of_files_3.appended.txt
set_of_files_4.appended.txt
set_of_files_5.appended.txt
set_of_files_6.appended.txt
set_of_files_7.appended.txt
set_of_files_8.appended.txt

This time, if you check their content, you will see that the last line is correctly paired with the first two. Here, as an example, is the content of the file set_of_files_1.appended.txt.

This is content of file 1
This is content of file 1
csv_content_1

Let’s examine how we achieved this result:

  • We modified the sample sheet so as to include a unique ID for each line
  • We modified all the processes to expect as input not a single element but a tuple of two or three elements arriving on a single channel
    • A tuple is the non-modifiable equivalent of a list
    • When a channel emits lists of elements, the input qualifier tuple should be used
    • The input qualifiers in the process definition for the tuple elements must match the order and cardinality of the lists emitted by the channel used as input for the process
  • The input channel for the process duplicate_lines was modified so as to emit lists of two elements: the content of the id column and the content of the sample column (input_ch.map{ [ it["id"], it["sample"] ] })
  • For the process duplicate_lines, the input block has been modified to expect tuples of two elements
    • The first element is expected to be of type val and it is not used in the process (but it is passed in the output unchanged)
    • The second element is expected to be of type path and will contain the file to be duplicated
  • The output of the process duplicate_lines has also been modified to be of type tuple
    • The tuple produced as output contains a first element of type val, consisting of the same id variable received as input
    • The second element of the tuple is the output file produced by the process, with a qualifier of type path
  • The map operator is used again on the channel input_ch to extract the content of the columns id and content, as lists of two elements (input_ch.map{ [ it["id"], it["content"] ] })
  • The channel operator join is used to join together the output of the process duplicate_lines with the channel resulting from the previous operation (join(duplicate_lines.out, by: 0))
    • The join operator joins two or more channels together using a matching key, whose position is specified with the by keyword
    • In our case, we used by: 0 as an argument to the join operator, meaning that we want to match the keys in the first position in the respective channels
    • The items of the two channels are matched in such a way that items with the same value in the first element of the lists (corresponding to the content of the column id of the sample sheet in both cases) are joined together
    • The resulting channel after the joining operation contains lists of three elements: the matching key itself, the remainder of the list emitted by the first channel after removal of the matching key (one element), and finally the remainder of the list emitted by the second channel after removal of the matching key (one element); a small toy example after this list shows this in practice
    • The resulting channel is named append_to_file_in_ch (set{ append_to_file_in_ch })
  • The channel append_to_file_in_ch is used as an input to the process append_to_file
    • The process append_to_file declares in input a tuple of three items, matching the cardinality of the items produced by the channel append_to_file_in_ch
    • The output of the process duplicate_lines and the content of the column content in the sample sheet will now be matched correctly since they were joined using the matching key id
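
To make the behaviour of join more concrete, here is a small self-contained toy example (the values are made up and unrelated to our workflow) that you can run on its own to see what the joined channel emits.

workflow {
    ch_a = Channel.of( [ "1", "csv_content_1" ], [ "2", "csv_content_2" ] )
    ch_b = Channel.of( [ "1", "file_1.txt" ], [ "2", "file_2.txt" ] )

    // join on the first element of each list; the emitted items look like
    // [1, csv_content_1, file_1.txt] and [2, csv_content_2, file_2.txt]
    ch_a.join( ch_b, by: 0 ).view()
}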

2.6 Software dependencies

We can do many things with just shell scripts, but for real-world scenarios, we would probably need to use some additional library or tool. Software dependencies for a process can be specified in Nextflow in two main ways: using software containers or virtual environments.

A software container is like a shipping container: it is a monolithic block of code and data that contains everything needed to run a specific application, including the operating system (but not the kernel; the host kernel is used). The most popular container engine (the software used to actually run a container) is Docker. On high-performance clusters, Docker is often discouraged or forbidden for security reasons. A popular alternative container engine used in bioinformatics is Singularity. The good news is that Singularity is also able to run containers created with Docker, so we can consider them equivalent for our purposes.

For popular tools, a container has most probably already been created by someone and uploaded to a public container registry like DockerHub. In this case, we can provide Nextflow with just a link to the container and it will do the rest. If we wanted to use a custom container instead, we would need to write an appropriate description file for it (called a “Dockerfile”), use it to build the container, and then upload the container to DockerHub. This use case will not be covered in this workshop, and we will limit ourselves to the use of pre-built containers.

On the other hand, virtual environments are a more lightweight and modifiable alternative to containers. They rely on the host operating system but allow the definition of additional software. Nextflow is compatible with Conda environments, and can also use the faster Mamba implementation of Conda. It is worth noting that while Conda and Mamba are typically thought of as package managers for Python, they can also install R packages, the R interpreter itself, stand-alone tools, and even GPU drivers.

Differently from containers, environments do not need to be pre-built to be used by Nextflow. It is possible to just declare the software that we need, and Nextflow will take care of building an appropriate environment with it.

Let’s now write a workflow that makes use of virtual environments.

First download a CSV file containing a set of countries, their population, and their area by running the following

wget https://raw.githubusercontent.com/saulpierotti-ebi/saulpierotti-ebi.github.io/main/assets/population_area_by_country.csv

Now modify your main.nf file and make it look like this

nextflow.enable.dsl = 2

process calculate_population_density {
    conda "python=3.10.6 pandas=1.5.0"

    input:
        path csv_file
    output:
        path "${csv_file.simpleName}.with_density.csv"
    script:
        """
        #!/usr/bin/env python

        import pandas

        df = pandas.read_csv("${csv_file}")
        df["density"] = df["population"] / df["area"]
        df.to_csv("${csv_file.simpleName}.with_density.csv")
        """
}

process plot_population_by_area {
    publishDir "../nextflow_output/plots"

    conda "r-base=4.1.3 r-tidyverse=1.3.2"

    input:
        path csv_file
    output:
        path "${csv_file.simpleName}.pdf"
    script:
        """
        #!/usr/bin/env Rscript

        library("tidyverse")

        df <- read_csv("${csv_file}")

        ggplot(df, aes(x=area, y=population, label=country, size=density)) +
            geom_text()

        ggsave("${csv_file.simpleName}.pdf")
        """
}

workflow {
    input_ch = Channel.fromPath( params.input )
    calculate_population_density( input_ch )
    plot_population_by_area( calculate_population_density.out )
}

Modify your nextflow.config by adding the following

conda {
    // use the faster mamba solver instead of conda
    useMamba = true
}

If you want to see what your nextflow.config should look like at this stage, open this hidden section

Code
// you can also put comments in nextflow.config
workDir = "../nextflow_workdir"

params {
  samplesheet = "../nextflow_inputs/samplesheet.csv"
}

conda {
    // use the faster mamba solver instead of conda
    useMamba = true
}

Now run the workflow as follows

nextflow run main.nf --input population_area_by_country.csv

This time the workflow will take significantly longer to run, since Nextflow needs to create two Conda environments (using Mamba). However, this only affects the first run of the workflow, since once an environment is created it can be reused. Now open the file ../nextflow_output/plots/population_area_by_country.pdf. It contains a plot of the population of different countries versus their area, with the label size proportional to the population density.

Let’s explore what happened:

  • We told Nextflow to use Mamba instead of Conda when resolving the conda directive (with conda { useMamba = true } in nextflow.config)
  • As before, we used a command-line flag to pass an input file to the workflow (--input population_area_by_country.csv)
  • We used population_area_by_country.csv as an input for the process calculate_population_density, which uses Python code to calculate the population density of a country from its area and total population
  • The output of calculate_population_density is fed to the process plot_population_by_area, which uses R code to produce the plot

The conda directive used in these processes declares the software dependencies needed (in this case Python and pandas for the first process, R and tidyverse for the second). Software versions can be specified with an equals sign after the package name. Packages are sourced from the Anaconda repositories.

Another thing to note here: the script block normally interprets the code written in it as Bash code, but if a shebang (#!) is included on the first line of the script, then the interpreter specified in the shebang is used to run the script section (so #!/usr/bin/env python tells Nextflow to interpret the script section as Python code).

A different approach to reach the same goal is to write a separate Python or R script, place it in the bin folder (next to main.nf), and then call it from the script block. Nextflow automatically adds the bin folder to the PATH of every task, so the script can be invoked by name.

So for example you can create a file called bin/calculate_population_density.py with the following content

#!/usr/bin/env python
# the shebang above lets the script be executed directly once it is made executable

import pandas
import sys

df = pandas.read_csv(sys.argv[1])
df["density"] = df["population"] / df["area"]
df.to_csv(sys.argv[2])

Make it executable with

chmod +x bin/calculate_population_density.py

And change the calculate_population_density process as follows

process calculate_population_density {
    conda "python=3.10.6 pandas=1.5.0"

    input:
        path csv_file
    output:
        path "${csv_file.simpleName}.with_density.csv"
    script:
        """
        calculate_population_density.py $csv_file ${csv_file.simpleName}.with_density.csv
        """
}

Running the following will yield the same result as before

nextflow run main.nf --input population_area_by_country.csv

If we wanted to use containers instead of virtual environments to specify the software dependencies of the process calculate_population_density, we could have just replaced the line

conda "python=3.10.6 pandas=1.5.0"

with the line

container "docker://amancevice/pandas"

And then we would need to either set docker { enabled = true } or singularity { enabled = true } in our nextflow.config file, or specify the flag -with-docker or -with-singularity on the command line. A similar approach would also work for the process plot_population_by_area, using for example the container docker://rocker/tidyverse.

Note that containers in the format docker://username/container are pulled by Nextflow from DockerHub. They can be used either with Docker or Singularity as a container engine.
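
As a minimal sketch, enabling Singularity in nextflow.config could look like the following (the cache directory path is just an example; it tells Nextflow where to store downloaded container images so they can be reused across runs).

singularity {
    enabled  = true
    cacheDir = "../singularity_cache"
}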

2.7 Resource allocation

Resource allocation is very useful when our workflow is run on a high-performance cluster or in the cloud. It is possible to specify the amount of RAM, the number of CPU cores, and the running time required by a process. These values are then used by Nextflow to reserve appropriate resources, for example when submitting a job to the cluster with the corresponding requirements.

Since we do not have a cluster environment available for this workshop, we will not implement an actual workflow using resource requirements (we could also specify them for a local workflow, but we would not see any effect).

An example process specifying resource requirements looks like this

process calculate_population_density {
    // this can be in GB, MB, KB, TB, ...
    memory "2 GB"
    // this can be in second(s), minute(s), hour(s), day(s)
    // it is also possible to use abbreviations such as s, m, h, d
    time "5 days"
    // just an integer (note that this is not a string)
    cpus 4

    input:
        path csv_file
    output:
        path "${csv_file.simpleName}.with_density.csv"
    script:
        """
        calculate_population_density.py $csv_file ${csv_file.simpleName}.with_density.csv
        """
}
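
Resource requirements do not have to be hard-coded in the process definition: they can also be set in nextflow.config using the process scope. The following is a sketch of that approach (the values are arbitrary examples); settings under withName apply only to the named process and override the defaults above them.

process {
    // defaults applied to every process
    cpus   = 1
    memory = "1 GB"

    withName: calculate_population_density {
        cpus   = 4
        memory = "2 GB"
        time   = "5 days"
    }
}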

2.8 The resume feature

One of the most powerful aspects of Nextflow is the resume feature. This feature can be activated by adding the following to the nextflow.config file

resume = true

Alternatively, it is also possible to run the Nextflow command with the -resume flag.

We saw before that each task is executed by Nextflow in its own directory with a long alphanumeric name. This name is not random: it is a 128-bit hash, calculated from the following values:

  • Input files
  • Command line string
  • Container ID
  • Conda environment
  • Environment modules
  • Any executed scripts in the bin directory

So, any time one of these features of a task changes, the hash value, and hence the working directory, changes.

If Nextflow finds a working directory with the same hash value as the task to be executed, and that directory contains all the outputs declared in the output block of the process, and it contains a file called .exitcode holding the value zero (this file is created automatically by Nextflow to store the exit code of a task), then the task is not executed again and the output files already present in the directory are returned as if they had been produced by the current task.

This feature allows you to resume a previously failed run of a workflow from the last successful task, potentially saving a lot of computing time. A more in-depth explanation of the resume feature is available here.
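
If you are curious about the hash associated with each executed task, the nextflow log command can show it (this is just a hint, assuming at least one run has completed; replace <run name> with one of the run names printed by plain nextflow log).

nextflow log <run name> -f hash,name,status

The hash column corresponds to the prefix of the task working directory inside workDir.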

To test the resume feature by ourselves, let’s modify our main.nf file as follows

nextflow.enable.dsl = 2

process dumb_process {
    publishDir "../nextflow_output"

    output:
        path "out.txt"
    script:
        """
        sleep 120
        echo "Success!" > out.txt
        """
}

workflow {
    dumb_process()
}

Now you can run the workflow with nextflow run main.nf. Our workflow just runs the process dumb_process, which does not take any input and declares a single output file, out.txt. This execution will take 2 minutes, since the script block of the process dumb_process contains the command sleep 120, which waits for 2 minutes (120 seconds) before writing the line “Success!” to the file out.txt. After the execution is complete, we can check that the line “Success!” is present in the output file by running

cat ../nextflow_output/out.txt

Now, to test the resume feature, run the workflow again

nextflow run main.nf -resume

In this case, the execution will be much quicker, since dumb_process is not actually executed. Instead, the cached results obtained before are returned.

2.9 nf-core pipelines

Many of the workflows that we need in our work as bioinformaticians are always the same. Mapping sequencing reads to a reference genome and then performing variant calling is an example of this.

For such common use cases, very efficient Nextflow pipelines are already available, so there is no need to reinvent the wheel. Community-curated pipelines can be found on the nf-core repository. So for example for mapping reads to a reference and then performing variant calling we would use the nf-core/sarek pipeline. These pipelines follow a standardised format where inputs are declared by the user in a sample sheet. You can read each pipeline’s documentation in nf-core to learn more about the parameters available and the expected format of the sample sheet.

I cannot make you run an actual nf-core pipeline now, since there is no simple, self-contained pipeline that runs without additional data such as sequencing files. However, once you have some files to process and have written an appropriate sample sheet, running a pipeline is very easy, for example:

nextflow run nf-core/sarek

Note that Nextflow is able to run workflows from GitHub, and nf-core/sarek in the command above is just a pointer to this GitHub repository. Nextflow takes care of cloning the repository and running the main.nf file.

If you want to actually test running a pipeline written by someone else, run the following hello world pipeline:

nextflow run nextflow-io/hello

This will just print “Hello World” in different languages to the terminal. You can explore the code for this workflow here.

2.10 Labels

The label directive is used in Nextflow to tag processes to which we want to apply specific configurations. So, for example, going back to the process calculate_population_density

process calculate_population_density {
    conda "python=3.10.6 pandas=1.5.0"

    input:
        path csv_file
    output:
        path "${csv_file.simpleName}.with_density.csv"
    script:
        """
        calculate_population_density.py $csv_file ${csv_file.simpleName}.with_density.csv
        """
}

Instead of hard-coding the conda directive into the process definition, we could have done as follows

process calculate_population_density {
    label "pandas"

    input:
        path csv_file
    output:
        path "${csv_file.simpleName}.with_density.csv"
    script:
        """
        calculate_population_density.py $csv_file ${csv_file.simpleName}.with_density.csv
        """
}

And then in the nextflow.config we could have written

process {
    withLabel: pandas {
        conda = "python=3.10.6 pandas=1.5.0"
    }
}

This may seem of little benefit for a single process, but if we have many processes with similar requirements, the label directive allows us to specify those requirements only once, in a central location. So if, for example, several processes need Python and pandas as software requirements and we specified this with the label directive, we can change the version of pandas used just by modifying nextflow.config, instead of having to modify the definition of many processes.

Note that the label directive can be used to specify many different configurations, not only software requirements. For example, we could use it also for memory or time requirements.
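
As a sketch of this broader use (the label names and values here are arbitrary examples), several processes can share the same label, and all of their configuration then lives in one place in nextflow.config.

process {
    withLabel: pandas {
        conda  = "python=3.10.6 pandas=1.5.0"
        memory = "2 GB"
    }
    withLabel: small_job {
        cpus = 1
        time = "10 minutes"
    }
}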