Optimize Your I/O¶
On this page we discuss some general best practices for mapping data-intensive workflows to DelftBlue.
"Data intensive" may mean that you have a large input dataset, your simulation produces significant output data,
or perhaps you just need a lot of intermediate I/O to start/restart/coordinate a parallel computation.
Some of the advice is rather specific, but the underlying principles are universal.
Information related to the /scratch drive equally applies to /home and /projects, as all of these mount points live on the same parallel BeeGFS file system.
Task 1: Downloading and copying data to DelftBlue /scratch¶
If you want to use the dataset regularly, it is advised to keep it on a TU Delft project drive (mounted on /tudelft.net/staff-umbrella/).
You can download the data on any machine with access to the central storage and zip it before moving it to DelftBlue.
Or you can download the data on one of the login nodes, zip it to reduce the number of files, and copy it to /scratch using rsync.
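As a rough sketch (the project path, archive name and dataset directory are illustrative and need to be adapted to your own setup):
# on a login node, which mounts both /tudelft.net and /scratch:
cd /tudelft.net/staff-umbrella/<your-project>    # illustrative project path
zip -r dataset.zip dataset/                      # pack the many small files into one archive
rsync -av --progress dataset.zip /scratch/${USER}/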
Alternatively, you can download the data directly to the node-local SSD (/tmp) within a job script.
This has the advantage of eliminating network file system accesses on one side of the copy operation, as you
can see in the following example.
Example. The following script downloads about 5GB of data in 24 files from the ESA Gaia mission repository:
#!/bin/bash
# how many data sets do you want? (max 24)
NUM_DATASETS=24
# paths to/on ESA file server
URL="https://cdn.gea.esac.esa.int/Gaia/gdr3/gaia_source"
# we hard-code the first 24 ID's here so that
# we can get up to NUM_DATASETS=24 datasets
INDEXSTRING="\
000000-003111 \
003112-005263 \
005264-006601 \
006602-007952 \
007953-010234 \
010235-012597 \
012598-014045 \
014046-015369 \
015370-016240 \
016241-017018 \
017019-017658 \
017659-018028 \
018029-018472 \
018473-019161 \
019162-019657 \
019658-020091 \
020092-020493 \
020494-020747 \
020748-020984 \
020985-021233 \
021234-021441 \
021442-021665 \
021666-021919 \
021920-022158"
INDEX=( $INDEXSTRING )
for ((i=0; i<${NUM_DATASETS}; i++)); do
FILE=GaiaSource_${INDEX[$i]}.csv.gz
if [[ ! -f ${FILE} ]]; then
wget ${URL}/${FILE}
else
echo "${FILE} exists -- skipping download."
fi
done
echo "done."
Copying the data to /scratch in the latter case is only required if you want to use it again
in subsequent jobs.
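For instance, a minimal job-script sketch for the node-local variant (the script name download-gaia.sh, the resource settings and the archive name are illustrative):
#!/bin/bash
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=4GB
#SBATCH --time=01:00:00
#SBATCH --output=download_gaia.out
# make sure to add your account!
##SBATCH --account=<what>-<faculty>-<group>

# work on the node-local SSD
WORKDIR=/tmp/${SLURM_JOB_ID}
mkdir -p ${WORKDIR}
cd ${WORKDIR}

# run the download script shown above (here assumed to be saved as
# ~/download-gaia.sh -- adjust the name and path to your own setup)
bash ~/download-gaia.sh

# pack everything into a single archive and keep a copy on /scratch,
# so that subsequent jobs do not have to download the data again
zip -9 gaia_data.zip GaiaSource_*.csv.gz
rsync -av gaia_data.zip /scratch/${USER}/

# clean up the node-local SSD
cd / && rm -rf ${WORKDIR}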
The table below gives an indication of how long each step took on the different file systems (n.n. = not needed):

command | /tudelft.net | /scratch | /tmp (compute node)
---|---|---|---
download (wget) | 4.5 minutes | 8 minutes | 3.5 minutes
zip | 1.5 minutes | 4.5 minutes | 0.5 minutes/n.n.
rsync to /scratch | 4.0 minutes | n.n. | 1.3 minutes
The node-local variant is the fastest, followed by downloading to /tudelft.net and then zipping and rsync-ing to /scratch.
The variant downloading directly to /scratch is not advisable because it is slow and may put a lot of files on the parallel file system,
where your quota is limited to one million files.
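If in doubt, you can check how many files a directory tree contains before copying it, for example (the directory name is illustrative):
# count the files below a directory -- keep this well below the one-million-file quota
find /scratch/${USER}/my_dataset -type f | wc -l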
What if the rsync takes long?¶
As of Feb 2024, DelftBlue does not offer dedicated nodes for file transfer, and only the login nodes
have direct access to the central TU Delft storage mounted on /tudelft.net.
If you need to transfer a very large amount of data from /tudelft.net to /scratch, you may
want to start the copying process in the background and log out. However, if you do so naively,
the process will be stopped upon logout (or ssh disconnect). To keep the copying job running,
use disown. Furthermore, you can use the nice and ionice commands to run at lower priority
(and thus be nice to other users).
Continuing the example above, this may look like the following sketch (the source and target paths are illustrative):
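# copy the (illustrative) archive from the project drive to /scratch at low
# CPU and I/O priority, running in the background; adapt the paths to your setup
nice -n 19 ionice -c 2 -n 7 \
    rsync -av /tudelft.net/staff-umbrella/<your-project>/gaia_data.zip /scratch/${USER}/ &
# detach the background job from the shell so that it keeps running after logout
disown -h %1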
If the process is already running in the foreground, you can type CTRL+Z, then bg %1 and disown -h %1
to suspend it, restart it in the background, and disown it
so that you can log out without the process being stopped.
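In an interactive session, the sequence looks roughly like this (the job number %1 may differ if you have other background jobs):
^Z             # CTRL+Z suspends the foreground process
bg %1          # resume it in the background
disown -h %1   # detach it from the shell so that logging out does not stop it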
Task 2: Working with the data on DelftBlue¶
Once your data is on /scratch
(ideally in a single file or a few larger files), you want to start a job
that "does something" with it. There are three typical scenarios that we want to address here.
(i) The job accesses (reads/writes) a few large files infrequently¶
This is often the case when you have a large simulation that reads input data at the beginning and writes checkpoints and simulation results at intervals much larger than the time it takes to write them.
The parallel file system is optimized for this case, so you can simply keep your data in separate files on /scratch.
If you find that the time for I/O is excessive, please contact us and we can help you optimize the application.
This may include
- Adjusting output frequency and volume of data;
- Reorganizing output to write large chunks of consecutive data;
- Using MPI I/O to maximize bandwidth.
Note that it is always possible that the file system is slow due to heavy usage by others. The only way to
circumvent this problem is to write checkpoints to the local SSDs on the nodes (/tmp). This means that the data
of every process has to be written to a separate file, which your application may or may not support.
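If you do write checkpoints to /tmp, remember that they only exist on the individual nodes. A sketch for collecting them onto /scratch at the end of a job (the checkpoint file pattern and target directory are illustrative):
# one task per node copies that node's local checkpoint files back to /scratch;
# adapt the file pattern and the target directory to your application
srun --ntasks-per-node=1 bash -c 'mkdir -p /scratch/${USER}/my_run/$(hostname) && cp /tmp/checkpoint_*.dat /scratch/${USER}/my_run/$(hostname)/'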
(ii) Many small jobs need to access different parts of the data¶
Separate processes can only "communicate" via the /scratch file system. If you implement your
application in this way, be aware that this is a shared resource that a single user can easily slow down
for everyone by submitting many jobs or a job array.
To avoid this, we recommend using MPI to parallelize the computational task and doing all I/O
on one or a few processes. The data can then be distributed to the other processes by e.g. a scatter
operation, and results collected using gather.
This has the advantage that the files will be read in a single/few large chunk(s), and the distribution happens via shared memory or InfiniBand rather than the file servers.
A Python example of this workflow is given below. Python's mpi4py
implementation of MPI
is particularly easy to use: In the example, we read pandas DataFrame
objects from csv files
in a zip archive and distribute them to all MPI processes.
Let's first generate some csv files using the script create-input-files.sh:
#!/bin/bash
nfiles=8
nperfile=10
for ((i=0; i<$nfiles; i++)); do
filename=file-$i.csv
echo "Create $filename..."
echo "ID,IntegerValue,FloatValue" > $filename
for ((j=0; j<$nperfile; j++)); do
k=$(expr $i \* $nperfile + $j)
printf "%d,%d,%.3e\n" $k $((2**$j )) "$(((1000 * $i) / ($j + 1)))e-3" >> $filename
done
done
echo "zip everything into all_files.zip"
zip -9 all_files.zip file-*.csv
rm file-*.csv
This produces the archive all_files.zip. The Python example (mpi4py_example.py) is given here:
from zipfile import ZipFile
from io import BytesIO
import numpy as np
import pandas as pd
from mpi4py import MPI

# This Python script will run on multiple processes using MPI.
# The root process reads the contents of a zip file one-by-one
# into pandas DataFrame objects and distributes them among all processes.
# After some local work, a final result is collected on process 0.
comm = MPI.COMM_WORLD
nproc = comm.size
rank = comm.rank

zipfilename = 'all_files.zip'

zip_file = None
file_list = None
nfiles = None

if rank == 0:
    zip_file = ZipFile(zipfilename, 'r')
    file_list = zip_file.namelist()
    nfiles = len(file_list)

nfiles = comm.bcast(nfiles, root=0)

my_dataframes = []

# We read all the data on the root process (rank 0), but if we did it in one big chunk,
# it might run out of memory. Instead we read at most 'nproc' files at a time and distribute
# them in a cyclic way over the processes. To make this more scalable, e.g. if you have one
# file per process, you could have an increasing number of processes reading part of the
# files instead and distribute them among their "team mates".
for i in range(0, nfiles, nproc):
    dfs = None
    if rank == 0:
        iend = min(nfiles, i + nproc)
        print('read files %d:%d' % (i, iend - 1))
        dfs = []
        for j in range(i, iend):
            csv_buffer = BytesIO(zip_file.read(file_list[j]))
            dfs.append(pd.read_csv(csv_buffer))
        # note: MPI scatter needs a value for every process
        while len(dfs) < nproc:
            dfs.append(None)
    my_dataframes.append(comm.scatter(dfs, root=0))

# do some parallel work on the data
my_mean_values = []
for df in my_dataframes:
    if df is not None:
        my_mean_values.append(df.FloatValue.mean())

# collect the results on the root process
mean_values = comm.gather(np.array(my_mean_values), root=0)

if rank == 0:
    mean_values = np.concatenate(mean_values)
    max_mean = np.max(mean_values)
    print('Largest mean value found: %e' % max_mean)
The Python script computes the maximum of the mean values in the 'FloatValue' column of each csv-file. Note that we never unzip the file to a file system. To run this example with 5 processes on DelftBlue, the following job script can be used. Note that the cores Slurm assigns may be spread arbitrarily over the nodes of the system.
#!/bin/bash
#SBATCH --ntasks=5
#SBATCH --cpus-per-task=1
#SBATCH --mem-per-cpu=1GB
#SBATCH --time=00:10:00
#SBATCH --output=mpi4py_example.out
# make sure to add your account!
##SBATCH --account=<what>-<faculty>-<group>
module load 2024r1
module load openmpi
module load py-mpi4py
module load py-pandas
module load py-numpy
# create the dataset all_files.zip
./create-input-files.sh
srun python3 mpi4py_example.py
# clean up the input files
rm all_files.zip
(iii) The job essentially requires randomly accessing a large number of files¶
For an HPC system, this is the most difficult scenario to handle. It is often encountered in applications of machine learning, and requires careful optimization of your workflow. Otherwise, I/O may (a) become a severe bottleneck in your computations, rendering even the most powerful computing hardware useless, and (b) disrupt the work of other users on the system, ultimately leading to a concerned e-mail from our system administrator.
There are two things you can do if your work falls into this category.
1) If the data volume is not larger than a few hundred gigabytes, copy it
as a single (zip or tar) file to /tmp
at the beginning of your job and unpack it there (see the sketch after the warning below).
Rationale: /tmp on DelftBlue is a node-local SSD that is only shared with jobs on the same node, and
is more suitable for small random I/O operations than the larger, spinning-disk based /projects, /scratch and /home.
Warning
``/tmp`` may run out of disk space if two jobs on the same node occupy a large portion of it.
Please clean up after yourself, for example by using ``/tmp/${SLURM_JOB_ID}/`` as the working directory and deleting it at the end of your
``Slurm`` script.
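A minimal sketch of this pattern (the archive name, paths and resource settings are illustrative):
#!/bin/bash
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=4
#SBATCH --mem-per-cpu=2GB
#SBATCH --time=02:00:00
# make sure to add your account!
##SBATCH --account=<what>-<faculty>-<group>

# stage the data as a single archive onto the node-local SSD
WORKDIR=/tmp/${SLURM_JOB_ID}
mkdir -p ${WORKDIR}
cp /scratch/${USER}/dataset.tar ${WORKDIR}/   # illustrative archive name
tar -xf ${WORKDIR}/dataset.tar -C ${WORKDIR}

# ... run your application here, reading the many small files from ${WORKDIR} ...

# clean up the node-local SSD for the next job
rm -rf ${WORKDIR}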
2) If your dataset is extremely large, you should consider working on batches (note that this is common practice in machine learning!) or using distributed memory parallelism across multiple nodes. This is less common in machine learning than in classical HPC, but frameworks like PyTorch offer extensive functionality. Specifically, you may want to look at
- DataLoaders designed to overcome the "million file problem", e.g.
- [WebDatasets](https://webdataset.github.io/webdataset/)
- [DataDings](https://datadings.readthedocs.io/en/stable/)
- [PyTorch DistributedDataParallel class](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html)