Skip to content

Handling large datasets

On this page we discuss some general best practices for mapping data-intensive workflows to DelftBlue. Some of the advice is rather specific, but the underlying principles are universal. Information related to the /scratch drive equally applies to /home and /projects as all of these mount points live on the same parallel BeeGFS file system.

Task 1: Downloading and copying data to DelftBlue /scratch

If you want to use the dataset regularly, it is advised to do this on a TU Delft project drive (mounted on /tudelft.net/staff-umbrella/). You can download the data on any machine with access to the central storage and zip it before moving to DelftBlue. Or you can download the data on one of the login nodes, zip it to reduce the number of files, and copy it to /scratch. using rsync.

Alternatively, you can download the data directly to the node-local SSD (/tmp) within a job script. This has the advantage of eliminating network file system accesses on one side of the copy operation, as you can see in the following example.

Example. The following script downloads about 5GB of data in 24 files from the ESA Gaia mission repository:

get_data.sh
#!/bin/bash

# how many data sets do you want? (max 24)
NUM_DATASETS=24

# paths to/on ESA file server
URL="https://cdn.gea.esac.esa.int/Gaia/gdr3/gaia_source"

# we hard-code the first 24 ID's here so that
# we can get up to NUM_DATASETS=24 datasets
INDEXSTRING="\
000000-003111 \
003112-005263 \
005264-006601 \
006602-007952 \
007953-010234 \
010235-012597 \
012598-014045 \
014046-015369 \
015370-016240 \
016241-017018 \
017019-017658 \
017659-018028 \
018029-018472 \
018473-019161 \
019162-019657 \
019658-020091 \
020092-020493 \
020494-020747 \
020748-020984 \
020985-021233 \
021234-021441 \
021442-021665 \
021666-021919 \
021920-022158"

INDEX=( $INDEXSTRING )

for ((i=0; i<${NUM_DATASETS}; i++)); do
  FILE=GaiaSource_${INDEX[$i]}.csv.gz
  if [[ ! -f ${FILE} ]]; then
      wget ${URL}/${FILE}
  else
      echo "${FILE} exists -- skipping download."
  fi
done
echo "done."
The three variants give the following timings, where the first two are run on the login node and the third on a compute node. "n.n." stands for "not noeeded": zipping and copying the files to /scratch in the latter case is only required if you want to use the data again in subsequent jobs.

command

/tudeflt.net

/scratch

/tmp (compute node)

./get_data.sh

4.5 minutes

8 minutes

3.5 minutes

zip -0 all_data.zip *.csv.gz

1.5 minutes

4.5 minutes

0.5 minutes/n.n

rsync all_data.zip /scratch/$USER

4.0 minutes

n.n.

1.3 minutes

The node-local variant is the fastest, followed by /tudelft.net/zip/rsync. The variant downloading directly to /scratch is not advisable because it is slow and may put a lot of files on the parallel file system, where your quota is limited to one million files.

What if the rsync takes long?

As of Feb 2024, DelftBlue does not offer dedicated nodes for file transfer, and only the login nodes have direct access to the central TU Delft storage mounted on /tudelft.net.

If you need to transfer a very large amount of data from /tudelft.net to /scratch, you may want to start the copying process in the background and log out. However, if you do so naively, the process will be stopped upon logout (or ssh disconnect). To keep the copying job running, use disown. Furthermore, you can use the nice and ionice commands to run at lower priority (and thus be nice to other users). Continuing on the example above, this may look like this:

nice -n 19 ionice -c2 -n7 rsync -aq all_data.zip /scratch/$USER/ &
disown -h %1
logout

If the process is already running (in the foreground), you can type CTRL+X, bg %1 and disown -h %1 to stop it, restart it in the background and then disown it so that you can logout without the process being stopped.

Task 2: Working with the data on DelftBlue

Once your data is on /scratch (ideally in a single file or a few larger files), you want to start a job that "does something" with it. There are three typical scenarios that we want to address here.

(i) The job accesses (reads/writes) a few large files infrequently

This is often the case when you have a large simulation that reads input data at the beginning and writes checkpoints and simulation results at intervals much larger than the time it takes to write them.

The parallel file system is optimized for this case, so you can simply keep your data in separate files on /scratch. If you find that the time for I/O is excessive, please contact us and we can help you optimize the application. This may include

  • Adjusting output frequency and volume of data;
  • Reorganizing output to write large chunks of consecutive data;
  • Using MPI I/O to maximize bandwidth.

Note that it is always possible that the file system is slow due to heavy usage by others. The only way to circumvent this problem is to write checkpoints to the local SSD's on the nodes (/tmp). This means that the data of every process has to be written in a separate file, which your application may or may not support.

(ii) Many small jobs need to access different parts of the data

Separate processes can only "communicate" via the `/scratch file system. If you implement your application in this way, be aware that this is a shared resource that can easily be slowed down for everyone by a single user if he/she submits many jobs or a job array.

To avoid this, we recommend using MPI to parallelize the computational task and doing all I/O on one or a few processes. The data can then be distributed to the other processes by e.g. a scatter. operation, and results collected using gather.

This has the advantage that the files will be read in a single/few large chunk(s), and the distribution happens via shared memory or InfiniBand rather than the file servers.

A Python example of this workflow is given below. Python's mpi4py implementation of MPI is particularly easy to use: In the example, we read pandas DataFrame objects from csv files in a zip archive and distribute them to all MPI processes. Let's first generate some csv files using the script

create-input-files.sh
#!/bin/bash

nfiles=8
nperfile=10

for ((i=0; i<$nfiles; i++)); do
    filename=file-$i.csv
    echo "Create $filename..."
    echo "ID,IntegerValue,FloatValue" > $filename
    for ((j=0; j<$nperfile; j++)) do
      k=$(expr $i \* $nperfile + $j)
      printf "%d,%d,%.3e\n" $k $((2**$j )) "$(((1000 * $i) / ($j + 1)))e-3" >> $filename
    done
done

echo "zip everything into all_files.zip"
zip -9 all_files.zip file-*.csv
rm file-*.csv
The csv files are compressed into a single file all_files.zip. The Python example is given here:

mpi4py_example.py
from zipfile import ZipFile
from io import BytesIO

import numpy as np
import pandas as pd
from mpi4py import MPI

# This Python script will run on multiple processes using MPI
# The root process reads the contents of a zip file one-by-one
# into  padas DataFrame objects and distributes them among all processes.
# after some local work, a final result is collected to process 0.

comm=MPI.COMM_WORLD
nproc=comm.size
rank=comm.rank
zipfilename='all_files.zip'

zip_file = None
file_list = None
nfiles = None

if rank == 0:
    zip_file = ZipFile(zipfilename, 'r')
    file_list = zip_file.namelist()
    nfiles = len(file_list)

nfiles = comm.bcast(nfiles, root=0)

my_dataframes = []

# now we need to read all the data on root (rank 0), but if we do it in one big chunk,
# it may run out of memory. We read at most 'nproc' files at a time, and distribute them
# in a cyclic way over the processes. To make this more scalable, e.g. if you have one
# file per process, you could have an increasing number of processes reading part of the
# files instead and distribute them among their "team mates".
for i in range(0, nfiles, nproc):
    dfs = None
    if rank == 0:
        iend=min(nfiles,i+nproc)
        print('read files %d:%d'%(i,iend-1))
        dfs = []
        for j in range(i,iend):
            csv_buffer=BytesIO(zip_file.read(file_list[i]))
            dfs.append(pd.read_csv(csv_buffer))
            # note: MPI scatter needs a value for every process
        while len(dfs)<nproc:
            dfs.append(None)
    my_dataframes.append(comm.scatter(dfs, root=0))

# do some parallel work on the data
my_mean_values = []
for df in my_dataframes:
    if df is not None:
        my_mean_values.append(df.FloatValue.mean())

# collect the results on the root process
mean_values = comm.gather(np.array(my_mean_values), root=0)

if rank == 0:
    mean_values = np.concatenate(mean_values)
    max_mean = np.max(mean_values)

    print('Largest mean value found: %e'%(max_mean))

The Python script computes the maximum of the mean values in the 'FloatValue' column of each csv-file. Note that we never unzip the file to a file system. To run this example with 5 processes on DelftBlue, the following job script can be used. Note that the cores Slurm assigns may be spread arbitrarily over the nodes of the system.

mpi4py_example.slurm
#!/bin/bash
#SBATCH --ntasks=5
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=1GB
#SBATCH --time=00:10:00
#SBATCH --output=mpi4py_example.out
# make sure to add your account!
##SBATCH --account=<what>-<faculty>-<group>

module load 2023r1
module load openmpi
module load py-mpi4py
module load py-pandas
module load py-numpy

# create the dataset all_files.zip
./create-input-files.sh

srun python3 mpi4py_example.py

# clean up the input files
rm all_files.zip

(iii) The job essentially requires randomly accessing a large number of files

For an HPC system, this is the most difficult scenario to handle. It is often encountered in applications of machine learning, and requires careful optimization of your workflow. Otherwise, I/O may (a) become a severe bottleneck in your computations, rendering even the most powerful computing hardware useless, and (b) disrupt the work of other users on the system, ultimately leading to a concerned e-mail from our system administrator.

There are two things you can do if your work falls into this category.

1) If the data volume is not larger than a few hundred Gigabytes, copy it as a single (zip or tar) file to /tmp at the beginning of your job and unpack it there. Rationale: /tmp on DelftBlue is a node-local SSD that is only shared with jobs on that node, and more suitable for small random I/O operations than the larger, spinning-disk based /projects, /scratch and /home.

Warning

``/tmp`` may run out of disk space if two jobs on the same node occupy a large portion of it.
Please clean up after yourself, for example by using ``/tmp/${Slurm_JOBID}/`` as working directory and deleting it at the end of your
``Slurm`` script.

2) If your dataset is extremely large, you should consider working on batches (note that this is common practice in machine learning!) or using distributed memory parallelism across multiple nodes. This is less common in machine learning than in classical HPC, but frameworks like PyTorch offer extensive functionality. Specifically, you may want to look at

- DataLoaders designed to overcome the "million file problem", e.g.
    - [WebDatasets](https://webdataset.github.io/webdataset/)
    - [DataDings](https://datadings.readthedocs.io/en/stable/)
- [PyTorch DataDistributedParallel class](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)