results

Generation result containers and multi-batch accumulator.

Classes:

| Name | Description |
| --- | --- |
| `GenerateJobResults` | Results of a complete generation job. |
| `GenerationBatches` | Accumulator that tracks batches during the generation phase. |

Functions:

| Name | Description |
| --- | --- |
| `rejected_record_to_error` | Convert a rejected record into a `(detailed, summary)` error tuple. |

GenerateJobResults(df, status, num_valid_records, num_invalid_records, num_prompts, valid_record_fraction, batch_valid_record_fractions, elapsed_time=None) dataclass

Results of a complete generation job.

Encapsulates the generated DataFrame along with validity statistics, prompt counts, and timing information. Built from a GenerationBatches accumulator via from_batches.

Methods:

| Name | Description |
| --- | --- |
| `from_batches` | Build results from a completed `GenerationBatches` accumulator. |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `df` | `DataFrame` | DataFrame containing the generated records. |
| `status` | `GenerationStatus` | Overall generation status derived from the processed batches. |
| `num_valid_records` | `int` | Total number of records that passed validation. |
| `num_invalid_records` | `int` | Total number of records that failed validation. |
| `num_prompts` | `int` | Total number of prompts processed during generation. |
| `valid_record_fraction` | `float` | Fraction of valid records among all generated records. |
| `batch_valid_record_fractions` | `list[float]` | Per-batch valid record fractions, in batch order. |
| `elapsed_time` | `float \| None` | Wall-clock generation duration in seconds, or `None` if not yet set. |

df instance-attribute

DataFrame containing the generated records.

status instance-attribute

Overall generation status derived from the processed batches.

num_valid_records instance-attribute

Total number of records that passed validation.

num_invalid_records instance-attribute

Total number of records that failed validation.

num_prompts instance-attribute

Total number of prompts processed during generation.

valid_record_fraction instance-attribute

Fraction of valid records among all generated records.

batch_valid_record_fractions instance-attribute

Per-batch valid record fractions, in batch order.

elapsed_time = None class-attribute instance-attribute

Wall-clock generation duration in seconds, or None if not yet set.

from_batches(batches, max_num_records, columns, elapsed_time) classmethod

Build results from a completed `GenerationBatches` accumulator.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batches` | `GenerationBatches` | Accumulated generation batches. | *required* |
| `max_num_records` | `int \| None` | If set, truncate the output DataFrame to this many rows. | *required* |
| `columns` | `list[str]` | Column names to select from the generated records. | *required* |
| `elapsed_time` | `float` | Wall-clock generation duration in seconds. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Self` | Populated results instance. |

Source code in src/nemo_safe_synthesizer/generation/results.py
@classmethod
def from_batches(
    cls, batches: GenerationBatches, max_num_records: int | None, columns: list[str], elapsed_time: float
) -> Self:
    """Build results from a completed :class:`GenerationBatches` accumulator.

    Args:
        batches: Accumulated generation batches.
        max_num_records: If set, truncate the output DataFrame to
            this many rows.
        columns: Column names to select from the generated records.
        elapsed_time: Wall-clock generation duration in seconds.

    Returns:
        Populated results instance.
    """
    df = batches.to_dataframe(columns, max_num_records)
    status = batches.status
    num_valid_records = batches.num_valid_records
    num_invalid_records = batches.num_invalid_records
    num_prompts = batches.num_prompts
    valid_record_fraction = (
        batches.num_valid_records / (batches.num_valid_records + batches.num_invalid_records)
        if (batches.num_valid_records + batches.num_invalid_records) > 0
        else 0.0
    )
    batch_valid_record_fractions = [batch.valid_record_fraction for batch in batches._batches]
    return cls(
        df=df,
        status=status,
        num_valid_records=num_valid_records,
        num_invalid_records=num_invalid_records,
        num_prompts=num_prompts,
        valid_record_fraction=valid_record_fraction,
        batch_valid_record_fractions=batch_valid_record_fractions,
        elapsed_time=elapsed_time,
    )
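The `valid_record_fraction` computed above guards against dividing by zero when no records were generated at all. A minimal standalone sketch of that arithmetic (just the guard logic, not the library class):

```python
def valid_record_fraction(num_valid: int, num_invalid: int) -> float:
    """Fraction of valid records, returning 0.0 when no records exist."""
    total = num_valid + num_invalid
    return num_valid / total if total > 0 else 0.0

print(valid_record_fraction(75, 25))  # 0.75
print(valid_record_fraction(0, 0))    # 0.0 (no division-by-zero error)
```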

GenerationBatches(target_num_records=None, batches=None, max_num_prompts_per_batch=MAX_NUM_PROMPTS_PER_BATCH, invalid_fraction_threshold=None, patience=None, data_actions_fn=None)

Accumulator that tracks batches during the generation phase.

Manages the stopping condition, running statistics, and optional post-processing via data_actions_fn.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `target_num_records` | `int \| None` | Target number of valid records to generate. | `None` |
| `batches` | `list[Batch] \| None` | Pre-existing batches to seed the accumulator with. | `None` |
| `max_num_prompts_per_batch` | `int` | Maximum prompts per LLM generation call. | `MAX_NUM_PROMPTS_PER_BATCH` |
| `invalid_fraction_threshold` | `float \| None` | Fraction of invalid records that triggers stopping after `patience` consecutive batches. | `None` |
| `patience` | `int \| None` | Consecutive batch count before the threshold triggers a stop. | `None` |
| `data_actions_fn` | `DataActionsFn \| None` | Optional function that post-processes and validates records from each batch. | `None` |

Attributes:

| Name | Description |
| --- | --- |
| `status` | Current generation status. |
| `running_stopping_metric` | Exponential running average of the invalid-record fraction. |
| `stop_condition` | The patience-based stopping condition, or `None` if thresholds were not provided. |

Methods:

| Name | Description |
| --- | --- |
| `add_batch` | Add a batch and update the generation status. |
| `get_next_num_prompts` | Return an estimate of the optimal number of prompts to process in the next batch. |
| `job_complete` | Update the generation job status to a finished state and log the results. |
| `log_status` | Log the current status of the generation process. |
| `to_dataframe` | Combine valid records from all batches into a single DataFrame. |

Source code in src/nemo_safe_synthesizer/generation/results.py
def __init__(
    self,
    target_num_records: int | None = None,
    batches: list[Batch] | None = None,
    max_num_prompts_per_batch: int = MAX_NUM_PROMPTS_PER_BATCH,
    invalid_fraction_threshold: float | None = None,
    patience: int | None = None,
    data_actions_fn: DataActionsFn | None = None,
):
    self._batches = batches or []
    self._start_time = time.perf_counter()
    self.target_num_records = target_num_records
    self.max_num_prompts_per_batch = max_num_prompts_per_batch
    self.status = GenerationStatus.IN_PROGRESS
    self.running_stopping_metric = RunningStatistics()

    self.stop_condition = None
    if invalid_fraction_threshold is not None or patience is not None:
        if invalid_fraction_threshold is None or patience is None:
            raise ValueError("Invalid fraction threshold and patience must be provided together.")
        self.stop_condition = GenerationStopCondition(
            invalid_fraction_threshold=invalid_fraction_threshold,
            patience=patience,
        )

    self.data_actions_fn = data_actions_fn
    self._batches_df: pd.DataFrame | None = None

num_batches property

The number of batches in the generation job.

num_prompts property

The total number of prompts processed in the generation job.

num_invalid_records property

The total number of invalid records generated in the generation job.

num_valid_records property

The total number of valid records generated in the generation job.

add_batch(batch)

Add a batch and update the generation status.

Stopping rules:

* The very first batch producing zero valid records always triggers `STOP_NO_RECORDS`.
* When a `stop_condition` is configured, subsequent batches with zero valid records are tolerated until the patience-based threshold is reached.
* Without a `stop_condition`, any batch with zero valid records triggers `STOP_NO_RECORDS`.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `batch` | `Batch` | The completed batch to add. | *required* |
Source code in src/nemo_safe_synthesizer/generation/results.py
def add_batch(self, batch: Batch) -> None:
    """Add a batch and update the generation status.

    Stopping rules:

    * The very first batch producing zero valid records always
      triggers ``STOP_NO_RECORDS``.
    * When a ``stop_condition`` is configured, subsequent batches
      with zero valid records are tolerated until the patience-based
      threshold is reached.
    * Without a ``stop_condition``, any batch with zero valid
      records triggers ``STOP_NO_RECORDS``.

    Args:
        batch: The completed batch to add.
    """
    # TODO: Move application of the data_actions_fn deeper in the generation process
    self._apply_data_actions_fn(batch)
    self.running_stopping_metric.update(batch.stopping_metric)
    if self.stop_condition is None:
        if batch.num_valid_records == 0:
            self.status = GenerationStatus.STOP_NO_RECORDS
    else:
        if batch.num_valid_records == 0 and self.num_batches == 0:
            self.status = GenerationStatus.STOP_NO_RECORDS
        elif self.stop_condition.has_been_reached(self.running_stopping_metric.mean):
            self.status = GenerationStatus.STOP_METRIC_REACHED

    self._batches.append(batch)
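The branching in `add_batch` can be expressed as a pure function over the incoming batch, assuming the job is currently in progress. Status values here are illustrative strings standing in for the library's `GenerationStatus` enum:

```python
IN_PROGRESS = "in_progress"
STOP_NO_RECORDS = "stop_no_records"
STOP_METRIC_REACHED = "stop_metric_reached"

def next_status(num_valid_in_batch: int, num_prior_batches: int,
                has_stop_condition: bool, metric_reached: bool) -> str:
    """Mirror the stopping rules applied when a batch is added."""
    if not has_stop_condition:
        # Without a stop condition, any empty batch halts generation.
        if num_valid_in_batch == 0:
            return STOP_NO_RECORDS
    else:
        # With a stop condition, only an empty *first* batch halts immediately.
        if num_valid_in_batch == 0 and num_prior_batches == 0:
            return STOP_NO_RECORDS
        if metric_reached:
            return STOP_METRIC_REACHED
    return IN_PROGRESS

next_status(0, 0, False, False)  # stop_no_records: first batch, no valid records
next_status(0, 3, True, False)   # in_progress: empty batch tolerated under patience
```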

get_next_num_prompts()

Return an estimate of the optimal number of prompts to process in the next batch.

Source code in src/nemo_safe_synthesizer/generation/results.py
def get_next_num_prompts(self) -> int:
    """Return an estimate of the optimal number of prompts to process in the next batch."""
    num_prompts = self.max_num_prompts_per_batch

    if self.target_num_records is None:
        return num_prompts

    if self.num_valid_records > 0:
        num_records_remaining = self.target_num_records - self.num_valid_records
        valid_records_per_prompt = 0 if self.num_prompts == 0 else self.num_valid_records / self.num_prompts
        num_prompts_needed = round(num_records_remaining / (valid_records_per_prompt + EPS))
        num_prompts = min(num_prompts, num_prompts_needed + NUM_PROMPT_BUFFER)

    return num_prompts

job_complete()

Update the generation job status to a finished state and log the results.

Source code in src/nemo_safe_synthesizer/generation/results.py
def job_complete(self) -> None:
    """Update the generation job status to a finished state and log the results."""
    self.duration = time.perf_counter() - self._start_time
    if self.status == GenerationStatus.IN_PROGRESS:
        if self.target_num_records is None or self.num_valid_records >= self.target_num_records:
            self.status = GenerationStatus.COMPLETE
        else:
            self.status = GenerationStatus.INCOMPLETE

log_status()

Log the current status of the generation process.

Source code in src/nemo_safe_synthesizer/generation/results.py
def log_status(self) -> None:
    """Log the current status of the generation process."""
    if self.status == GenerationStatus.COMPLETE:
        logger.info("🎉 Generation complete 🎉")
    elif self.status == GenerationStatus.IN_PROGRESS:
        logger.info(
            f"Generation in progress. {self.num_valid_records} out of {self.target_num_records} records generated.",
        )
    elif self.status == GenerationStatus.INCOMPLETE:
        logger.warning(
            f"😬 Generation incomplete -> {self.num_valid_records} out of "
            f"{self.target_num_records} records were generated.",
        )
    elif self.status == GenerationStatus.STOP_NO_RECORDS:
        logger.error(
            "🛑 Stopping generation prematurely. No valid records were generated due to model underfitting."
            " Please consider increasing the 'num_input_records_to_sample' parameter.",
        )
        raise GenerationError(
            "Generation stopped prematurely due to no valid records."
            " Please consider increasing the 'num_input_records_to_sample' parameter."
        )
    elif self.status == GenerationStatus.STOP_METRIC_REACHED:
        logger.error(
            "🛑 Stopping generation prematurely. The stopping "
            "condition was reached with a running average invalid "
            f"fraction of {self.stop_condition.last_value:.2%}."
            " Please consider increasing the 'num_input_records_to_sample' parameter.",
        )
        raise GenerationError(
            "Generation stopped prematurely because "
            f"the average fraction of invalid records was higher than {self.stop_condition.last_value:.2%}."
            " Please consider increasing the 'num_input_records_to_sample' parameter.",
        )

to_dataframe(columns, max_num_records=None)

Combine valid records from all batches into a single DataFrame.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `columns` | `list[str]` | Column names to include in the output. | *required* |
| `max_num_records` | `int \| None` | If set, truncate to this many rows. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | DataFrame of valid records, or an empty DataFrame if none were generated. |

Source code in src/nemo_safe_synthesizer/generation/results.py
def to_dataframe(self, columns: list[str], max_num_records: int | None = None) -> pd.DataFrame:
    """Combine valid records from all batches into a single DataFrame.

    Args:
        columns: Column names to include in the output.
        max_num_records: If set, truncate to this many rows.

    Returns:
        DataFrame of valid records, or an empty DataFrame if none
        were generated.
    """
    # Return an empty DataFrame when no valid records were generated across all batches.
    if self.num_valid_records == 0:
        return pd.DataFrame()
    df = pd.concat([batch.to_dataframe() for batch in self._batches], ignore_index=True)[columns]
    if isinstance(df, pd.Series):
        return df.to_frame().head(max_num_records or len(df))
    return df.head(max_num_records or len(df))

rejected_record_to_error(record)

Convert a rejected record into a (detailed, summary) error tuple.

Both elements are identical so that log output is consistent regardless of the detailed_errors setting.

Source code in src/nemo_safe_synthesizer/generation/results.py
def rejected_record_to_error(record: dict) -> tuple[str, str]:
    """Convert a rejected record into a ``(detailed, summary)`` error tuple.

    Both elements are identical so that log output is consistent
    regardless of the ``detailed_errors`` setting.
    """
    error_msg = f"Failed data_config validation due to [{record[MetadataColumns.REJECT_REASON.value]}]"
    return (error_msg, error_msg)
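A hypothetical usage sketch, with a plain string key standing in for `MetadataColumns.REJECT_REASON.value`:

```python
REJECT_REASON = "reject_reason"  # stand-in for MetadataColumns.REJECT_REASON.value

def rejected_record_to_error(record: dict) -> tuple[str, str]:
    """Copy of the function above, using the stand-in key."""
    error_msg = f"Failed data_config validation due to [{record[REJECT_REASON]}]"
    return (error_msg, error_msg)

detailed, summary = rejected_record_to_error({"reject_reason": "value out of range"})
# Both elements carry the same message, so logs look identical
# whether or not detailed_errors is enabled.
```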