# dp_utils

DP training utilities for Hugging Face Trainer and data collation.

Provides OpacusDPTrainer (DP-aware Trainer with entity-level sampling and Opacus optimizer), DPCallback for Trainer hooks, data collators that expose position_ids for per-sample gradients, and GradSampleModule wrapper with no_sync support.

Classes:

| Name | Description |
| --- | --- |
| DPCallback | Trainer callback that integrates Opacus DP-SGD with transformers.Trainer. |
| DataCollatorForPrivateCausalLanguageModeling | Adds position_ids for Opacus per-sample gradients. |
| DataCollatorForPrivateTokenClassification | Collator for token classification that adds position_ids for Opacus. |
| GradSampleModule | Opacus GradSampleModule with no_sync for Hugging Face Trainer. |
| OpacusDPTrainer | DP-aware Trainer for PEFT/LoRA fine-tuning with Opacus. |

Functions:

| Name | Description |
| --- | --- |
| create_entity_mapping | Build a mapping from each entity to its dataset indices. |

## DPCallback(noise_multiplier, sampling_probability, accountant, max_epsilon=float('inf'))

Bases: TrainerCallback

Trainer callback that integrates Opacus DP-SGD with transformers.Trainer.

Handles per-step optimizer behavior (skip signal, step, zero_grad), optional RDP step accounting, and early stopping when max_epsilon is exceeded. Used with OpacusDPTrainer; the trainer injects this callback when privacy arguments are enabled.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| noise_multiplier | float | Gaussian noise scale for gradients. | required |
| sampling_probability | float | Probability of a record being in a batch. | required |
| accountant | SafeSynthesizerAccountant | Privacy accountant for epsilon computation and (if RDP) step tracking. | required |
| max_epsilon | float | Stop training when computed epsilon exceeds this value. | float('inf') |

Methods:

| Name | Description |
| --- | --- |
| on_substep_end | Run DP optimizer step at the end of each gradient-accumulation substep. |
| on_step_end | Clear gradients and update RDP accountant at the end of each optimizer step. |
| on_save | Check the epsilon budget before a checkpoint is saved; stop training if it is exceeded. |
| on_evaluate | Check epsilon budget and stop training if max_epsilon is exceeded. |

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def __init__(
    self,
    noise_multiplier: float,
    sampling_probability: float,
    accountant: SafeSynthesizerAccountant,
    max_epsilon: float = float("inf"),
) -> None:
    self.accountant = accountant
    self._max_epsilon = max_epsilon
    self._on_substep_end_was_called = False

    self.noise_multiplier = noise_multiplier
    self.sampling_probability = sampling_probability
```

### on_substep_end(args, state, control, optimizer=None, **kwargs)

Run DP optimizer step at the end of each gradient-accumulation substep.

Signals the Opacus optimizer to skip the parameter update, then calls step() and zero_grad() on the underlying DP optimizer (or on the optimizer itself if not wrapped by Accelerate); with skipping enabled, step() performs per-sample clipping and gradient accumulation without updating parameters. Required when using gradient accumulation so that clipping runs once per micro-batch.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | TrainingArguments | HF Trainer arguments. | required |
| state | TrainerState | Current trainer state. | required |
| control | TrainerControl | Trainer control object (not modified). | required |
| optimizer | | The Trainer's optimizer (Opacus DP optimizer or AcceleratedOptimizer wrapping it). | None |
| **kwargs | | Additional callback keyword arguments. | {} |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If optimizer is None (callback cannot access optimizer). |

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def on_substep_end(
    self,
    args: training_args.TrainingArguments,
    state: TrainerState,
    control: TrainerControl,
    optimizer=None,
    **kwargs,
):
    """Run DP optimizer step at the end of each gradient-accumulation substep.

    Signals the Opacus optimizer to skip the step, calls ``step()`` and
    ``zero_grad()`` on the underlying DP optimizer (or the optimizer itself
    if not wrapped by Accelerate). Required when using gradient accumulation
    so that the optimizer step runs once per micro-batch.

    Args:
        args: HF Trainer arguments.
        state: Current trainer state.
        control: Trainer control object (not modified).
        optimizer: The Trainer's optimizer (Opacus DP optimizer or AcceleratedOptimizer wrapping it).
        **kwargs: Additional callback keyword arguments.

    Raises:
        RuntimeError: If optimizer is None (callback cannot access optimizer).
    """
    if optimizer is None:
        raise RuntimeError("Impossible to access optimizer from inside callback")
    if isinstance(optimizer, AcceleratedOptimizer):
        dp_optimizer = optimizer.optimizer
    else:
        dp_optimizer = optimizer
    dp_optimizer.signal_skip_step(do_skip=True)
    dp_optimizer.step()
    dp_optimizer.zero_grad()

    self._on_substep_end_was_called = True
```

### on_step_end(args, state, control, optimizer=None, **kwargs)

Clear gradients and update RDP accountant at the end of each optimizer step.

Calls zero_grad() on the optimizer (Opacus expects this; Trainer does not call it by default). When using the RDP accountant (not PRV), increments the accountant step for accurate epsilon calculation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | TrainingArguments | Trainer training arguments (used to check gradient_accumulation_steps). | required |
| state | TrainerState | Current trainer state. | required |
| control | TrainerControl | Trainer control object (not modified). | required |
| optimizer | | The Trainer's optimizer (required for zero_grad()). | None |
| **kwargs | | Additional callback keyword arguments. | {} |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If gradient accumulation is used but on_substep_end was never called (e.g. transformers < 4.10.0), or if optimizer is None. |

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def on_step_end(
    self,
    args: training_args.TrainingArguments,
    state: TrainerState,
    control: TrainerControl,
    optimizer=None,
    **kwargs,
):
    """Clear gradients and update RDP accountant at the end of each optimizer step.

    Calls ``zero_grad()`` on the optimizer (Opacus expects this; Trainer does not
    call it by default). When using the RDP accountant (not PRV), increments the
    accountant step for accurate epsilon calculation.

    Args:
        args: Trainer training arguments (used to check gradient_accumulation_steps).
        state: Current trainer state.
        control: Trainer control object (not modified).
        optimizer: The Trainer's optimizer (required for ``zero_grad()``).
        **kwargs: Additional callback keyword arguments.

    Raises:
        RuntimeError: If gradient accumulation is used but ``on_substep_end`` was
            never called (e.g. transformers < 4.10.0), or if optimizer is None.
    """
    if args.gradient_accumulation_steps > 1 and not self._on_substep_end_was_called:
        raise RuntimeError(
            "Gradient accumulation was specified but `on_substep_end` wasn't called. "
            "Make sure you're using a recent version of transformers (>=4.10.0) "
            "which has an appropriate callback in the trainer."
        )
    if optimizer is None:
        raise RuntimeError(
            "No optimizer provided to on_step_end callback, required for correct DP-SGD to call zero_grad()"
        )

    optimizer.zero_grad()  # Opacus needs .zero_grad() on the optimizer, HF doesn't call by default.
    if not self.accountant.use_prv:
        # Use RDPAccountant, which uses `.step()` to increment number of
        # steps, required for accurate epsilon calculation.
        self.accountant.accountant.step(
            noise_multiplier=self.noise_multiplier,
            sample_rate=self.sampling_probability,
        )
```

### on_save(args, state, control, **kwargs)

Called when the Trainer is about to save a checkpoint. Ensures training stops before saving if the privacy budget would be exceeded.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | TrainingArguments | HF Trainer arguments. | required |
| state | TrainerState | Current trainer state (used for global_step). | required |
| control | TrainerControl | Trainer control object; should_training_stop may be set to True. | required |
| **kwargs | | Additional callback keyword arguments. | {} |

Returns:

| Type | Description |
| --- | --- |
| TrainerControl | should_training_stop set to True if current epsilon exceeds max_epsilon, otherwise unchanged. |

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def on_save(
    self,
    args: training_args.TrainingArguments,
    state: TrainerState,
    control: TrainerControl,
    **kwargs,
):
    """Called when the Trainer is about to save a checkpoint. Ensures training
    stops before saving if the privacy budget would be exceeded.

    Args:
        args: HF Trainer arguments.
        state: Current trainer state (used for global_step).
        control: Trainer control object; ``should_training_stop`` may be set to True.
        **kwargs: Additional callback keyword arguments.

    Returns:
        TrainerControl with ``should_training_stop`` set to True if current
        epsilon exceeds ``max_epsilon``, otherwise unchanged.
    """
    return self._check_max_epsilon_exceeded(state, control)
```

### on_evaluate(args, state, control, **kwargs)

Check epsilon budget and stop training if max_epsilon is exceeded.

Called when the Trainer runs evaluation. Ensures training stops before further steps if the privacy budget would be exceeded.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| args | TrainingArguments | HF Trainer arguments. | required |
| state | TrainerState | Current trainer state (used for global_step). | required |
| control | TrainerControl | Trainer control object; should_training_stop may be set to True. | required |
| **kwargs | | Additional callback keyword arguments. | {} |

Returns:

| Type | Description |
| --- | --- |
| TrainerControl | should_training_stop set to True if current epsilon exceeds max_epsilon, otherwise unchanged. |

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def on_evaluate(
    self,
    args: training_args.TrainingArguments,
    state: TrainerState,
    control: TrainerControl,
    **kwargs,
):
    """Check epsilon budget and stop training if ``max_epsilon`` is exceeded.

    Called when the Trainer runs evaluation. Ensures training stops before
    further steps if the privacy budget would be exceeded.

    Args:
        args: HF Trainer arguments.
        state: Current trainer state (used for global_step).
        control: Trainer control object; ``should_training_stop`` may be set to True.
        **kwargs: Additional callback keyword arguments.

    Returns:
        TrainerControl with ``should_training_stop`` set to True if current
        epsilon exceeds ``max_epsilon``, otherwise unchanged.
    """
    return self._check_max_epsilon_exceeded(state, control)
```

## DataCollatorForPrivateCausalLanguageModeling(tokenizer)

Bases: DataCollatorForLanguageModeling

Adds position_ids for Opacus per-sample gradients.

Trainer and model code often create position_ids inside the model forward pass, which Opacus cannot see. This collator builds position_ids during batching so they are present in the batch and available for per-sample gradient computation. See https://github.com/huggingface/transformers/blob/5c1c72be5f864d10d0efe8ece0768d9ed6ee4fdd/src/transformers/models/mistral/modeling_mistral.py#L379 for an example.
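The idea can be illustrated without the module itself. This standalone sketch (not the collator's actual implementation, which works on tensors) attaches an explicit `position_ids` entry after padding, so the values exist in the batch rather than being created inside the model forward where Opacus cannot see them:

```python
def add_position_ids(batch):
    """Attach explicit position_ids to an already-collated batch.

    Illustrative only: assumes batch["input_ids"] is a list of already-padded,
    equal-length token-id lists; the real collator operates on tensors.
    """
    seq_len = len(batch["input_ids"][0])
    # One 0..seq_len-1 row per example, mirroring what the model would
    # otherwise build internally during its forward pass.
    batch["position_ids"] = [list(range(seq_len)) for _ in batch["input_ids"]]
    return batch

batch = {"input_ids": [[5, 6, 7, 0], [8, 9, 0, 0]]}
print(add_position_ids(batch)["position_ids"])  # [[0, 1, 2, 3], [0, 1, 2, 3]]
```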

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tokenizer | PreTrainedTokenizer | Tokenizer for padding and encoding. | required |
Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def __init__(self, tokenizer: PreTrainedTokenizer):
    super().__init__(tokenizer=tokenizer, mlm=False)
```

## DataCollatorForPrivateTokenClassification(tokenizer)

Bases: DataCollatorForTokenClassification

Collator for token classification that adds position_ids for Opacus.

Same rationale as DataCollatorForPrivateCausalLanguageModeling: ensures position_ids are in the batch for per-sample gradient computation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| tokenizer | PreTrainedTokenizer | Tokenizer for padding and encoding. | required |
Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def __init__(self, tokenizer: PreTrainedTokenizer):
    super().__init__(tokenizer=tokenizer)
```

## GradSampleModule

Bases: GradSampleModule

Opacus GradSampleModule with no_sync for Hugging Face Trainer.

Trainer expects a no_sync context manager to defer gradient sync in distributed settings. This wrapper provides a no-op no_sync so the Trainer API is satisfied.

Methods:

| Name | Description |
| --- | --- |
| no_sync | Context manager that does nothing; required by Trainer's expected API. |

### no_sync()

Context manager that does nothing; required by Trainer's expected API.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
@contextmanager
def no_sync(self):
    """Context manager that does nothing; required by Trainer's expected API."""
    yield
```

## OpacusDPTrainer(train_dataset, model, args=None, privacy_args=None, data_fraction=None, true_dataset_size=None, entity_column_values=None, callbacks=None, secure_mode=True, **kwargs)

Bases: Trainer

DP-aware Trainer for PEFT/LoRA fine-tuning with Opacus.

Adapts Hugging Face Trainer for differential privacy: uses entity-level (or record-level) sampling, wraps the model in GradSampleModule and the optimizer in Opacus DPOptimizer, and avoids double-scaling of loss by gradient accumulation. Saves only the PEFT/LoRA adapter weights.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| train_dataset | Dataset | Dataset for training. | required |
| model | PreTrainedModel \| Module | Base model (will be wrapped with GradSampleModule). | required |
| args | | Training arguments (e.g. TrainingArguments). | None |
| privacy_args | PrivacyArguments \| None | DP parameters (epsilon, delta, noise, clipping). Required. | None |
| data_fraction | float \| None | If set, scales effective number of epochs for privacy math. | None |
| true_dataset_size | int \| None | Override number of entities/records for privacy accounting. | None |
| entity_column_values | list \| None | If set, entity-level DP; each value is the entity ID for the corresponding dataset row. If None, record-level DP (one entity per row). | None |
| callbacks | list[TrainerCallback] \| None | Additional Trainer callbacks. | None |
| secure_mode | bool \| None | If True, use secure RNG for noise (recommended). | True |
| **kwargs | dict | Passed to Trainer (e.g. eval_dataset, tokenizer, data_collator). | {} |

Attributes:

| Name | Description |
| --- | --- |
| accountant | Privacy accountant used for epsilon computation. |
| entity_mapping | For entity i, list of dataset indices in that entity. |

Methods:

| Name | Description |
| --- | --- |
| get_epsilon | Return the epsilon consumed so far, from the trainer's privacy accountant and the current number of optimizer steps. |
| create_optimizer | Create the base optimizer then wrap it with Opacus DPOptimizer. |
| training_step | Run one training step and return the loss scaled for logging. |
| get_train_dataloader | DataLoader with entity-level sampler and DP data collator. |

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def __init__(
    self,
    train_dataset: Dataset,
    model: modeling_utils.PreTrainedModel | torch.nn.Module,
    args=None,
    privacy_args: PrivacyArguments | None = None,
    data_fraction: float | None = None,
    true_dataset_size: int | None = None,
    entity_column_values: list | None = None,
    callbacks: list[TrainerCallback] | None = None,
    secure_mode: bool | None = True,
    **kwargs: dict,
) -> None:
    self.train_args = args
    self.privacy_args = privacy_args
    self.secure_mode = secure_mode

    if entity_column_values is None:
        # Record-level DP == mapping each sample to a unique entity.
        self.entity_mapping = [[i] for i in range(train_dataset.num_rows)]
    else:
        self.entity_mapping = create_entity_mapping(entity_column_values=entity_column_values)

    # Adjustments for NavFT
    self.true_num_epochs = self.train_args.num_train_epochs
    self.true_dataset_size = len(self.entity_mapping)

    if data_fraction is not None:
        self.true_num_epochs *= data_fraction
        logger.info(
            f"True number of epochs set to {self.true_num_epochs}",
        )
    if true_dataset_size is not None:
        self.true_dataset_size = true_dataset_size
        logger.info(
            (
                f"Training dataset contains {self.true_dataset_size} unique "
                f"{'groups' if entity_column_values else 'records'}; using this "
                "value for differential privacy parameter determination."
            ),
        )

    if not self.privacy_args.is_initialized:
        self.privacy_args.initialize(
            sampling_probability=self.sampling_probability,
            num_steps=self.num_steps,
        )

    model = GradSampleModule(model)

    super().__init__(
        model=model,
        args=args,
        train_dataset=train_dataset,
        callbacks=callbacks,
        **kwargs,
    )
    self.accountant = SafeSynthesizerAccountant(
        use_prv=self.privacy_args.use_prv,
        noise_multiplier=self.privacy_args.noise_multiplier,
        sampling_probability=self.sampling_probability,
        delta=self.privacy_args.target_delta,
        num_steps=self.num_steps,
    )
    self.dp_callback = DPCallback(
        noise_multiplier=self.privacy_args.noise_multiplier,
        sampling_probability=self.sampling_probability,
        accountant=self.accountant,
        max_epsilon=self.privacy_args.target_epsilon,
    )
    self.add_callback(self.dp_callback)
```

### sampling_probability property

Probability that an entity is included in a batch (capped at 1.0).

For record-level DP (one entity per row), it is \(\min\bigl(1, (\text{per\_device\_batch\_size} \times \text{gradient\_accumulation\_steps}) / n_{\text{entities}}\bigr)\). For entity-level DP, \(n_{\text{entities}}\) can be small, so the ratio may exceed 1; the result is capped at 1.0. Used as the sampling probability in the privacy accountant for ε computation.
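The formula can be written out directly; a minimal sketch (the function name is illustrative, not the trainer's API):

```python
def sampling_probability(per_device_batch_size, gradient_accumulation_steps, n_entities):
    # Probability that a given entity lands in one logical batch, capped at 1.0.
    return min(1.0, per_device_batch_size * gradient_accumulation_steps / n_entities)

# Record-level DP over 1000 rows:
print(sampling_probability(4, 8, 1000))  # 0.032
# Entity-level DP with only 16 entities: the ratio exceeds 1, so it is capped:
print(sampling_probability(4, 8, 16))    # 1.0
```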

### num_steps property

The number of optimizer steps used for privacy accounting.

Either user-supplied (via max_steps when true_num_epochs == -1) or determined from num_train_epochs. When the user specifies num_train_epochs, we determine num_steps from sampling_probability so we pass over each entity roughly once per epoch, similarly to passing over each record once per epoch in record-level training.

Always at least 1, because 1 is added to 1 / sampling_probability; the extra step matters when there are fewer entities than batch_size * gradient_accumulation_steps (e.g. 4 * 8 = 32), in which case sampling_probability is capped at 1. Used to determine the privacy budget (noise multiplier and epsilon) during training.
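A plausible reconstruction of this accounting (illustrative only; the property's actual source is not shown on this page, and the function name and `max_steps` handling are assumptions from the description above):

```python
def num_steps(num_train_epochs, sampling_probability, max_steps=-1):
    # Hypothetical reconstruction: honor a user-supplied max_steps when epochs
    # is the sentinel -1; otherwise take roughly one pass over each entity per
    # epoch. The +1 keeps the per-epoch count at least 1 even when
    # sampling_probability is capped at 1.0.
    if num_train_epochs == -1:
        return max_steps
    steps_per_epoch = int(1 / sampling_probability) + 1
    return int(num_train_epochs * steps_per_epoch)

print(num_steps(3, 0.1))  # 33 (11 steps per epoch)
print(num_steps(2, 1.0))  # 4  (fewer entities than one logical batch)
```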

### get_epsilon()

Uses the trainer's privacy accountant and the current number of optimizer steps to return the epsilon consumed so far.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def get_epsilon(self) -> float:
    """
    Uses the trainer's privacy accountant and the current number of
    optimizer steps to return the epsilon consumed so far.
    """
    return self.accountant.compute_epsilon(self.state.global_step)
```

### create_optimizer()

Create the base optimizer then wrap it with Opacus DPOptimizer.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def create_optimizer(self):
    """Create the base optimizer then wrap it with Opacus DPOptimizer."""
    _ = super().create_optimizer()

    class DPOptimizer(opacus.optimizers.DPOptimizer):  # ty: ignore[unresolved-attribute]
        """DPOptimizer that delegates ``param_groups`` to the inner optimizer.

        Hugging Face AcceleratedOptimizer replaces ``param_groups``; Opacus
        expects to mutate it. This subclass forwards get/set to the inner
        optimizer so learning rate scheduling and other param_group updates work.
        """

        @property
        def param_groups(self):
            return self.original_optimizer.param_groups

        @param_groups.setter
        def param_groups(self, param_groups):
            self.original_optimizer.param_groups = param_groups

    optimizer_generator = DPOptimizer

    # TODO: explore better mitigation for precision based attacks on finite
    # precision devices
    # https://tpdp.journalprivacyconfidentiality.org/2022/papers/HaneyDHSH22.pdf
    self.optimizer = optimizer_generator(
        optimizer=self.optimizer,
        noise_multiplier=self.privacy_args.noise_multiplier,
        max_grad_norm=self.privacy_args.per_sample_max_grad_norm,
        expected_batch_size=self.args.per_device_train_batch_size * self.args.gradient_accumulation_steps,
        secure_mode=self.secure_mode,
    )

    return self.optimizer
```

### training_step(model, inputs, num_items_in_batch=None)

Run one training step and return the loss scaled for logging.

Forward pass and backward are performed as usual. Loss is not scaled by batch size or per-sample factors here: Opacus handles per-sample gradient scaling. The returned value is the raw loss divided by gradient_accumulation_steps so that the logged loss matches the effective per-step loss (averaged over accumulation steps).
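The logging arithmetic can be checked in isolation. This toy example (loss values invented) shows why dividing each micro-batch loss by gradient_accumulation_steps makes the summed log over one accumulation window equal the mean micro-batch loss:

```python
grad_accum_steps = 4
raw_losses = [2.0, 1.0, 3.0, 2.0]  # toy per-micro-batch losses in one window
# What training_step returns for each micro-batch:
logged = [loss / grad_accum_steps for loss in raw_losses]
# Summing the returned values over the window reproduces the mean
# micro-batch loss, i.e. the effective per-step loss.
print(sum(logged))  # 2.0
print(sum(raw_losses) / grad_accum_steps)  # 2.0
```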

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model | Module | The model to train (wrapped in GradSampleModule). | required |
| inputs | dict[str, Tensor \| Any] | Batch of inputs (e.g. input_ids, labels, position_ids). | required |
| num_items_in_batch | | Unused; passed for API compatibility. Opacus handles scaling; we pass None to avoid double-scaling. | None |

Returns:

| Type | Description |
| --- | --- |
| Tensor | Detached loss tensor scaled by 1 / gradient_accumulation_steps, for logging only (optimizer step is driven by the callback). |

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def training_step(
    self,
    model: nn.Module,
    inputs: dict[str, torch.Tensor | Any],
    num_items_in_batch=None,
) -> torch.Tensor:
    """Run one training step and return the loss scaled for logging.

    Forward pass and backward are performed as usual. Loss is not scaled by
    batch size or per-sample factors here: Opacus handles per-sample gradient
    scaling. The returned value is the raw loss divided by
    ``gradient_accumulation_steps`` so that the logged loss matches the
    effective per-step loss (averaged over accumulation steps).

    Args:
        model: The model to train (wrapped in ``GradSampleModule``).
        inputs: Batch of inputs (e.g. ``input_ids``, ``labels``, ``position_ids``).
        num_items_in_batch: Unused; passed for API compatibility. Opacus
            handles scaling; we pass ``None`` to avoid double-scaling.

    Returns:
        Detached loss tensor scaled by 1 / ``gradient_accumulation_steps``,
        for logging only (optimizer step is driven by the callback).
    """
    model.train()
    if hasattr(self.optimizer, "train") and callable(self.optimizer.train):
        self.optimizer.train()

    # Compared to the original HF implementation (as of 4.48), we use
    # `num_items_in_batch=None` to avoid any extra scaling, since Opacus
    # already does it; we only divide the loss by the number of gradient
    # accumulation steps after loss.backward(), to get correct logging
    inputs = self._prepare_inputs(inputs)
    with self.compute_loss_context_manager():
        try:
            loss = self.compute_loss(model, inputs, num_items_in_batch=None)
        except TypeError:  # older transformers
            loss = self.compute_loss(model, inputs)
    del inputs

    loss.backward()

    return loss.detach() / self.args.gradient_accumulation_steps
```

### get_train_dataloader()

DataLoader with entity-level sampler and DP data collator.

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def get_train_dataloader(self) -> DataLoader:
    """DataLoader with entity-level sampler and DP data collator."""
    train_sampler = self._get_train_sampler()
    return DataLoader(
        self.train_dataset,
        batch_sampler=train_sampler,
        collate_fn=self.data_collator,
        drop_last=self.args.dataloader_drop_last,
        num_workers=self.args.dataloader_num_workers,
        pin_memory=self.args.dataloader_pin_memory,
    )
```

## create_entity_mapping(entity_column_values)

Build a mapping from each entity to its dataset indices.

Groups rows by the entity column; each group's indices are the dataset positions for that entity. Entity order follows groupby sort; order within a group is preserved.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| entity_column_values | list | List of entity IDs aligned with dataset rows (e.g. one value per row in the same order). | required |

Returns:

| Type | Description |
| --- | --- |
| Sequence[Sequence[int]] | Sequence of sequences: for entity i, result[i] is the list of dataset indices belonging to that entity. |

Source code in src/nemo_safe_synthesizer/privacy/dp_transformers/dp_utils.py

```python
def create_entity_mapping(entity_column_values: list) -> Sequence[Sequence[int]]:
    """Build a mapping from each entity to its dataset indices.

    Groups rows by the entity column; each group's indices are the dataset
    positions for that entity. Entity order follows groupby sort; order within
    a group is preserved.

    Args:
        entity_column_values: List of entity IDs aligned with dataset rows
            (e.g. one value per row in the same order).

    Returns:
        Sequence of sequences: for entity i, result[i] is the list of dataset
        indices belonging to that entity.
    """
    entities = pd.DataFrame(data={"entity": entity_column_values})
    # Using `groupby("entity")` - note that the entities returned by groupby are
    # sorted, but the order of records in each group is preserved.
    # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.groupby.html
    # TODO: improve for use in sampler.py using a dictionary or such structure
    # with clearly defined entity_ids
    entity_mapping = [g.index.values for _, g in entities.groupby("entity")]
    return entity_mapping
```