Data Preparation Module#
The nemotron.data_prep module focuses on last-mile data processing—transforming curated datasets into training-ready formats. It bridges the gap between data curation (handled by NeMo Curator) and model training, producing outputs compatible with the NVIDIA AI training stack.
Coming Soon: Native integration with NeMo Curator for data curation and NeMo Data Designer for synthetic data generation. These integrations will enable end-to-end data pipelines from raw sources to training-ready formats.
Overview#
Built on top of Ray, the data preparation module provides:
- Last-mile processing — Convert curated datasets to training-ready formats (tokenization, packing, chat templating)
- Distributed processing — Scale from a single machine to a cluster of workers using Ray actors
- Cloud-native I/O — Read from HuggingFace Hub (hf://), S3 (s3://), GCS (gs://), or local paths via fsspec
- Deterministic output — Frozen shard plans ensure reproducible results across runs
- Resumable pipelines — Skip completed shards on restart; verify output integrity with checksums
Data Pipeline: Curator → Data Prep → Training#
This module is designed to work alongside NeMo Curator in a two-stage data pipeline:
%%{init: {'theme': 'base', 'themeVariables': { 'primaryBorderColor': '#333333', 'lineColor': '#333333', 'primaryTextColor': '#333333', 'clusterBkg': '#ffffff', 'clusterBorder': '#333333'}}}%%
flowchart LR
subgraph sources["📁 Raw Data"]
CC["CommonCrawl"]
HF["HuggingFace"]
Custom["Custom Sources"]
end
subgraph curator["🔧 NeMo Curator"]
C1["Deduplication"]
C2["Quality Filtering"]
C3["Language ID"]
C4["PII Removal"]
C1 --- C2 --- C3 --- C4
end
subgraph dataprep["⚡ Data Prep"]
D1["Tokenization"]
D2["Chat Templating"]
D3["Sequence Packing"]
D4["Loss Mask Creation"]
D1 --- D2 --- D3 --- D4
end
subgraph training["🚀 Training"]
T1["Megatron-Bridge"]
T2["NeMo-RL"]
end
sources --> curator
curator -->|"Curated Data"| dataprep
dataprep -->|"bin/idx, .npy, JSONL"| training
Typical workflow:
1. Use NeMo Curator to curate raw data at scale—deduplication, quality filtering, language identification, PII removal
2. Use Data Prep to transform curated data into training-ready formats—tokenization, chat templating, sequence packing, loss mask generation
This separation allows you to curate once and prepare data multiple times for different training configurations (different tokenizers, sequence lengths, or output formats).
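For instance, the same curated blend can be prepared once per target tokenizer using the high-level API shown later on this page. A minimal sketch (the blend path, tokenizer names, and output directories are illustrative):

from pathlib import Path

from nemotron.data_prep import DataPrepConfig, run_data_prep

# One curated blend, prepared twice with different tokenizers.
# Paths and model names here are placeholders.
blend = Path("curated_blend.json")

for tokenizer_model, output_dir in [
    ("meta-llama/Llama-3.2-1B", Path("./prepared/llama")),
    ("nvidia/NVIDIA-Nemotron-Nano-9B-v2", Path("./prepared/nemotron")),
]:
    run_data_prep(DataPrepConfig(
        blend_path=blend,
        output_dir=output_dir,
        tokenizer_model=tokenizer_model,
    ))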
Recipe Integration#
Each training stage in a recipe includes a dedicated data preparation step that transforms source data into the format required by that stage’s training framework:
| Stage | Data Prep Output | Training Framework | Guide |
|---|---|---|---|
| Stage 0: Pretrain | bin/idx indexed datasets | Megatron-Bridge | |
| Stage 1: SFT | Packed .npy with loss masks | Megatron-Bridge | |
| Stage 2: RL | JSONL with OpenAI chat format | NeMo-RL | |
Run data preparation for any stage using the CLI:
uv run nemotron nano3 data prep pretrain --run YOUR-CLUSTER # Stage 0
uv run nemotron nano3 data prep sft --run YOUR-CLUSTER # Stage 1
uv run nemotron nano3 data prep rl --run YOUR-CLUSTER # Stage 2
Note: The --run YOUR-CLUSTER flag submits jobs via NeMo-Run. See Execution through NeMo-Run for setup.
NeMo-Run Integration#
The module integrates natively with NeMo-Run for job orchestration. Submit data preparation jobs to various executors (Slurm, local, Docker, cloud) directly from your local machine:
# Submit to Slurm cluster
uv run nemotron nano3 data prep pretrain --run YOUR-CLUSTER
# Run locally with Ray
uv run nemotron nano3 data prep pretrain --run local
# Preview without executing
uv run nemotron nano3 data prep pretrain --run YOUR-CLUSTER --dry-run
Configure execution profiles in env.toml:
[YOUR-CLUSTER]
executor = "slurm"
account = "YOUR-ACCOUNT"
partition = "batch"
See Execution through NeMo-Run for complete configuration options.
Choosing an API#
| API | Use When | Output Format |
|---|---|---|
| run_data_prep | Simple tokenization pipelines | bin/idx |
| last_mile_process | Custom formats, transforms, chat SFT | Any |
Use the high-level API (run_data_prep) for:
- Standard pretraining data tokenization
- Per-split output (train/valid/test) with PerSplitConfig
- When you want automatic Ray initialization and artifact tracking
Use the low-level API (last_mile_process) for:
- JSONL output with custom transforms
- Chat SFT with loss masking (ChatSftOutputConfig)
- Packed sequences without chat templates
- Custom pipeline orchestration
Supported Output Formats#
| Format | Description | Use Case |
|---|---|---|
| BinIdx | Tokenized binary format | Pretraining (default) |
| JSONL | JSONL files with optional transforms | SFT/RL training |
| Packed | Packed sequences in .npy files | Efficient SFT training |
Quick Start#
High-Level API (DataPrepConfig)#
For simple tokenization to the BinIdx format:
from nemotron.data_prep import DataPrepConfig, run_data_prep
from pathlib import Path
config = DataPrepConfig(
blend_path=Path("data_blend.json"),
output_dir=Path("./output"),
tokenizer_model="meta-llama/Llama-3.2-1B",
)
artifact = run_data_prep(config)
print(f"Blend path: {artifact.path}")
Low-Level API (last_mile_process)#
For more control over output format:
from pathlib import Path

from nemotron.data_prep import last_mile_process, DataBlend, PipelineConfig
from nemotron.data_prep.config import OutputConfig, JsonlOutputConfig
from nemotron.data_prep.formats.transforms import sft
blend = DataBlend.load("data_blend.json")
config = PipelineConfig(
output=OutputConfig(
dir=Path("./sft_data"),
format=JsonlOutputConfig(
transform=sft(input="instruction", output="response"),
),
),
)
result = last_mile_process(blend, config)
Output Formats#
BinIdx (Default)#
Tokenized binary format for Megatron pretraining:
from pathlib import Path

from nemotron.data_prep import PipelineConfig
from nemotron.data_prep.config import BinIdxOutputConfig, OutputConfig, TokenizerConfig
config = PipelineConfig(
tokenizer=TokenizerConfig(model="meta-llama/Llama-3.2-1B"),
output=OutputConfig(
dir=Path("./tokenized"),
format=BinIdxOutputConfig(
shard_size="256MB", # Or num_shards=128
dtype="int32",
),
),
)
JSONL#
Structured JSONL for SFT/RL training (no tokenization):
from pathlib import Path

from nemotron.data_prep import PipelineConfig
from nemotron.data_prep.config import OutputConfig, JsonlOutputConfig
from nemotron.data_prep.formats.transforms import sft, openai_chat
# SFT format: {"input": "...", "output": "..."}
config = PipelineConfig(
output=OutputConfig(
dir=Path("./sft_data"),
format=JsonlOutputConfig(
transform=sft(input="instruction", output="response"),
compression="zstd", # Optional compression
),
),
)
# OpenAI chat format: {"messages": [...]}
config = PipelineConfig(
output=OutputConfig(
dir=Path("./rl_data"),
format=JsonlOutputConfig(
transform=openai_chat(),
),
),
)
Packed#
Packed sequences for efficient SFT training:
from pathlib import Path

from nemotron.data_prep import PipelineConfig
from nemotron.data_prep.config import OutputConfig, PackedOutputConfig, TokenizerConfig
config = PipelineConfig(
tokenizer=TokenizerConfig(model="meta-llama/Llama-3.2-1B"),
output=OutputConfig(
dir=Path("./packed_data"),
format=PackedOutputConfig(
pack_size=4096,
algorithm="first_fit_shuffle",
),
),
)
Chat SFT (Packed with Loss Masking)#
Chat-templated SFT with role-based loss masking. This format applies chat templates to OpenAI-format messages, tokenizes them, and produces packed sequences with a loss mask that zeros out system/user tokens:
from pathlib import Path

from nemotron.data_prep import last_mile_process, DataBlend, PipelineConfig
from nemotron.data_prep.config import OutputConfig, ChatSftOutputConfig, TokenizerConfig
blend = DataBlend.load("chat_data.json")
config = PipelineConfig(
tokenizer=TokenizerConfig(model="nvidia/NVIDIA-Nemotron-Nano-9B-v2"),
output=OutputConfig(
dir=Path("./chat_sft"),
format=ChatSftOutputConfig(
chat_template="nano3", # Built-in template or path to .jinja file
messages_field="messages", # Field containing OpenAI-format messages
pack_size=4096, # Maximum tokens per packed sequence
algorithm="first_fit_shuffle",
),
),
)
result = last_mile_process(blend, config)
Input format (OpenAI chat messages):
{
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": "Hello!"},
{"role": "assistant", "content": "Hi there!"}
]
}
Output:
- .npy files with packed input_ids and loss_mask arrays
- Loss mask: 0 for system/user tokens, 1 for assistant tokens
- Metadata files for Megatron-Bridge compatibility
Transforms#
Transforms convert input records to the desired output format. They are callables that take a dict and return a dict (or None to skip the record).
Built-in Transform Factories#
from nemotron.data_prep.formats.transforms import (
sft, # SFT format: {input, output}
openai_chat, # OpenAI format: {messages: [...]}
sharegpt, # ShareGPT format: {conversations: [...]}
nemotron_rl, # Nemotron RL format: {messages, tools}
passthrough, # Pass records unchanged
select, # Select specific fields
rename, # Rename fields
)
sft()#
Creates SFT format output:
transform = sft(
input="instruction", # Source field for input
output="response", # Source field for output
system="system_prompt" # Optional system prompt field
)
# Input: {"instruction": "Hello", "response": "Hi!", "system_prompt": "Be helpful"}
# Output: {"input": "Hello", "output": "Hi!", "system": "Be helpful"}
openai_chat()#
Creates OpenAI chat format:
transform = openai_chat(messages="conversation")
# Input: {"conversation": [{"role": "user", "content": "Hi"}]}
# Output: {"messages": [{"role": "user", "content": "Hi"}]}
nemotron_rl()#
Extracts messages and tools from Nemotron RL dataset format:
transform = nemotron_rl()
# Input: {
# "responses_create_params": {
# "input": [{"role": "user", "content": "Hi"}],
# "tools": [{"name": "search", ...}]
# }
# }
# Output: {"messages": [{"role": "user", "content": "Hi"}], "tools": [...]}
Records without valid responses_create_params.input are skipped.
passthrough()#
Passes records unchanged:
transform = passthrough()
# Input: {"any": "data"}
# Output: {"any": "data"}
select()#
Selects specific fields:
transform = select("id", "text")
# Input: {"id": 1, "text": "hello", "extra": "ignored"}
# Output: {"id": 1, "text": "hello"}
rename()#
Renames fields:
transform = rename(input="question", output="answer")
# Input: {"question": "What?", "answer": "This."}
# Output: {"input": "What?", "output": "This."}
Custom Transforms#
You can use any callable:
# Lambda
transform = lambda r: {"input": r["q"], "output": r["a"]} if r.get("valid") else None
# Function
def my_transform(record: dict) -> dict | None:
if len(record.get("text", "")) < 10:
return None # Skip short records
return {"input": record["question"], "output": record["answer"]}
Filtering Records#
Return None from a transform to skip records. This is useful for filtering out low-quality or malformed data:
from typing import Callable

def filter_by_length(min_chars: int = 100) -> Callable[[dict], dict | None]:
"""Skip records with text shorter than min_chars."""
def transform(record: dict) -> dict | None:
text = record.get("text", "")
if len(text) < min_chars:
return None # Record will be skipped
return record
return transform
# Usage
config = PipelineConfig(
output=OutputConfig(
dir=Path("./filtered"),
format=JsonlOutputConfig(transform=filter_by_length(min_chars=200)),
),
)
Built-in transforms also filter: for example, nemotron_rl() skips records missing required fields.
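Since transforms are plain callables, filtering and reshaping also compose: wrap one transform in another to do both in a single pass. A sketch that combines a length filter with the built-in sft() factory (the field names and threshold are illustrative):

from nemotron.data_prep.formats.transforms import sft

def filtered_sft(min_chars: int = 100):
    """Wrap the built-in sft() transform with a length filter."""
    base = sft(input="instruction", output="response")

    def transform(record: dict) -> dict | None:
        # Skip records whose response is too short, then reshape
        if len(record.get("response", "")) < min_chars:
            return None
        return base(record)

    return transform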
Per-Dataset Shard Allocation#
When a blend includes multiple datasets, shard counts are allocated per dataset rather than from a single global count. The total shard budget comes from num_shards or shard_size and is distributed proportionally by dataset weight and estimated size, with at least one shard per dataset and a cap based on the number of input files (to avoid empty shards). Dataset weights still control training mixture ratios; shard allocation is only a sizing heuristic.
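An illustrative sketch of such an allocation heuristic, not the module's actual implementation (the per-dataset fields are invented for the example):

def allocate_shards(budget: int, datasets: list[dict]) -> dict[str, int]:
    """Distribute a shard budget proportionally by weight * estimated size,
    with at least one shard per dataset, capped at the input file count."""
    scores = {d["name"]: d["weight"] * d["est_bytes"] for d in datasets}
    caps = {d["name"]: d["num_files"] for d in datasets}
    total = sum(scores.values())
    return {
        name: max(1, min(caps[name], round(budget * score / total)))
        for name, score in scores.items()
    }

# Example: a 128-shard budget split across a large and a small dataset
alloc = allocate_shards(128, [
    {"name": "ds1", "weight": 1.0, "est_bytes": 30e9, "num_files": 200},
    {"name": "ds2", "weight": 0.3, "est_bytes": 2e9, "num_files": 2},
])
print(alloc)  # {'ds1': 125, 'ds2': 2} — ds2 is capped at its 2 input files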
Note: Weight vs. Shard Counts

Dataset.weight (e.g., 0.7, 0.3): Controls training-time sampling in Megatron-Bridge. A blend with weights [0.7, 0.3] means 70% of training samples come from dataset 1 during training.
Shard counts: Controlled by shard_size (recommended) or explicit num_shards. These determine how many physical output files are created during data preparation, independent of weights.
For blends with datasets of different sizes, use shard_size="256MB" instead of explicit num_shards to let each dataset get an appropriate shard count based on its size.
blend.json includes a num_shards map with the effective per-dataset counts:
{
"data_paths": ["1.0", "/path/to/ds1/shard", "0.3", "/path/to/ds2/shard"],
"num_shards": {"ds1": 120, "ds2": 8},
"split": "99990,8,2"
}
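The flat data_paths list alternates weight and shard-path prefix, so it pairs up with a stride-2 slice; a small stdlib sketch (the blend path is illustrative):

import json
from pathlib import Path

blend = json.loads(Path("output/blend.json").read_text())

# data_paths alternates: weight, shard prefix, weight, shard prefix, ...
pairs = list(zip(blend["data_paths"][::2], blend["data_paths"][1::2]))
# [("1.0", "/path/to/ds1/shard"), ("0.3", "/path/to/ds2/shard")]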
Per-Split Output#
Generate separate train/valid/test outputs using PerSplitConfig:
from nemotron.data_prep import DataPrepConfig, PerSplitConfig
from pathlib import Path
config = DataPrepConfig(
blend_path=Path("blend.json"),
output_dir=Path("./output"),
tokenizer_model="nvidia/NVIDIA-Nemotron-Nano-9B-v2",
per_split=PerSplitConfig(
enabled=True,
valid_shards=1, # Number of validation shards
test_shards=1, # Number of test shards
),
)
Output structure:
output/
├── train/
│ ├── shard_000000.bin
│ ├── shard_000000.idx
│ └── ...
├── valid/
│ └── shard_000000.bin/.idx
├── test/
│ └── shard_000000.bin/.idx
└── blend.json
blend.json format (per-split mode):
{
"train": [["1.0", "/path/to/train/shard_000000"], ["1.0", "/path/to/train/shard_000001"]],
"valid": [["1.0", "/path/to/valid/shard_000000"]],
"test": [["1.0", "/path/to/test/shard_000000"]],
"num_shards": {
"train": {"train_ds": 128},
"valid": {"valid_ds": 2},
"test": {"test_ds": 2}
}
}
This format is directly compatible with Megatron-Bridge’s per-split data loading.
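Because the per-split blend.json is plain JSON, it is also easy to inspect directly; a minimal stdlib sketch (the output path is illustrative):

import json
from pathlib import Path

blend = json.loads(Path("output/blend.json").read_text())

# Each split maps to a list of [weight, shard_prefix] pairs
for split in ("train", "valid", "test"):
    for weight, prefix in blend[split]:
        # The shard files are prefix + ".bin" and prefix + ".idx"
        print(f"{split}: weight={weight} prefix={prefix}")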
Type Definitions#
TypedDicts are provided for type safety:
from nemotron.data_prep.formats.transforms import (
SftRecord, # {"input": str, "output": str}
OpenAIChatRecord, # {"messages": list[Message]}
ShareGPTRecord, # {"conversations": list[Conversation]}
Message, # {"role": str, "content": str}
)
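These can annotate custom transforms so a type checker verifies the output shape; for example (the source field names are illustrative):

from nemotron.data_prep.formats.transforms import SftRecord

def to_sft(record: dict) -> SftRecord | None:
    """Typed custom transform producing SFT records."""
    if "question" not in record or "answer" not in record:
        return None  # Skip malformed records
    return {"input": record["question"], "output": record["answer"]}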
API Reference#
Main Functions#
| Function | Description |
|---|---|
| run_data_prep | High-level entry point for tokenization |
| last_mile_process | Low-level entry point with format dispatch |
| | Deprecated alias |
Configuration Classes#
| Class | Description |
|---|---|
| DataPrepConfig | High-level configuration for run_data_prep |
| PipelineConfig | Low-level pipeline configuration |
| TokenizerConfig | Tokenizer settings (model, add_bos, add_eos) |
| OutputConfig | Output directory and format |
| BinIdxOutputConfig | Tokenized binary format options |
| JsonlOutputConfig | JSONL format options |
| PackedOutputConfig | Packed sequence format options |
Result Classes#
| Class | Description |
|---|---|
| | Complete pipeline result with all splits |
| | Result for a single split (train/valid/test) |
| | Artifact with blend.json path and metrics |
Compression#
JSONL output supports optional zstd compression:
format=JsonlOutputConfig(
compression="zstd", # Output .jsonl.zst files
)
Requires the zstandard package: uv pip install zstandard
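To read the compressed output back, the zstandard package provides a stdlib-style open() helper that decompresses transparently; a quick sketch (the shard filename is illustrative):

import json
import zstandard

# Open a compressed shard in text mode and iterate records
with zstandard.open("sft_data/shard_000000.jsonl.zst", "rt") as f:
    for line in f:
        record = json.loads(line)
        print(record["input"], "->", record["output"])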
Dependencies#
Core dependencies:
- ray - Parallel processing
- pyarrow - Parquet file reading
- xxhash - Fast checksums
Optional dependencies:
- orjson - Fast JSON serialization (falls back to stdlib json)
- zstandard - Zstd compression for JSONL output
Further Reading#
- NeMo Curator — Data curation at scale (coming soon)
- NeMo Data Designer — Synthetic data generation (coming soon)
- NVIDIA AI Stack — Megatron-Core, Megatron-Bridge, NeMo-RL documentation
- Execution through NeMo-Run — Job orchestration and execution profiles
- CLI Framework — CLI building and recipe decorators
- Artifact Lineage — W&B artifact system and lineage tracking
- Nano3 Recipe — Complete training recipe example