results

Generation result containers and multi-batch accumulator.

Classes:

Name Description
GenerateJobResults

Results of a complete generation job.

GenerationBatches

Accumulator that tracks batches during the generation phase.

Functions:

Name Description
rejected_record_to_error

Convert a rejected record into a (detailed, summary) error tuple.

Attributes:

Name Type Description
NUM_PROMPT_BUFFER

Extra prompts added on top of the records-per-prompt estimate to absorb invalid completions.

INITIAL_PROBE_PROMPTS

Prompt count used for the first batch when no records-per-prompt history exists yet.

ADAPTIVE_MAX_PROMPTS_CEILING

Upper bound for target_num_records-derived max_num_prompts_per_batch.

NUM_PROMPT_BUFFER = 10 module-attribute

Extra prompts added on top of the records-per-prompt estimate to absorb invalid completions.

INITIAL_PROBE_PROMPTS = 10 module-attribute

Prompt count used for the first batch when no records-per-prompt history exists yet.

Sending a small probe lets the accumulator measure the records-per-prompt ratio cheaply. Once at least one prompt has been processed (regardless of whether it produced valid records), subsequent batches may use up to the full max_num_prompts_per_batch. This avoids the overshoot that an upfront full batch causes when the target count is much larger than the per-prompt yield.

ADAPTIVE_MAX_PROMPTS_CEILING = 2000 module-attribute

Upper bound for target_num_records-derived max_num_prompts_per_batch.

GenerateJobResults(df, status, num_valid_records, num_invalid_records, num_prompts, valid_record_fraction, batch_valid_record_fractions, elapsed_time=None, num_completion_tokens=None, num_valid_record_tokens=None, num_invalid_record_tokens=None, num_non_record_tokens=None, tokens_per_prompt=None, tokens_per_second=None, valid_tokens_per_second=None, tokenization_overhead_sec=None) dataclass

Results of a complete generation job.

Encapsulates the generated DataFrame along with validity statistics, prompt counts, and timing information. Built from a GenerationBatches accumulator via from_batches.

Methods:

Name Description
from_batches

Build results from a completed GenerationBatches accumulator.

Attributes:

Name Type Description
df DataFrame

DataFrame containing the generated records.

status GenerationStatus

Overall generation status derived from the processed batches.

num_valid_records int

Total number of records that passed validation.

num_invalid_records int

Total number of records that failed validation.

num_prompts int

Total number of prompts processed during generation.

valid_record_fraction float

Fraction of valid records among all generated records.

batch_valid_record_fractions list[float]

Per-batch valid record fractions, in batch order.

elapsed_time float | None

Wall-clock generation duration in seconds, or None if not yet set.

num_completion_tokens int | None

Total tokens generated by the LLM across all completions.

num_valid_record_tokens int | None

Tokens in records that passed validation.

num_invalid_record_tokens int | None

Tokens in records that failed validation.

num_non_record_tokens int | None

Tokens not part of any recognized record.

tokens_per_prompt float | None

Average completion tokens per prompt (num_completion_tokens / num_prompts).

tokens_per_second float | None

Total completion tokens divided by generation wall-clock time.

valid_tokens_per_second float | None

Valid record tokens divided by generation wall-clock time.

tokenization_overhead_sec float | None

Wall-clock seconds spent tokenizing records for statistics.

df instance-attribute

DataFrame containing the generated records.

status instance-attribute

Overall generation status derived from the processed batches.

num_valid_records instance-attribute

Total number of records that passed validation.

num_invalid_records instance-attribute

Total number of records that failed validation.

num_prompts instance-attribute

Total number of prompts processed during generation.

valid_record_fraction instance-attribute

Fraction of valid records among all generated records.

batch_valid_record_fractions instance-attribute

Per-batch valid record fractions, in batch order.

elapsed_time = None class-attribute instance-attribute

Wall-clock generation duration in seconds, or None if not yet set.

num_completion_tokens = None class-attribute instance-attribute

Total tokens generated by the LLM across all completions.

num_valid_record_tokens = None class-attribute instance-attribute

Tokens in records that passed validation.

num_invalid_record_tokens = None class-attribute instance-attribute

Tokens in records that failed validation.

num_non_record_tokens = None class-attribute instance-attribute

Tokens not part of any recognized record.

tokens_per_prompt = None class-attribute instance-attribute

Average completion tokens per prompt (num_completion_tokens / num_prompts).

tokens_per_second = None class-attribute instance-attribute

Total completion tokens divided by generation wall-clock time.

valid_tokens_per_second = None class-attribute instance-attribute

Valid record tokens divided by generation wall-clock time.

tokenization_overhead_sec = None class-attribute instance-attribute

Wall-clock seconds spent tokenizing records for statistics.

from_batches(batches, max_num_records, columns, elapsed_time) classmethod

Build results from a completed GenerationBatches accumulator.

Parameters:

Name Type Description Default
batches GenerationBatches

Accumulated generation batches.

required
max_num_records int | None

If set, truncate the output DataFrame to this many rows.

required
columns list[str]

Column names to select from the generated records.

required
elapsed_time float

Wall-clock generation duration in seconds.

required

Returns:

Type Description
Self

Populated results instance.

Source code in src/nemo_safe_synthesizer/generation/results.py
@classmethod
def from_batches(
    cls, batches: GenerationBatches, max_num_records: int | None, columns: list[str], elapsed_time: float
) -> Self:
    """Build results from a completed :class:`GenerationBatches` accumulator.

    Args:
        batches: Accumulated generation batches.
        max_num_records: If set, truncate the output DataFrame to
            this many rows.
        columns: Column names to select from the generated records.
        elapsed_time: Wall-clock generation duration in seconds.

    Returns:
        Populated results instance.
    """
    df = batches.to_dataframe(columns, max_num_records)
    status = batches.status
    num_valid_records = batches.num_valid_records
    num_invalid_records = batches.num_invalid_records
    num_prompts = batches.num_prompts
    valid_record_fraction = (
        batches.num_valid_records / (batches.num_valid_records + batches.num_invalid_records)
        if (batches.num_valid_records + batches.num_invalid_records) > 0
        else 0.0
    )
    batch_valid_record_fractions = [batch.valid_record_fraction for batch in batches._batches]

    has_token_data = batches.total_completion_tokens > 0
    return cls(
        df=df,
        status=status,
        num_valid_records=num_valid_records,
        num_invalid_records=num_invalid_records,
        num_prompts=num_prompts,
        valid_record_fraction=valid_record_fraction,
        batch_valid_record_fractions=batch_valid_record_fractions,
        elapsed_time=elapsed_time,
        num_completion_tokens=batches.total_completion_tokens if has_token_data else None,
        num_valid_record_tokens=batches.total_valid_record_tokens if has_token_data else None,
        num_invalid_record_tokens=batches.total_invalid_record_tokens if has_token_data else None,
        num_non_record_tokens=batches.total_non_record_tokens if has_token_data else None,
        tokens_per_prompt=(
            batches.total_completion_tokens / num_prompts if has_token_data and num_prompts > 0 else None
        ),
        tokens_per_second=(
            batches.total_completion_tokens / elapsed_time if has_token_data and elapsed_time > 0 else None
        ),
        valid_tokens_per_second=(
            batches.total_valid_record_tokens / elapsed_time if has_token_data and elapsed_time > 0 else None
        ),
        tokenization_overhead_sec=batches.total_tokenization_time_sec if has_token_data else None,
    )

GenerationBatches(target_num_records=None, batches=None, max_num_prompts_per_batch=None, invalid_fraction_threshold=None, patience=None, data_actions_fn=None)

Accumulator that tracks batches during the generation phase.

Manages the stopping condition, running statistics, and optional post-processing via data_actions_fn.

Parameters:

Name Type Description Default
target_num_records int | None

Target number of valid records to generate.

None
batches list[Batch] | None

Pre-existing batches to seed the accumulator with.

None
max_num_prompts_per_batch int | None

Maximum prompts per LLM generation call. None (the default) derives the cap adaptively from target_num_records -- min(2000, max(MAX_NUM_PROMPTS_PER_BATCH, target_num_records // 20)) -- so small jobs do not request an oversized batch and large jobs do not stall on a single small batch.

None
invalid_fraction_threshold float | None

Fraction of invalid records that triggers stopping after patience consecutive batches.

None
patience int | None

Consecutive batch count before the threshold triggers a stop.

None
data_actions_fn DataActionsFn | None

Optional function that post-processes and validates records from each batch.

None

Attributes:

Name Type Description
status

Current generation status.

running_stopping_metric

Exponential running average of the invalid-record fraction.

stop_condition

The patience-based stopping condition, or None if thresholds were not provided.

Methods:

Name Description
add_batch

Add a batch and update the generation status.

get_next_num_prompts

Return an estimate of the optimal number of prompts to process in the next batch.

job_complete

Update the generation job status to a finished state and log the results.

log_status

Log the current status of the generation process.

to_dataframe

Combine valid records from all batches into a single DataFrame.

Source code in src/nemo_safe_synthesizer/generation/results.py
def __init__(
    self,
    target_num_records: int | None = None,
    batches: list[Batch] | None = None,
    max_num_prompts_per_batch: int | None = None,
    invalid_fraction_threshold: float | None = None,
    patience: int | None = None,
    data_actions_fn: DataActionsFn | None = None,
):
    self._batches = batches or []
    self._start_time = time.perf_counter()
    self.target_num_records = target_num_records
    self.max_num_prompts_per_batch = self._resolve_max_num_prompts_per_batch(
        max_num_prompts_per_batch, target_num_records
    )
    self.status = GenerationStatus.IN_PROGRESS
    self.running_stopping_metric = RunningStatistics()

    self.stop_condition = None
    if invalid_fraction_threshold is not None or patience is not None:
        if invalid_fraction_threshold is None or patience is None:
            raise ValueError("Invalid fraction threshold and patience must be provided together.")
        self.stop_condition = GenerationStopCondition(
            invalid_fraction_threshold=invalid_fraction_threshold,
            patience=patience,
        )

    self.data_actions_fn = data_actions_fn
    self._batches_df: pd.DataFrame | None = None

num_batches property

The number of batches in the generation job.

num_prompts property

The total number of prompts processed in the generation job.

num_invalid_records property

The total number of invalid records generated in the generation job.

num_valid_records property

The total number of valid records generated in the generation job.

total_completion_tokens property

Total tokens across all completions in all batches.

total_valid_record_tokens property

Sum of token counts for valid records across all batches.

total_invalid_record_tokens property

Sum of token counts for invalid records across all batches.

total_non_record_tokens property

Tokens not part of any recognized record across all batches.

total_tokenization_time_sec property

Wall-clock seconds spent tokenizing records across all batches.

add_batch(batch)

Add a batch and update the generation status.

Stopping rules:

  • The very first batch producing zero valid records always triggers STOP_NO_RECORDS.
  • When a stop_condition is configured, subsequent batches with zero valid records are tolerated until the patience-based threshold is reached.
  • Without a stop_condition, any batch with zero valid records triggers STOP_NO_RECORDS.

Parameters:

Name Type Description Default
batch Batch

The completed batch to add.

required

Source code in src/nemo_safe_synthesizer/generation/results.py
def add_batch(self, batch: Batch) -> None:
    """Add a batch and update the generation status.

    Stopping rules:

    * The very first batch producing zero valid records always
      triggers ``STOP_NO_RECORDS``.
    * When a ``stop_condition`` is configured, subsequent batches
      with zero valid records are tolerated until the patience-based
      threshold is reached.
    * Without a ``stop_condition``, any batch with zero valid
      records triggers ``STOP_NO_RECORDS``.

    Args:
        batch: The completed batch to add.
    """
    # TODO: Move application of the data_actions_fn deeper in the generation process
    self._apply_data_actions_fn(batch)
    self.running_stopping_metric.update(batch.stopping_metric)
    if self.stop_condition is None:
        if batch.num_valid_records == 0:
            self.status = GenerationStatus.STOP_NO_RECORDS
    else:
        if batch.num_valid_records == 0 and self.num_batches == 0:
            self.status = GenerationStatus.STOP_NO_RECORDS
        elif self.stop_condition.has_been_reached(self.running_stopping_metric.mean):
            self.status = GenerationStatus.STOP_METRIC_REACHED

    self._batches.append(batch)
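
The three stopping rules can be restated as a pure decision function. This is a sketch only: `Status` is a stand-in for GenerationStatus with just the members needed here, and `threshold_reached` stands for the result of stop_condition.has_been_reached(...), whose internals are not shown on this page.

```python
from enum import Enum


class Status(Enum):
    """Stand-in for GenerationStatus (illustrative subset)."""

    IN_PROGRESS = "in_progress"
    STOP_NO_RECORDS = "stop_no_records"
    STOP_METRIC_REACHED = "stop_metric_reached"


def next_status(
    current: Status,
    batch_valid_records: int,
    num_prior_batches: int,
    has_stop_condition: bool,
    threshold_reached: bool,
) -> Status:
    """Return the status after adding one batch, following the rules of add_batch."""
    if not has_stop_condition:
        # Without a stop condition, any empty batch ends the job.
        return Status.STOP_NO_RECORDS if batch_valid_records == 0 else current
    if batch_valid_records == 0 and num_prior_batches == 0:
        # The very first batch must produce at least one valid record.
        return Status.STOP_NO_RECORDS
    if threshold_reached:
        # Later empty batches are tolerated until the patience-based threshold trips.
        return Status.STOP_METRIC_REACHED
    return current


# An empty fourth batch is tolerated while the threshold has not been reached.
print(next_status(Status.IN_PROGRESS, 0, 3, True, False))
```

The first-batch check uses the number of batches already stored, which is why add_batch appends the new batch only after the status has been updated.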

get_next_num_prompts()

Return an estimate of the optimal number of prompts to process in the next batch.

The accumulator scales the per-batch prompt count through three regimes:

  • Truly-first batch (no prompts ever sent) -- send a small probe batch of INITIAL_PROBE_PROMPTS so the records-per-prompt ratio can be measured before committing to a full batch.
  • Prompts sent but no valid records yet -- escalate to the full max_num_prompts_per_batch budget so the patience-based stopping path can decide whether to abort.
  • Have valid records -- size the next batch from the observed records-per-prompt ratio, plus NUM_PROMPT_BUFFER to absorb invalid completions.

Source code in src/nemo_safe_synthesizer/generation/results.py
def get_next_num_prompts(self) -> int:
    """Return an estimate of the optimal number of prompts to process in the next batch.

    The accumulator scales the per-batch prompt count through three
    regimes:

    * Truly-first batch (no prompts ever sent) -- send a small probe
      batch of ``INITIAL_PROBE_PROMPTS`` so the records-per-prompt
      ratio can be measured before committing to a full batch.
    * Prompts sent but no valid records yet -- escalate to the full
      ``max_num_prompts_per_batch`` budget so the patience-based
      stopping path can decide whether to abort.
    * Have valid records -- size the next batch from the observed
      records-per-prompt ratio, plus ``NUM_PROMPT_BUFFER`` to absorb
      invalid completions.
    """
    num_prompts = self.max_num_prompts_per_batch

    if self.target_num_records is None:
        return num_prompts

    num_records_remaining = self.target_num_records - self.num_valid_records
    records_per_prompt_known = self.num_valid_records > 0 and self.num_prompts > 0
    is_first_batch = self.num_prompts == 0

    if records_per_prompt_known:
        valid_records_per_prompt = self.num_valid_records / self.num_prompts
        num_prompts_needed = round(num_records_remaining / (valid_records_per_prompt + EPS))
        return min(num_prompts, num_prompts_needed + NUM_PROMPT_BUFFER)

    if is_first_batch:
        return min(num_prompts, INITIAL_PROBE_PROMPTS, num_records_remaining + NUM_PROMPT_BUFFER)

    return min(num_prompts, num_records_remaining + NUM_PROMPT_BUFFER)

job_complete()

Update the generation job status to a finished state and log the results.

Source code in src/nemo_safe_synthesizer/generation/results.py
def job_complete(self) -> None:
    """Update the generation job status to a finished state and log the results."""
    self.duration = time.perf_counter() - self._start_time
    if self.status == GenerationStatus.IN_PROGRESS:
        if self.target_num_records is None or self.num_valid_records >= self.target_num_records:
            self.status = GenerationStatus.COMPLETE
        else:
            self.status = GenerationStatus.INCOMPLETE

log_status()

Log the current status of the generation process.

Source code in src/nemo_safe_synthesizer/generation/results.py
def log_status(self) -> None:
    """Log the current status of the generation process."""
    if self.status == GenerationStatus.COMPLETE:
        logger.info("🎉 Generation complete 🎉")
    elif self.status == GenerationStatus.IN_PROGRESS:
        logger.info(
            f"Generation in progress. {self.num_valid_records} out of {self.target_num_records} records generated.",
        )
    elif self.status == GenerationStatus.INCOMPLETE:
        logger.warning(
            f"😬 Generation incomplete -> {self.num_valid_records} out of "
            f"{self.target_num_records} records were generated.",
        )
    elif self.status == GenerationStatus.STOP_NO_RECORDS:
        logger.error(
            "🛑 Stopping generation prematurely. No valid records were generated due to model underfitting."
            " Please consider increasing the 'num_input_records_to_sample' parameter.",
        )
        raise GenerationError(
            "Generation stopped prematurely due to no valid records."
            " Please consider increasing the 'num_input_records_to_sample' parameter."
        )
    elif self.status == GenerationStatus.STOP_METRIC_REACHED:
        stop = self.stop_condition
        if stop is None:
            raise GenerationError("Generation stopped: metric reached but stop_condition is None.")
        stop_val: float | int | None = stop.last_value
        frac_str = f"{stop_val:.2%}" if stop_val is not None else "?"
        logger.error(
            "🛑 Stopping generation prematurely. The stopping "
            "condition was reached with a running average invalid "
            f"fraction of {frac_str}."
            " Please consider increasing the 'num_input_records_to_sample' parameter.",
        )
        raise GenerationError(
            "Generation stopped prematurely because "
            f"the average fraction of invalid records was higher than {frac_str}."
            " Please consider increasing the 'num_input_records_to_sample' parameter.",
        )

to_dataframe(columns, max_num_records=None)

Combine valid records from all batches into a single DataFrame.

Parameters:

Name Type Description Default
columns list[str]

Column names to include in the output.

required
max_num_records int | None

If set, truncate to this many rows.

None

Returns:

Type Description
DataFrame

DataFrame of valid records, or an empty DataFrame if none were generated.

Source code in src/nemo_safe_synthesizer/generation/results.py
def to_dataframe(self, columns: list[str], max_num_records: int | None = None) -> pd.DataFrame:
    """Combine valid records from all batches into a single DataFrame.

    Args:
        columns: Column names to include in the output.
        max_num_records: If set, truncate to this many rows.

    Returns:
        DataFrame of valid records, or an empty DataFrame if none
        were generated.
    """
    # Return an empty DataFrame when no batch produced any valid records.
    if self.num_valid_records == 0:
        return pd.DataFrame()
    df = pd.concat([batch.to_dataframe() for batch in self._batches], ignore_index=True)[columns]
    if isinstance(df, pd.Series):
        return df.to_frame().head(max_num_records or len(df))
    return df.head(max_num_records or len(df))

rejected_record_to_error(record)

Convert a rejected record into a (detailed, summary) error tuple.

Both elements are identical so that log output is consistent regardless of the detailed_errors setting.

Source code in src/nemo_safe_synthesizer/generation/results.py
def rejected_record_to_error(record: dict) -> tuple[str, str]:
    """Convert a rejected record into a ``(detailed, summary)`` error tuple.

    Both elements are identical so that log output is consistent
    regardless of the ``detailed_errors`` setting.
    """
    error_msg = f"Failed data_config validation due to [{record[MetadataColumns.REJECT_REASON.value]}]"
    return (error_msg, error_msg)
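
A usage sketch for the tuple shape. The actual key behind MetadataColumns.REJECT_REASON.value is not shown on this page, so `REJECT_REASON_KEY` below is a hypothetical stand-in, as is the sample record.

```python
# Hypothetical key name; the real one comes from MetadataColumns.REJECT_REASON.value.
REJECT_REASON_KEY = "reject_reason"


def to_error(record: dict) -> tuple[str, str]:
    """Build the (detailed, summary) pair the same way rejected_record_to_error does."""
    error_msg = f"Failed data_config validation due to [{record[REJECT_REASON_KEY]}]"
    return (error_msg, error_msg)


detailed, summary = to_error({"id": 7, REJECT_REASON_KEY: "value out of range"})
print(detailed)
```

Because both elements are the same string, callers can unpack either one without checking the detailed_errors setting.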