DataTrove

DataTrove is a library to process, filter and deduplicate text data at a very large scale. It provides a set of prebuilt commonly used processing blocks with a framework to easily add custom functionality.

DataTrove processing pipelines are platform-agnostic, running out of the box locally or on a slurm cluster. Its (relatively) low memory usage and multi-step design make it ideal for large workloads, such as processing an LLM's training data.

Local, remote and other file systems are supported through fsspec.

Table of contents

Installation

Requires Python 3.10+.

Available flavours (combine them by repeating --extra, e.g. uv sync --extra processing --extra s3):

Quickstart examples

You can check the following examples:

Terminology

Tip

Note that each file will be processed by a single task. Datatrove does not automatically split a file into multiple parts, so to fully parallelize you should have multiple medium-sized files rather than a single large file.

Tip

Your number of tasks controls how much you can parallelize and also how long each individual processing unit takes. With a small number of tasks (each therefore processing a large number of files), a failed task has to restart from scratch, whereas with a larger number of small tasks each failed task takes much less time to rerun.

Caution

If you have more tasks than files, some tasks will not process any data, so there is usually no point in setting tasks to a number larger than the number of files.

Example

Suppose we run a job to process 10000 files on a machine with 100 cpu cores (workers=100). If we choose 1000 tasks, each task will process a shard of 10 files, and at most 100 tasks will run at a time.
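As a minimal sketch (using the LocalPipelineExecutor introduced below; the pipeline contents are elided):

from datatrove.executor import LocalPipelineExecutor

# hypothetical setup matching the example above: 10000 input files
executor = LocalPipelineExecutor(
    pipeline=[...],  # elided
    tasks=1000,      # each task processes a shard of 10000 / 1000 = 10 files
    workers=100,     # 100 tasks run concurrently, one per cpu core
)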

Pipeline

DataTrove Document

Each pipeline block processes data in the datatrove Document format:
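At minimum, a Document carries the sample's text, a unique id and a metadata dictionary; a minimal sketch:

from datatrove.data import Document

doc = Document(
    text="the actual text content for this sample",
    id="sample-0",  # a unique identifier for this sample
    metadata={"url": "https://example.com"},  # any additional info, as a dict
)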

Types of pipeline blocks

Each pipeline block takes a generator of Document as input and returns another generator of Document.

Full pipeline

A pipeline is defined as a list of pipeline blocks. As an example, the following pipeline would read data from disk, randomly filter (remove) some documents and write them back to disk:

from datatrove.pipeline.readers import CsvReader
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    CsvReader(
        data_folder="/my/input/path"
    ),
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]

Executors

Pipelines are platform-agnostic, which means that the same pipeline can smoothly run on different execution environments without any changes to its steps. Each environment has its own PipelineExecutor.

Some options common to all executors:

Call an executor's run method to execute its pipeline.

Tip

Datatrove keeps track of which tasks successfully completed by creating a marker (an empty file) in the ${logging_dir}/completions folder. Once the job finishes, if some of its tasks have failed, you can simply relaunch the exact same executor and datatrove will check and only run the tasks that were not previously completed.

Caution

If you relaunch a pipeline because some tasks failed, do not change the total number of tasks as this will affect the distribution of input files/sharding.

LocalPipelineExecutor

This executor will launch a pipeline on a local machine. Options:

from datatrove.executor import LocalPipelineExecutor

executor = LocalPipelineExecutor(
    pipeline=[
        ...
    ],
    logging_dir="logs/",
    tasks=10,
    workers=5
)
executor.run()

Multi-node parallelism

You can have different nodes/machines process different parts of the total tasks by using the local_tasks and local_rank_offset. For each node/instance/machine, launch with the following options:
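A hedged sketch for one of the nodes (the values are illustrative):

from datatrove.executor import LocalPipelineExecutor

# e.g. the second of several machines, each handling 100 of the 500 total tasks
executor = LocalPipelineExecutor(
    pipeline=[...],         # elided
    tasks=500,              # total tasks across all machines
    local_tasks=100,        # how many of those tasks to run on this machine
    local_rank_offset=100,  # the rank of the first task to run on this machine
)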

To get final merged stats you will have to invoke the merge_stats script manually on a path containing the stats from all machines.

SlurmPipelineExecutor

This executor will launch a pipeline on a slurm cluster, using slurm job arrays to group and manage tasks. Options:

from datatrove.executor import SlurmPipelineExecutor

executor1 = SlurmPipelineExecutor(
    pipeline=[
        ...
    ],
    job_name="my_cool_job1",
    logging_dir="logs/job1",
    tasks=500,
    workers=100,  # omit to run all at once
    time="10:00:00",  # 10 hours
    partition="hopper-cpu"
)
executor2 = SlurmPipelineExecutor(
    pipeline=[
        ...
    ],
    job_name="my_cool_job2",
    logging_dir="logs/job2",
    tasks=1,
    time="5:00:00",  # 5 hours
    partition="hopper-cpu",
    depends=executor1  # this pipeline will only be launched after executor1 successfully completes
)
# executor1.run()
executor2.run()  # this will actually launch executor1, as it is a dependency, so no need to launch it explicitly

RayPipelineExecutor

This executor will launch a pipeline on a ray cluster, using ray tasks for parallel execution. Options:

import ray
from datatrove.executor import RayPipelineExecutor

ray.init()
executor = RayPipelineExecutor(
    pipeline=[
        ...
    ],
    logging_dir="logs/",
    tasks=500,
    workers=100,  # omit to run all at once
)
executor.run()

Logging

For a pipeline with logging_dir mylogspath/exp1, the following folder structure would be created:

See folder structure

└── mylogspath/exp1
    │── executor.json ⟵ json dump of the executor options and pipeline steps
    │── launch_script.slurm ⟵ the slurm config created and used to launch this job (if running on slurm)
    │── executor.pik ⟵ pickled executor, loaded by each task to run the pipeline (if running on slurm)
    │── ranks_to_run.json ⟵ list of tasks that are being run
    │── logs/
    │   └──[task_00000.log, task_00001.log, task_00002.log, ...] ⟵ individual logging files for each task
    │── completions/
    │   └──[00004, 00007, 00204, ...] ⟵ empty files marking a task as completed. Used when relaunching/resuming a job (only unfinished tasks will be run)
    │── stats/
    │   └──[00000.json, 00001.json, 00002.json, ...] ⟵ individual stats for each task (number of samples processed, filtered, removed, etc)
    └── stats.json ⟵ global stats from all tasks

Colorization

Log messages support colorization. By default, colorization will be auto detected for console messages and disabled for log files (logs/task_XXXXX.log). To explicitly enable or disable colorization, you may set the following environment variables:

DataFolder / paths

Datatrove supports a wide variety of input/output sources through fsspec.

There are a few ways to provide a path to a datatrove block (for input_folder, logging_dir, data_folder and so on arguments):

Use hf://buckets/... for raw and intermediate data (S3-like, mutable, no versioning). Use hf://datasets/... for datasets ready to be published. See HF Storage Buckets below.

Under the hood these argument combinations are parsed by get_datafolder.
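For instance, string paths (local or any fsspec url) resolve directly; a minimal sketch:

from datatrove.io import get_datafolder

# a plain local path
local_folder = get_datafolder("/my/local/path")
# any fsspec-supported url (here s3, via the s3 extra)
s3_folder = get_datafolder("s3://my-bucket/my-prefix")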

Practical guides

Reading data

Usually, pipelines will start with a Reader block. Most readers take a data_folder argument — a path to a folder containing the data to be read.

These files will be distributed across each task. If you have N tasks, the task with rank i (0-based) will process files i, i+N, i+2N, i+3N, ...
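A quick illustration of this round-robin assignment in plain Python:

# how 10 files map to N=4 tasks: task i reads files i, i+N, i+2N, ...
N = 4
files = [f"file_{j}.jsonl" for j in range(10)]
for rank in range(N):
    print(rank, files[rank::N])  # e.g. task 0 gets file_0, file_4, file_8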

Internally, each reader reads data and converts it into a dictionary before creating a Document object.

Some options common to most readers:
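A hedged sketch using JsonlReader (the option names follow the common reader interface; treat the exact values as illustrative assumptions):

from datatrove.pipeline.readers import JsonlReader

reader = JsonlReader(
    data_folder="s3://my-bucket/raw-data/",  # any fsspec-supported path
    text_key="content",                      # which field holds the document text
    limit=1000,                              # assumption: read at most 1000 docs, handy for testing
    glob_pattern="*.jsonl.gz",               # only read files matching this pattern
)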

Synthetic data generation

Install the inference extras with uv sync --extra inference to pull in the lightweight HTTP client, checkpointing dependencies and async sqlite cache.

We support vLLM, SGLang, OpenAI-compatible HTTPS endpoints and a local dummy server through the InferenceRunner block. Each datatrove task can spin up its own server replica (for vllm, sglang or dummy) or talk directly to an external endpoint while asynchronous batching keeps GPU utilization high.

Custom rollouts

The core abstraction is a rollout function—a plain async callable that receives a Document, a generate(payload) callback, and any extra kwargs from shared_context. You can freely orchestrate multiple sequential or parallel generate calls inside the rollout. This gives you full control over how prompts are constructed and how generations are combined. See inference_chunked.py for examples of:

Set rollouts_per_document to automatically run the same rollout multiple times per sample; the runner collects successful outputs under document.metadata["rollout_results"].

Ready-to-use generation script

For a ready-to-use script for synthetic data generation at scale (supporting models from 1B to 1T parameters, local/SLURM execution, and multi-node setups), see generate_data.py. This script handles prompt-based generation with configurable system prompts and templates.

For raw generation output, write to hf://buckets/<org>/<bucket>/... (or use HuggingFaceBucketWriter) and only promote the cleaned, ready-to-share data to hf://datasets/.... See HF Storage Buckets.

Advanced configuration

shared_context lets you inject shared state into every rollout invocation. It accepts:
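A hedged sketch of how extra kwargs can flow from shared_context into a rollout (treating shared_context as a plain dict here is an assumption):

from datatrove.pipeline.inference.run_inference import InferenceConfig, InferenceRunner
from datatrove.pipeline.writers import JsonlWriter

async def rollout_with_prompt(doc, generate, system_prompt):
    # `system_prompt` is injected from shared_context on every invocation
    payload = {
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": doc.text},
        ],
        "max_tokens": 1024,
    }
    return await generate(payload)

runner = InferenceRunner(
    rollout_fn=rollout_with_prompt,
    config=InferenceConfig(server_type="vllm", model_name_or_path="google/gemma-3-27b-it"),
    shared_context={"system_prompt": "You are a careful translator."},
    output_writer=JsonlWriter("/my/output/rollouts/"),
)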

Recoverable generation:

Tune batching with max_concurrent_generations and, when pre/post-processing is heavy, raise max_concurrent_documents to allow more rollout coroutines to build payloads while requests are in flight.

Minimal end-to-end example

from datatrove.data import Document
from datatrove.executor.local import LocalPipelineExecutor
from datatrove.pipeline.inference.run_inference import InferenceConfig, InferenceRunner
from datatrove.pipeline.writers import JsonlWriter

async def simple_rollout(doc: Document, generate):
    payload = {"messages": [{"role": "user", "content": [{"type": "text", "text": doc.text}]}], "max_tokens": 2048}
    return await generate(payload)

documents = [Document(text="What's the weather in Tokyo?", id=str(i)) for i in range(1005)]
config = InferenceConfig(server_type="vllm", model_name_or_path="google/gemma-3-27b-it", rollouts_per_document=1, max_concurrent_generations=500)

LocalPipelineExecutor(
    pipeline=[
        documents,
        InferenceRunner(
            rollout_fn=simple_rollout,
            config=config,
            skip_bad_requests=True,
            records_per_chunk=500,
            checkpoints_local_dir="/fsx/.../translate-checkpoints",
            output_writer=JsonlWriter("s3://.../final_output_data", output_filename="${rank}_chunk_${chunk_index}.jsonl"),
        ),
    ],
    logging_dir="/fsx/.../inference_logs",
    tasks=1,
).run()

The extended inference_chunked.py script demonstrates single- and multi-rollout flows, resumable checkpoints and sharing a process pool across rollouts.

Progress monitoring

For long-running inference jobs, you can use InferenceProgressMonitor to periodically update a HuggingFace dataset card with a progress bar and ETA. After inference completes, InferenceDatasetCardGenerator creates a final dataset card with statistics.

from datatrove.pipeline.inference import InferenceDatasetCardParams, InferenceProgressMonitor, InferenceDatasetCardGenerator

params = InferenceDatasetCardParams(
    output_repo_id="your-username/output-dataset",
    input_dataset_name="simplescaling/s1K-1.1",
    input_dataset_split="train",
    model_name="Qwen/Qwen3-0.6B",
    # ... other params
)

# Monitor pipeline (runs in parallel with inference on Slurm)
monitor_pipeline = [InferenceProgressMonitor(params=params, update_interval=3600)]

# Final card generation (runs after inference completes)
datacard_pipeline = [InferenceDatasetCardGenerator(params=params)]

See progress_monitoring.py for a complete example with Slurm integration.

Benchmarking

To measure vLLM throughput across different models and configurations (TP, PP, speculative decoding), use the benchmark tools. The benchmark suite provides:

Extracting text

You can use extractors to extract text content from raw html. The most commonly used extractor in datatrove is Trafilatura, which uses the trafilatura library.
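A minimal sketch of how the extractor slots into a pipeline (the surrounding steps are elided; WarcReader is one reader that produces raw html):

from datatrove.pipeline.extractors import Trafilatura

pipeline = [
    ...,            # e.g. a WarcReader producing raw html in doc.text
    Trafilatura(),  # replaces doc.text with the extracted main content
    ...,
]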

Filtering data

Filters are some of the most important blocks of any data processing pipeline. Datatrove's filter blocks take a Document and return a boolean (True to keep a document, False to remove it). Removed samples do not continue to the next pipeline stage. You can also save the removed samples to disk by passing a Writer to the exclusion_writer parameter.
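As a hedged sketch, LambdaFilter wraps a plain predicate, and exclusion_writer captures what gets dropped:

from datatrove.pipeline.filters import LambdaFilter
from datatrove.pipeline.writers import JsonlWriter

filter_step = LambdaFilter(
    filter_function=lambda doc: "hurricane" in doc.text,  # True = keep
    exclusion_writer=JsonlWriter("/my/output/removed/"),  # optional: save removed docs
)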

Saving data

Once you are done processing your data you will probably want to save it somewhere. For this you can use a writer. Writers require an output_folder (the path where data should be saved). You can choose the compression to use (default: gzip) and the filename to save each file as. For the output_filename, a template is applied using the following arguments:

An example to separate samples by language based on their lang metadata field:

JsonlWriter(
    f"{MAIN_OUTPUT_PATH}/non_english/",
    output_filename="${language}/" + DUMP + "/${rank}.jsonl.gz",  # folder structure: language/dump/file
)
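More generally, a minimal writer setup might look like this (the compression value mirrors the stated default):

from datatrove.pipeline.writers import JsonlWriter

writer = JsonlWriter(
    output_folder="s3://my-bucket/processed/",
    output_filename="${rank}.jsonl.gz",  # template: ${rank} is the task rank
    compression="gzip",                  # the default
)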

Where to write data on the Hugging Face Hub

For Hub-backed output, prefer buckets for raw / intermediate data and datasets for the published, ready-to-share version. Both have dedicated writers:

from datatrove.pipeline.writers import HuggingFaceBucketWriter, HuggingFaceDatasetWriter

# Recommended for raw / intermediate output of large pipelines:
HuggingFaceBucketWriter(
    bucket="myorg/my-bucket",
    prefix="v1/raw",  # path inside the bucket
    private=True,
    overwrite=True,   # delete existing files at prefix first (default: False = append)
)

# Use this when promoting the final dataset to a published HF dataset:
HuggingFaceDatasetWriter(
    dataset="myorg/my-dataset",
    private=True,
)

See HF Storage Buckets for the full picture (including direct fsspec writes, hf-mount, and HF Jobs volume mounts).

HF Storage Buckets

Hugging Face storage buckets are S3-like, mutable object storage backed by Xet. They are the recommended destination for raw and intermediate data; promote the final, ready-to-publish version to a dataset (hf://datasets/...).

You can read from and write to buckets in four ways — pick the one that fits your deployment:

| Approach | When to use |
| --- | --- |
| HuggingFaceBucketWriter | Large datasets, staged Xet uploads, auto-creates the bucket. Supports overwrite=True to replace existing files. |
| Direct fsspec path (hf://buckets/...) on any reader/writer | Simple read/write through HfFileSystem; no extra setup. |
| hf-mount (FUSE/NFS) | Best read performance with zero code changes; treat the bucket like a local dir. |
| HF Jobs volume mounts | Zero setup when running on HF infra; the bucket is mounted at the path you choose. |

Reading uses the existing ParquetReader / JsonlReader blocks; no dedicated bucket reader is needed because buckets are raw object storage. See examples/bucket_synthetic_data.py for a side-by-side comparison of all four approaches.

from datatrove.pipeline.readers import ParquetReader
from datatrove.pipeline.writers import HuggingFaceBucketWriter

# Reading: any reader with an hf://buckets/... path works.
reader = ParquetReader(data_folder="hf://buckets/myorg/my-bucket/raw/")

# Writing: HuggingFaceBucketWriter stages files locally, then pushes via Xet.
# Set overwrite=True to replace existing files at the prefix (default: append).
writer = HuggingFaceBucketWriter(
    bucket="myorg/my-bucket",
    prefix="v1/filtered",
    private=True,
    cleanup=True,
    overwrite=True,
)

Bucket URLs also work as logging_dir on any executor, e.g. logging_dir="hf://buckets/myorg/my-bucket/logs/v1".

Deduplicating data

For deduplication check the examples minhash_deduplication.py, sentence_deduplication.py and exact_substrings.py.

Summary Statistics

For summary statistics on your data you can use the Stats blocks. These blocks provide an easy way to collect data profiles on your dataset in a distributed manner. It's a two-step process:

  1. For each shard, iterate over documents and collect stats into one of the following groupings: summary (all docs counted under the "summary" key), fqdn (grouping by fully qualified domain name), suffix (grouping by the last part of the url path) or histogram (value-based grouping).
  2. Merge the stats from different shards into a single file. See summary_stats.py for more details.

Each resulting stat is saved in a separate file with the following structure: output_folder/{fqdn,suffix,summary,histogram}/{stat_name}/metric.json

Each such file is a MetricStatsDict object, which you can easily load using:

import json

from datatrove.pipeline.stats.summary_stats import MetricStatsDict

stats = MetricStatsDict.from_dict(json.load(open("fqdn/length/metric.json")))

E.g. for the total length of nytimes.com docs:

stats["nytimes.com"].total

Or for the mean of cnn.com docs:

stats["cnn.com"].mean

The following stats are available:

Custom blocks

Simple data

You can pass an iterable of Document directly as a pipeline block like so:

from datatrove.data import Document
from datatrove.pipeline.filters import SamplerFilter
from datatrove.pipeline.writers import JsonlWriter

pipeline = [
    [
        Document(text="some data", id="0"),
        Document(text="some more data", id="1"),
        Document(text="even more data", id="2"),
    ],
    SamplerFilter(rate=0.5),
    JsonlWriter(
        output_folder="/my/output/path"
    )
]

Do note, however, that this iterable will not be sharded (if you launch more than 1 task they will all get the full iterable). This is usually useful for small workloads/testing.

Custom function

For simple processing, you can pass in a custom function with the following signature:

from datatrove.data import DocumentsPipeline

def uppercase_everything(data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
    """
    `data` is a generator of Document. You must also return a generator of Document (yield).
    You can optionally use `rank` and `world_size` for sharding.
    """
    for document in data:
        document.text = document.text.upper()
        yield document

pipeline = [
    ...,
    uppercase_everything,
    ...
]

Tip

You might have some pickling issues due to the imports. If this happens, simply move whatever imports you need inside the function body.

Custom block

You can also define a full block inheriting from PipelineStep or one of its subclasses:

from datatrove.pipeline.base import PipelineStep
from datatrove.data import DocumentsPipeline
from datatrove.io import DataFolderLike, get_datafolder

class UppercaserBlock(PipelineStep):
    def __init__(self, some_folder: DataFolderLike, some_param: int = 5):
        super().__init__()
        # you can take whatever parameters you need and save them here
        self.some_param = some_param
        # to load datafolders use get_datafolder()
        self.some_folder = get_datafolder(some_folder)

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        # you could also load data from `some_folder`:
        for filepath in self.some_folder.get_shard(rank, world_size):  # it also accepts a glob pattern, among other things
            with self.some_folder.open(filepath, "rt") as f:
                # do something
                ...
                yield doc

        #
        # OR process data from previous blocks (`data`)
        #

        for doc in data:
            with self.track_time():
                # you can wrap the main processing code in `track_time` to know how much each document took to process
                nr_uppercase_letters = sum(map(lambda c: c.isupper(), doc.text))
                # you can also keep track of stats per document using stat_update
                self.stat_update("og_upper_letters", value=nr_uppercase_letters)
                doc.text = doc.text.upper()
            # make sure you keep the yield outside the track_time block, or it will affect the time calculation
            yield doc

        #
        # OR save data to disk
        #

        with self.some_folder.open("myoutput", "wt") as f:
            for doc in data:
                f.write(doc...)

pipeline = [
    ...,
    UppercaserBlock("somepath"),
    ...
]

You could also inherit from BaseExtractor, BaseFilter, BaseReader/BaseDiskReader, or DiskWriter.

Contributing

git clone git@github.com:huggingface/datatrove.git && cd datatrove
uv sync --extra dev

Install pre-commit code style hooks:

Run code style checks:

# Fast local loop (changed Python files only)
make quality
make style

# Full repository checks (same scope as CI)
make quality-full
make style-full

Run the tests:

Citation

@misc{penedo2024datatrove,
  author = {Penedo, Guilherme and Kydlíček, Hynek and Cappelli, Alessandro and Sasko, Mario and Wolf, Thomas},
  title = {DataTrove: large scale data processing},
  year = {2024},
  publisher = {GitHub},
  journal = {GitHub repository},
  url = {https://github.com/huggingface/datatrove}
}