dp_utils

DP training utilities for Hugging Face Trainer and data collation.

Provides OpacusDPTrainer (a DP-aware Trainer with entity-level sampling and an Opacus optimizer), DPCallback for Trainer hooks, data collators that expose position_ids for per-sample gradients, and a GradSampleModule wrapper with no_sync support.

Classes:

DPCallback: Trainer callback that integrates Opacus DP-SGD with transformers.Trainer.
DataCollatorForPrivateCausalLanguageModeling: Adds position_ids for Opacus per-sample gradients.
DataCollatorForPrivateTokenClassification: Collator for token classification that adds position_ids for Opacus.
GradSampleModule: Opacus GradSampleModule with no_sync for Hugging Face Trainer.
OpacusDPTrainer: DP-aware Trainer for PEFT/LoRA fine-tuning with Opacus.

Functions:

create_entity_mapping: Build a mapping from each entity to its dataset indices.

DPCallback(noise_multiplier, sampling_probability, accountant, max_epsilon=float('inf'))

Bases: TrainerCallback

Trainer callback that integrates Opacus DP-SGD with transformers.Trainer.

Handles per-step optimizer behavior (skip signal, step, zero_grad), optional RDP step accounting, and early stopping when max_epsilon is exceeded. Used with OpacusDPTrainer, which creates and registers this callback in its constructor.

Parameters:

noise_multiplier (float, required): Ratio of the Gaussian noise standard deviation to the per-sample clipping norm.
sampling_probability (float, required): Probability that a given entity (a record, under record-level DP) is included in a batch.
accountant (SafeSynthesizerAccountant, required): Privacy accountant for epsilon computation and (if RDP) step tracking.
max_epsilon (float, default float('inf')): Stop training when the computed epsilon exceeds this value.

Methods:

on_substep_end: Run DP optimizer step at the end of each gradient-accumulation substep.
on_step_end: Clear gradients and update RDP accountant at the end of each optimizer step.
on_save: Check the epsilon budget when a checkpoint is saved and stop training if max_epsilon is exceeded.
on_evaluate: Check epsilon budget and stop training if max_epsilon is exceeded.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def __init__(
    self,
    noise_multiplier: float,
    sampling_probability: float,
    accountant: SafeSynthesizerAccountant,
    max_epsilon: float = float("inf"),
) -> None:
    self.accountant = accountant
    self._max_epsilon = max_epsilon
    self._on_substep_end_was_called = False

    self.noise_multiplier = noise_multiplier
    self.sampling_probability = sampling_probability

on_substep_end(args, state, control, optimizer=None, **kwargs)

Run DP optimizer step at the end of each gradient-accumulation substep.

Signals the Opacus optimizer to skip the next parameter update, then calls step() and zero_grad() on the underlying DP optimizer (or on the optimizer itself if it is not wrapped by Accelerate). This hook is required when using gradient accumulation: per-sample gradients are clipped and accumulated for every micro-batch, while the noisy parameter update runs once per optimizer step.
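
The skip-step mechanism can be illustrated without Opacus. The following is a minimal pure-Python sketch, not the Opacus implementation: ToyDPOptimizer is a hypothetical class that clips and sums per-sample gradients on every step(), but only adds noise and updates the parameters when the step was not marked as skipped. Unlike Opacus, which reads per-sample gradients from the parameters after backward, the toy takes them as an argument. Noise is set to zero so the outcome is deterministic.

```python
import math
import random


class ToyDPOptimizer:
    """Pure-Python stand-in for the skip-step pattern of a DP optimizer.

    Hypothetical illustration: parameters are a flat list of floats and
    per-sample gradients are passed in explicitly.
    """

    def __init__(self, params, lr, noise_multiplier, max_grad_norm, expected_batch_size, seed=0):
        self.params = params
        self.lr = lr
        self.noise_multiplier = noise_multiplier
        self.max_grad_norm = max_grad_norm
        self.expected_batch_size = expected_batch_size
        self.summed_grad = [0.0] * len(params)
        self._skip_next = False
        self._rng = random.Random(seed)

    def signal_skip_step(self, do_skip=True):
        # Mark the next step() as a substep: accumulate only, no update.
        self._skip_next = do_skip

    def step(self, per_sample_grads):
        # Clip every per-sample gradient to max_grad_norm, then add it to the running sum.
        for grad in per_sample_grads:
            norm = math.sqrt(sum(x * x for x in grad))
            clip_factor = min(1.0, self.max_grad_norm / (norm + 1e-6))
            for i, x in enumerate(grad):
                self.summed_grad[i] += x * clip_factor
        if self._skip_next:
            # Substep: keep the accumulated sum and leave the parameters untouched.
            self._skip_next = False
            return False
        # Full step: add Gaussian noise once, average over the expected batch, update.
        std = self.noise_multiplier * self.max_grad_norm
        for i in range(len(self.params)):
            noisy = self.summed_grad[i] + self._rng.gauss(0.0, std)
            self.params[i] -= self.lr * noisy / self.expected_batch_size
        self.summed_grad = [0.0] * len(self.params)
        return True


params = [0.0, 0.0]
opt = ToyDPOptimizer(params, lr=0.1, noise_multiplier=0.0, max_grad_norm=1.0, expected_batch_size=4)
micro_batches = [
    [[3.0, 4.0], [0.0, 0.5]],  # per-sample gradients of micro-batch 1
    [[1.0, 0.0], [0.0, 2.0]],  # per-sample gradients of micro-batch 2
]
updated = []
for k, batch in enumerate(micro_batches):
    if k < len(micro_batches) - 1:
        opt.signal_skip_step(do_skip=True)  # the role on_substep_end plays
    updated.append(opt.step(batch))
    if k == 0:
        after_first = list(params)  # unchanged: the first step was skipped
```

Here expected_batch_size covers both micro-batches (2 × 2 samples), mirroring how the trainer passes per_device_train_batch_size × gradient_accumulation_steps to the DP optimizer.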

Parameters:

args (TrainingArguments, required): HF Trainer arguments.
state (TrainerState, required): Current trainer state.
control (TrainerControl, required): Trainer control object (not modified).
optimizer (Optimizer | None, default None): The Trainer's optimizer (Opacus DP optimizer or AcceleratedOptimizer wrapping it).
**kwargs (default {}): Additional callback keyword arguments.

Raises:

RuntimeError: If optimizer is None (callback cannot access optimizer).

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def on_substep_end(
    self,
    args: training_args.TrainingArguments,
    state: TrainerState,
    control: TrainerControl,
    optimizer: torch.optim.Optimizer | None = None,
    **kwargs,
) -> None:
    """Run DP optimizer step at the end of each gradient-accumulation substep.

    Signals the Opacus optimizer to skip the next parameter update, then calls
    ``step()`` and ``zero_grad()`` on the underlying DP optimizer (or the
    optimizer itself if not wrapped by Accelerate). Required when using
    gradient accumulation: per-sample gradients are clipped and accumulated
    for every micro-batch, while the noisy update runs once per optimizer step.

    Args:
        args: HF Trainer arguments.
        state: Current trainer state.
        control: Trainer control object (not modified).
        optimizer: The Trainer's optimizer (Opacus DP optimizer or AcceleratedOptimizer wrapping it).
        **kwargs: Additional callback keyword arguments.

    Raises:
        RuntimeError: If optimizer is None (callback cannot access optimizer).
    """
    if optimizer is None:
        raise RuntimeError("Impossible to access optimizer from inside callback")
    if isinstance(optimizer, AcceleratedOptimizer):
        dp_optimizer = optimizer.optimizer
    else:
        dp_optimizer = optimizer
    dp_optimizer.signal_skip_step(do_skip=True)  # ty: ignore[unresolved-attribute]
    dp_optimizer.step()
    dp_optimizer.zero_grad()

    self._on_substep_end_was_called = True

on_step_end(args, state, control, optimizer=None, **kwargs)

Clear gradients and update RDP accountant at the end of each optimizer step.

Calls zero_grad() on the optimizer (Opacus expects this; Trainer does not call it by default). When using the RDP accountant (not PRV), increments the accountant step for accurate epsilon calculation.

Parameters:

args (TrainingArguments, required): Trainer training arguments (used to check gradient_accumulation_steps).
state (TrainerState, required): Current trainer state.
control (TrainerControl, required): Trainer control object (not modified).
optimizer (Optimizer | None, default None): The Trainer's optimizer (required for zero_grad()).
**kwargs (default {}): Additional callback keyword arguments.

Raises:

RuntimeError: If gradient accumulation is used but on_substep_end was never called (e.g. transformers < 4.10.0), or if optimizer is None.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def on_step_end(
    self,
    args: training_args.TrainingArguments,
    state: TrainerState,
    control: TrainerControl,
    optimizer: torch.optim.Optimizer | None = None,
    **kwargs,
) -> None:
    """Clear gradients and update RDP accountant at the end of each optimizer step.

    Calls ``zero_grad()`` on the optimizer (Opacus expects this; Trainer does not
    call it by default). When using the RDP accountant (not PRV), increments the
    accountant step for accurate epsilon calculation.

    Args:
        args: Trainer training arguments (used to check gradient_accumulation_steps).
        state: Current trainer state.
        control: Trainer control object (not modified).
        optimizer: The Trainer's optimizer (required for ``zero_grad()``).
        **kwargs: Additional callback keyword arguments.

    Raises:
        RuntimeError: If gradient accumulation is used but ``on_substep_end`` was
            never called (e.g. transformers < 4.10.0), or if optimizer is None.
    """
    if args.gradient_accumulation_steps > 1 and not self._on_substep_end_was_called:
        raise RuntimeError(
            "Gradient accumulation was specified but `on_substep_end` wasn't called. "
            "Make sure you're using a recent version of transformers (>=4.10.0) "
            "which has an appropriate callback in the trainer."
        )
    if optimizer is None:
        raise RuntimeError(
            "No optimizer provided to on_step_end callback, required for correct DP-SGD to call zero_grad()"
        )

    optimizer.zero_grad()  # Opacus needs .zero_grad() on the optimizer, HF doesn't call by default.
    if not self.accountant.use_prv:
        # Use RDPAccountant, which uses `.step()` to increment number of
        # steps, required for accurate epsilon calculation.
        acct = cast("RDPAccountant", self.accountant.accountant)
        acct.step(
            noise_multiplier=self.noise_multiplier,
            sample_rate=self.sampling_probability,
        )

on_save(args, state, control, **kwargs)

Called after the Trainer saves a checkpoint. Stops training if the computed epsilon has exceeded max_epsilon.

Parameters:

args (TrainingArguments, required): HF Trainer arguments.
state (TrainerState, required): Current trainer state (used for global_step).
control (TrainerControl, required): Trainer control object; should_training_stop may be set to True.
**kwargs (default {}): Additional callback keyword arguments.

Returns:

TrainerControl: TrainerControl with should_training_stop set to True if the current epsilon exceeds max_epsilon, otherwise unchanged.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def on_save(
    self,
    args: training_args.TrainingArguments,
    state: TrainerState,
    control: TrainerControl,
    **kwargs,
) -> TrainerControl:
    """Called when the Trainer is about to save a checkpoint. Ensures training
    stops before saving if the privacy budget would be exceeded.

    Args:
        args: HF Trainer arguments.
        state: Current trainer state (used for global_step).
        control: Trainer control object; ``should_training_stop`` may be set to True.
        **kwargs: Additional callback keyword arguments.

    Returns:
        TrainerControl with ``should_training_stop`` set to True if current
        epsilon exceeds ``max_epsilon``, otherwise unchanged.
    """
    return self._check_max_epsilon_exceeded(state, control)

on_evaluate(args, state, control, **kwargs)

Check epsilon budget and stop training if max_epsilon is exceeded.

Called after the Trainer runs evaluation. Stops training before any further steps if the privacy budget has been exceeded.
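
The body of _check_max_epsilon_exceeded is not shown on this page. A minimal sketch of the check the docstrings describe might look like the following, assuming epsilon has already been obtained from the accountant for the current state.global_step. Control and check_max_epsilon_exceeded are hypothetical names; Control stands in for transformers.TrainerControl.

```python
import math
from dataclasses import dataclass


@dataclass
class Control:
    """Hypothetical stand-in for transformers.TrainerControl (one field only)."""

    should_training_stop: bool = False


def check_max_epsilon_exceeded(epsilon: float, max_epsilon: float, control: Control) -> Control:
    # Flag the training loop to stop once the spent budget exceeds the target;
    # otherwise return the control object unchanged.
    if epsilon > max_epsilon:
        control.should_training_stop = True
    return control


within_budget = check_max_epsilon_exceeded(2.7, 3.0, Control())
over_budget = check_max_epsilon_exceeded(3.2, 3.0, Control())
no_target = check_max_epsilon_exceeded(50.0, math.inf, Control())  # default max_epsilon
print(within_budget.should_training_stop, over_budget.should_training_stop, no_target.should_training_stop)
# False True False
```

With the default max_epsilon of float('inf'), the comparison is never true, so the callback never stops training on its own.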

Parameters:

args (TrainingArguments, required): HF Trainer arguments.
state (TrainerState, required): Current trainer state (used for global_step).
control (TrainerControl, required): Trainer control object; should_training_stop may be set to True.
**kwargs (default {}): Additional callback keyword arguments.

Returns:

TrainerControl: TrainerControl with should_training_stop set to True if the current epsilon exceeds max_epsilon, otherwise unchanged.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def on_evaluate(
    self,
    args: training_args.TrainingArguments,
    state: TrainerState,
    control: TrainerControl,
    **kwargs,
) -> TrainerControl:
    """Check epsilon budget and stop training if ``max_epsilon`` is exceeded.

    Called when the Trainer runs evaluation. Ensures training stops before
    further steps if the privacy budget would be exceeded.

    Args:
        args: HF Trainer arguments.
        state: Current trainer state (used for global_step).
        control: Trainer control object; ``should_training_stop`` may be set to True.
        **kwargs: Additional callback keyword arguments.

    Returns:
        TrainerControl with ``should_training_stop`` set to True if current
        epsilon exceeds ``max_epsilon``, otherwise unchanged.
    """
    return self._check_max_epsilon_exceeded(state, control)

DataCollatorForPrivateCausalLanguageModeling(tokenizer)

Bases: DataCollatorForLanguageModeling

Adds position_ids for Opacus per-sample gradients.

Trainer and model code often create position_ids inside the model forward pass, which Opacus cannot see. This collator builds position_ids during batching so they are present in the batch and available for per-sample gradient computation. See https://github.com/huggingface/transformers/blob/5c1c72be5f864d10d0efe8ece0768d9ed6ee4fdd/src/transformers/models/mistral/modeling_mistral.py#L379 for an example.
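
The collator's own __call__ is not reproduced on this page. As a hedged sketch of the idea, the helper below (hypothetical name add_position_ids, working on plain Python lists rather than tensors) attaches position IDs to a right-padded batch, so they travel with the inputs instead of being created inside the model. It assumes the positions should match the model's default for a forward pass without a KV cache, i.e. 0 through seq_len - 1 for every row.

```python
def add_position_ids(batch: dict[str, list[list[int]]]) -> dict[str, list[list[int]]]:
    """Attach position_ids to an already padded batch (hypothetical helper).

    Every row gets 0, 1, ..., seq_len - 1, the default a decoder model
    builds inside forward() when position_ids is None and no cache is used.
    """
    input_ids = batch["input_ids"]
    seq_len = len(input_ids[0])  # rows are assumed to be padded to equal length
    batch["position_ids"] = [list(range(seq_len)) for _ in input_ids]
    return batch


batch = {
    "input_ids": [[101, 7, 8, 9], [101, 5, 0, 0]],
    "attention_mask": [[1, 1, 1, 1], [1, 1, 0, 0]],
}
print(add_position_ids(batch)["position_ids"])  # [[0, 1, 2, 3], [0, 1, 2, 3]]
```

With left padding, positions derived from the attention mask would differ; this sketch does not handle that case.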

Parameters:

tokenizer (PreTrainedTokenizer, required): Tokenizer for padding and encoding.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def __init__(self, tokenizer: PreTrainedTokenizer):
    super().__init__(tokenizer=tokenizer, mlm=False)

DataCollatorForPrivateTokenClassification(tokenizer)

Bases: DataCollatorForTokenClassification

Collator for token classification that adds position_ids for Opacus.

Same rationale as DataCollatorForPrivateCausalLanguageModeling: ensures position_ids are in the batch for per-sample gradient computation.

Parameters:

tokenizer (PreTrainedTokenizer, required): Tokenizer for padding and encoding.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def __init__(self, tokenizer: PreTrainedTokenizer):
    super().__init__(tokenizer=tokenizer)

GradSampleModule

Bases: GradSampleModule

Opacus GradSampleModule with no_sync for Hugging Face Trainer.

Trainer expects a no_sync context manager to defer gradient sync in distributed settings. This wrapper provides a no-op no_sync so the Trainer API is satisfied.
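
A simplified sketch of why a no-op is enough: in an accumulation loop, the caller enters no_sync() on every micro-batch except the last, which under DDP would defer the gradient all-reduce. Without a process group there is nothing to defer, so the context manager only has to exist. NoSyncModule and accumulate are hypothetical names, and which component actually calls no_sync() depends on the transformers/accelerate version.

```python
import contextlib
from collections.abc import Iterator


class NoSyncModule:
    """Hypothetical wrapper exposing the no_sync() hook an accumulation loop may call."""

    def __init__(self) -> None:
        self.backward_calls = 0

    def backward(self) -> None:
        # Stand-in for a forward/backward pass on one micro-batch.
        self.backward_calls += 1

    @contextlib.contextmanager
    def no_sync(self) -> Iterator[None]:
        # Nothing to defer: without DDP there is no gradient all-reduce to skip.
        yield


def accumulate(model: NoSyncModule, num_micro_batches: int) -> None:
    # Enter no_sync() on every micro-batch except the last one.
    for k in range(num_micro_batches):
        is_last = k == num_micro_batches - 1
        ctx = contextlib.nullcontext() if is_last else model.no_sync()
        with ctx:
            model.backward()


model = NoSyncModule()
accumulate(model, num_micro_batches=4)
print(model.backward_calls)  # 4
```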

Methods:

no_sync: Context manager that does nothing; required by Trainer's expected API.

no_sync()

Context manager that does nothing; required by Trainer's expected API.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
@contextmanager
def no_sync(self) -> Iterator[None]:
    """Context manager that does nothing; required by Trainer's expected API."""
    yield

OpacusDPTrainer(train_dataset, model, args=None, privacy_args=None, data_fraction=None, true_dataset_size=None, entity_column_values=None, callbacks=None, secure_mode=True, **kwargs)

Bases: Trainer

DP-aware Trainer for PEFT/LoRA fine-tuning with Opacus.

Adapts Hugging Face Trainer for differential privacy: it uses entity-level (or record-level) sampling, wraps the model in GradSampleModule and the optimizer in an Opacus DPOptimizer, and avoids double-scaling the loss under gradient accumulation. Only the PEFT/LoRA adapter weights are saved.

Parameters:

train_dataset (Dataset, required): Dataset for training.
model (PreTrainedModel | Module, required): Base model (will be wrapped with GradSampleModule).
args (TrainingArguments | None, default None): Training arguments. Required in practice; a ValueError is raised if None.
privacy_args (PrivacyArguments | None, default None): DP parameters (epsilon, delta, noise, clipping). Required in practice; a ValueError is raised if None.
data_fraction (float | None, default None): If set, num_train_epochs is multiplied by this value to obtain the effective number of epochs used for privacy accounting.
true_dataset_size (int | None, default None): Overrides the number of entities/records used for privacy accounting.
entity_column_values (list | None, default None): If set, entity-level DP is used; each value is the entity ID for the corresponding dataset row. If None, record-level DP is used (one entity per row).
callbacks (list[TrainerCallback] | None, default None): Additional Trainer callbacks.
secure_mode (bool | None, default True): If True, use a secure RNG for noise (recommended).
**kwargs (Any, default {}): Passed to Trainer (e.g. eval_dataset, tokenizer, data_collator).

Attributes:

accountant: Privacy accountant used for epsilon computation.
entity_mapping: For entity i, the list of dataset indices belonging to that entity.

Methods:

get_epsilon: Calculate the epsilon after model training completes.
create_optimizer: Create the base optimizer then wrap it with Opacus DPOptimizer.
training_step: Run one training step and return the loss scaled for logging.
get_train_dataloader: Returns a torch DataLoader that uses an entity-level sampler.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def __init__(
    self,
    train_dataset: Dataset,
    model: modeling_utils.PreTrainedModel | torch.nn.Module,
    args: training_args.TrainingArguments | None = None,
    privacy_args: PrivacyArguments | None = None,
    data_fraction: float | None = None,
    true_dataset_size: int | None = None,
    entity_column_values: list | None = None,
    callbacks: list[TrainerCallback] | None = None,
    secure_mode: bool | None = True,
    **kwargs: Any,
) -> None:
    if args is None:
        raise ValueError("TrainingArguments (args) is required for OpacusDPTrainer")
    if privacy_args is None:
        raise ValueError("PrivacyArguments is required for OpacusDPTrainer")
    self.train_args = args
    self.privacy_args = privacy_args
    self.secure_mode = secure_mode

    if entity_column_values is None:
        # Record-level DP == mapping each sample to a unique entity.
        self.entity_mapping = [[i] for i in range(train_dataset.num_rows)]
    else:
        self.entity_mapping = create_entity_mapping(entity_column_values=entity_column_values)

    # Adjustments for NavFT
    self.true_num_epochs = self.train_args.num_train_epochs
    self.true_dataset_size = len(self.entity_mapping)

    if data_fraction is not None:
        self.true_num_epochs *= data_fraction
        logger.info(
            f"True number of epochs set to {self.true_num_epochs}",
        )
    if true_dataset_size is not None:
        self.true_dataset_size = true_dataset_size
        logger.info(
            (
                f"Training dataset contains {self.true_dataset_size} unique "
                f"{'groups' if entity_column_values else 'records'}; using this "
                "value for differential privacy parameter determination."
            ),
        )

    if not self.privacy_args.is_initialized:
        self.privacy_args.initialize(
            sampling_probability=self.sampling_probability,
            num_steps=self.num_steps,
        )
    pa = self.privacy_args
    assert pa.use_prv is not None
    assert pa.noise_multiplier is not None

    model = GradSampleModule(model)

    super().__init__(
        model=model,
        args=args,
        train_dataset=train_dataset,
        callbacks=callbacks,
        **kwargs,
    )
    self.accountant = SafeSynthesizerAccountant(
        use_prv=pa.use_prv,
        noise_multiplier=pa.noise_multiplier,
        sampling_probability=self.sampling_probability,
        delta=self.privacy_args.target_delta,
        num_steps=self.num_steps,
    )
    self.dp_callback = DPCallback(
        noise_multiplier=pa.noise_multiplier,
        sampling_probability=self.sampling_probability,
        accountant=self.accountant,
        max_epsilon=float("inf") if self.privacy_args.target_epsilon is None else self.privacy_args.target_epsilon,
    )
    self.add_callback(self.dp_callback)

sampling_probability property

Probability that an entity is included in a batch (capped at 1.0).

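It is computed as \(q = \min\left(1, \frac{b \cdot k}{n}\right)\), where \(b\) is per_device_train_batch_size, \(k\) is gradient_accumulation_steps, and \(n\) is the number of entities (under record-level DP there is one entity per row, so \(n\) is the number of records). Under entity-level DP, \(n\) can be small enough that \(b \cdot k > n\); the cap keeps \(q\) at 1.0 in that case. This value is the sampling probability the privacy accountant uses for ε computation.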
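
The formula translates directly into code. A minimal sketch (the function name is hypothetical, and num_entities stands for whatever entity count the trainer uses for accounting, presumably true_dataset_size):

```python
def sampling_probability(
    per_device_train_batch_size: int,
    gradient_accumulation_steps: int,
    num_entities: int,
) -> float:
    # Expected number of entities per optimizer step divided by all entities, capped at 1.0.
    expected_batch = per_device_train_batch_size * gradient_accumulation_steps
    return min(1.0, expected_batch / num_entities)


print(sampling_probability(4, 8, 1000))  # 0.032
print(sampling_probability(4, 8, 20))  # 1.0 (32 > 20, so the cap applies)
```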

num_steps property

The number of optimizer steps used for privacy accounting.

Either user-supplied (via max_steps, when true_num_epochs == -1) or derived from num_train_epochs. When the user specifies num_train_epochs, num_steps is derived from sampling_probability so that each entity is visited roughly once per epoch, analogous to visiting each record once per epoch in record-level training.

The value is always at least 1 because 1 is added to 1 / sampling_probability. Small values arise when there are fewer entities than per_device_train_batch_size × gradient_accumulation_steps (e.g. 4 × 8 = 32), because sampling_probability is then capped at 1.0. num_steps is used to determine the privacy budget (noise multiplier and epsilon) during training.

get_epsilon()

Calculate the epsilon after model training completes.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def get_epsilon(self) -> float:
    """Calculate the epsilon after model training completes."""
    return self.accountant.compute_epsilon(self.state.global_step)

create_optimizer()

Create the base optimizer then wrap it with Opacus DPOptimizer.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def create_optimizer(self):
    """Create the base optimizer then wrap it with Opacus DPOptimizer."""
    _ = super().create_optimizer()

    class DPOptimizer(opacus.optimizers.DPOptimizer):
        """DPOptimizer that delegates ``param_groups`` to the inner optimizer.

        Hugging Face AcceleratedOptimizer replaces ``param_groups``; Opacus
        expects to mutate it. This subclass forwards get/set to the inner
        optimizer so learning rate scheduling and other param_group updates work.
        """

        @property
        def param_groups(self) -> list:
            return self.original_optimizer.param_groups

        @param_groups.setter
        def param_groups(self, param_groups: list) -> None:
            self.original_optimizer.param_groups = param_groups

    optimizer_generator = DPOptimizer

    # TODO: explore better mitigation for precision based attacks on finite
    # precision devices
    # https://tpdp.journalprivacyconfidentiality.org/2022/papers/HaneyDHSH22.pdf
    pa = self.privacy_args
    assert pa is not None and pa.per_sample_max_grad_norm is not None and pa.noise_multiplier is not None
    assert self.optimizer is not None
    self.optimizer = optimizer_generator(
        optimizer=self.optimizer,
        noise_multiplier=pa.noise_multiplier,
        max_grad_norm=pa.per_sample_max_grad_norm,
        expected_batch_size=self.args.per_device_train_batch_size * self.args.gradient_accumulation_steps,
        secure_mode=self.secure_mode,
    )

    return self.optimizer
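
The param_groups override inside create_optimizer is plain attribute delegation. Stripped of torch and Opacus, the pattern looks like this; InnerOptimizer and DelegatingWrapper are hypothetical stand-ins for the wrapped optimizer and the DPOptimizer subclass.

```python
class InnerOptimizer:
    """Hypothetical stand-in for the wrapped torch optimizer."""

    def __init__(self, lr: float) -> None:
        self.param_groups = [{"lr": lr, "params": []}]


class DelegatingWrapper:
    """Forwards param_groups reads and writes to the wrapped optimizer."""

    def __init__(self, original_optimizer: InnerOptimizer) -> None:
        self.original_optimizer = original_optimizer

    @property
    def param_groups(self) -> list:
        return self.original_optimizer.param_groups

    @param_groups.setter
    def param_groups(self, param_groups: list) -> None:
        self.original_optimizer.param_groups = param_groups


inner = InnerOptimizer(lr=1e-3)
wrapper = DelegatingWrapper(inner)
# A scheduler-style in-place edit made through the wrapper reaches the inner optimizer...
for group in wrapper.param_groups:
    group["lr"] = 5e-4
lr_after_schedule = inner.param_groups[0]["lr"]
# ...and so does a wholesale reassignment, via the property setter.
wrapper.param_groups = [{"lr": 1e-4, "params": []}]
```

Both in-place edits and reassignment end up on the inner optimizer, so there is a single source of truth for values such as the learning rate.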

training_step(model, inputs, num_items_in_batch=None)

Run one training step and return the loss scaled for logging.

Forward pass and backward are performed as usual. Loss is not scaled by batch size or per-sample factors here: Opacus handles per-sample gradient scaling. The returned value is the raw loss divided by gradient_accumulation_steps so that the logged loss matches the effective per-step loss (averaged over accumulation steps).
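
The logging scale works out as follows, assuming the Trainer adds up the values returned by training_step across the micro-batches of one optimizer step before logging. logged_step_loss is a hypothetical helper that reproduces only that arithmetic.

```python
def logged_step_loss(micro_batch_losses: list[float]) -> float:
    # training_step returns loss / G for each of the G micro-batches;
    # adding those values up gives the mean loss over the optimizer step.
    g = len(micro_batch_losses)  # plays the role of gradient_accumulation_steps
    return sum(loss / g for loss in micro_batch_losses)


print(logged_step_loss([2.0, 4.0, 3.0, 1.0]))  # 2.5
```

The gradients themselves are unaffected by this division: loss.backward() runs on the unscaled loss, and Opacus applies its own scaling to the clipped per-sample gradients.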

Parameters:

model (Module, required): The model to train (wrapped in GradSampleModule).
inputs (dict[str, Tensor | Any], required): Batch of inputs (e.g. input_ids, labels, position_ids).
num_items_in_batch (Tensor | None, default None): Unused; accepted for API compatibility. Opacus handles scaling, so None is passed to compute_loss to avoid double-scaling.

Returns:

Tensor: Detached loss tensor scaled by 1 / gradient_accumulation_steps, for logging only (the optimizer step is driven by the callback).

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def training_step(
    self,
    model: nn.Module,
    inputs: dict[str, torch.Tensor | Any],
    num_items_in_batch: torch.Tensor | None = None,
) -> torch.Tensor:
    """Run one training step and return the loss scaled for logging.

    Forward pass and backward are performed as usual. Loss is not scaled by
    batch size or per-sample factors here: Opacus handles per-sample gradient
    scaling. The returned value is the raw loss divided by
    ``gradient_accumulation_steps`` so that the logged loss matches the
    effective per-step loss (averaged over accumulation steps).

    Args:
        model: The model to train (wrapped in ``GradSampleModule``).
        inputs: Batch of inputs (e.g. ``input_ids``, ``labels``, ``position_ids``).
        num_items_in_batch: Unused; passed for API compatibility. Opacus
            handles scaling; we pass ``None`` to avoid double-scaling.

    Returns:
        Detached loss tensor scaled by 1 / ``gradient_accumulation_steps``,
        for logging only (optimizer step is driven by the callback).
    """
    model.train()
    getattr(self.optimizer, "train", lambda: None)()

    # Compared to the original HF implementation (as of 4.48), we use
    # `num_items_in_batch=None` to avoid any extra scaling, since Opacus
    # already does it; we only divide the loss by the number of gradient
    # accumulation steps after loss.backward(), to get correct logging
    inputs = self._prepare_inputs(inputs)
    with self.compute_loss_context_manager():
        try:
            loss = self.compute_loss(model, inputs, num_items_in_batch=None)
        except TypeError:  # older transformers
            loss = self.compute_loss(model, inputs)
    del inputs

    loss.backward()

    return loss.detach() / self.args.gradient_accumulation_steps

get_train_dataloader()

Returns a torch DataLoader that uses an entity-level sampler.
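
_get_train_sampler is not shown on this page. Assuming it performs Poisson subsampling at the entity level (the sampling scheme DP-SGD accountants such as RDP typically assume), a minimal batch sampler could look like this. entity_poisson_batches is a hypothetical name, and random.Random is used for brevity even though it is not a cryptographically secure generator. Each yielded list of indices has the shape a DataLoader expects from batch_sampler.

```python
import random
from collections.abc import Iterator, Sequence


def entity_poisson_batches(
    entity_mapping: Sequence[Sequence[int]],
    sampling_probability: float,
    num_steps: int,
    seed: int = 0,
) -> Iterator[list[int]]:
    """Yield one list of dataset indices per step (hypothetical sampler).

    Each entity joins a batch independently with probability
    sampling_probability; when it does, all of its rows come along, so batch
    sizes vary from step to step and a batch can be empty.
    """
    rng = random.Random(seed)  # not a secure RNG; fine for illustration only
    for _ in range(num_steps):
        batch: list[int] = []
        for rows in entity_mapping:
            if rng.random() < sampling_probability:
                batch.extend(rows)
        yield batch


mapping = [[0, 1], [2], [3, 4, 5]]  # e.g. the output of create_entity_mapping
for batch in entity_poisson_batches(mapping, sampling_probability=0.5, num_steps=3, seed=42):
    print(batch)
```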

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def get_train_dataloader(self) -> DataLoader:
    """Returns a torch DataLoader that uses an entity-level sampler."""
    train_dataset = self.train_dataset
    assert isinstance(train_dataset, Dataset)
    train_sampler = self._get_train_sampler(train_dataset)
    return DataLoader(
        cast(TorchDataset, train_dataset),
        batch_sampler=train_sampler,
        collate_fn=self.data_collator,
        # drop_last is omitted: DataLoader raises ValueError if it is set together with batch_sampler.
        num_workers=self.args.dataloader_num_workers,
        pin_memory=self.args.dataloader_pin_memory,
    )

create_entity_mapping(entity_column_values)

Build a mapping from each entity to its dataset indices.

Groups rows by the entity column; each group's indices are the dataset positions for that entity. Entity order follows groupby sort; order within a group is preserved.
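
The same grouping can be written without pandas, in the spirit of the dictionary-based TODO in the source. A minimal sketch, with create_entity_mapping_dict as a hypothetical name:

```python
from collections import defaultdict
from collections.abc import Hashable, Sequence


def create_entity_mapping_dict(entity_column_values: Sequence[Hashable]) -> list[list[int]]:
    """Dict-based sketch of create_entity_mapping (hypothetical name).

    Row indices are grouped by entity ID; groups come back in sorted entity
    order and keep the original row order inside each group.
    """
    groups: dict[Hashable, list[int]] = defaultdict(list)
    for row_index, entity in enumerate(entity_column_values):
        groups[entity].append(row_index)
    return [groups[entity] for entity in sorted(groups)]


print(create_entity_mapping_dict(["b", "a", "b", "c", "a"]))  # [[1, 4], [0, 2], [3]]
```

Giving every row its own ID, e.g. create_entity_mapping_dict(range(n)), reproduces the record-level mapping [[i] for i in range(n)] built in the OpacusDPTrainer constructor. One behavioral difference: DataFrame.groupby drops missing keys by default (dropna=True), so rows whose entity ID is NaN or None belong to no group in create_entity_mapping, whereas this sketch keeps None as a key (and sorted raises TypeError when None is mixed with strings).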

Parameters:

entity_column_values (list, required): List of entity IDs aligned with dataset rows (i.e. one value per row, in the same order).

Returns:

Sequence[Sequence[int]]: For entity i, result[i] is the list of dataset indices belonging to that entity.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py
def create_entity_mapping(entity_column_values: list) -> Sequence[Sequence[int]]:
    """Build a mapping from each entity to its dataset indices.

    Groups rows by the entity column; each group's indices are the dataset
    positions for that entity. Entity order follows groupby sort; order within
    a group is preserved.

    Args:
        entity_column_values: List of entity IDs aligned with dataset rows
            (e.g. one value per row in the same order).

    Returns:
        Sequence of sequences: for entity i, result[i] is the list of dataset
        indices belonging to that entity.
    """
    entities = pd.DataFrame(data={"entity": entity_column_values})
    # Using `groupby("entity")` - note that the entities returned by groupby are
    # sorted, but the order of records in each group is preserved.
    # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.groupby.html
    # TODO: improve for use in sampler.py using a dictionary or such structure
    # with clearly defined entity_ids
    entity_mapping = [g.index.tolist() for _, g in entities.groupby("entity")]
    return entity_mapping