processors

Processors that parse raw LLM text into validated records.

Classes:

Name Description
ParsedRecord

A single record extracted from an LLM completion.

ParsedResponse

Parsed result of a single LLM prompt response.

Processor

Abstract class for processing text generation results from the LLM.

TabularDataProcessor

Processor for standard (non-grouped, non-time-series) tabular data.

TimeSeriesDataProcessor

Processor for time-series data generation tasks.

GroupedDataProcessor

Processor for grouped data generation tasks.

Functions:

Name Description
create_processor

Create the appropriate record processor for the current pipeline mode.

ParsedRecord(text, parsed=None, error=None, token_count=0) dataclass

A single record extracted from an LLM completion.

Validity is tracked by an invariant: exactly one of parsed and error is non-None. A valid record has parsed set and error None; an invalid record has error set and parsed None. is_valid is the canonical accessor.

text and token_count are captured at extraction time and stay unchanged even if the record is later reclassified via invalidate (e.g. by group-level checks or data-fidelity filters).
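
The invariant and the effect of invalidate can be illustrated with a small stand-in class. This is a sketch, not the library's ParsedRecord: RecordSketch, the is_valid body, and the "fidelity_filter" validator name are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RecordSketch:
    """Illustrative stand-in for ParsedRecord (not the library class)."""

    text: str
    parsed: Optional[dict] = None
    error: Optional[tuple] = None
    token_count: int = 0

    @property
    def is_valid(self) -> bool:
        # Assumption: "valid" means parsed is set and error is not.
        return self.parsed is not None and self.error is None

    def invalidate(self, error: tuple) -> None:
        # Same two assignments as the documented invalidate():
        # record the reason and clear the stale parsed dict.
        self.error = error
        self.parsed = None


record = RecordSketch(text='{"age": 41}', parsed={"age": 41}, token_count=5)
was_valid = record.is_valid

# Reclassify the record; "fidelity_filter" is a made-up validator name.
record.invalidate(("age outside observed range", "fidelity_filter"))
```

After the call, text and token_count still hold their extraction-time values, while parsed is None and is_valid is False.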

Methods:

Name Description
invalidate

Reclassify this record as invalid.

Attributes:

Name Type Description
text str

Original regex-matched JSON string (invariant under reclassification).

parsed dict | None

Parsed dict when validation succeeded, None when invalid.

error tuple[str, str] | None

(detailed_msg, validator) when invalid, None when valid.

token_count int

Number of tokens in text; 0 when no tokenizer was provided.

is_valid bool

Return True when this record passed validation.

text instance-attribute

Original regex-matched JSON string (invariant under reclassification).

parsed = None class-attribute instance-attribute

Parsed dict when validation succeeded, None when invalid.

error = None class-attribute instance-attribute

(detailed_msg, validator) when invalid, None when valid.

token_count = 0 class-attribute instance-attribute

Number of tokens in text; 0 when no tokenizer was provided.

is_valid property

Return True when this record passed validation.

invalidate(error)

Reclassify this record as invalid.

text and token_count are kept intact; parsed is cleared so downstream consumers don't accidentally use a stale dict.

Parameters:

Name Type Description Default
error tuple[str, str]

(detailed_msg, validator) tuple describing the reason for invalidation.

required
Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def invalidate(self, error: tuple[str, str]) -> None:
    """Reclassify this record as invalid.

    ``text`` and ``token_count`` are kept intact; ``parsed`` is
    cleared so downstream consumers don't accidentally use a stale
    dict.

    Args:
        error: ``(detailed_msg, validator)`` tuple describing the
            reason for invalidation.
    """
    self.error = error
    self.parsed = None

ParsedResponse(records=list(), tokenization_time_sec=0.0, prompt_number=None) dataclass

Parsed result of a single LLM prompt response.

Holds a flat list of ParsedRecord objects (in input order) plus aggregated tokenization timing. valid_records, invalid_records, and errors are convenience views that project the record list into the shapes expected by downstream aggregation code: parsed dicts, original text, and (msg, validator) tuples, respectively.
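
One way to picture the three views is as list comprehensions over records. The sketch below uses stand-in classes (RecordSketch, ResponseSketch) and a made-up "schema" validator name; the real properties may be implemented differently.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class RecordSketch:
    """Illustrative stand-in for ParsedRecord."""

    text: str
    parsed: Optional[dict] = None
    error: Optional[tuple] = None


@dataclass
class ResponseSketch:
    """Illustrative stand-in for ParsedResponse."""

    records: list = field(default_factory=list)

    @property
    def valid_records(self) -> list:
        # Parsed dicts of the records that passed validation.
        return [r.parsed for r in self.records if r.parsed is not None]

    @property
    def invalid_records(self) -> list:
        # Original text of the records that failed validation.
        return [r.text for r in self.records if r.error is not None]

    @property
    def errors(self) -> list:
        # (detailed_msg, validator) tuples, one per invalid record.
        return [r.error for r in self.records if r.error is not None]


response = ResponseSketch(
    records=[
        RecordSketch(text='{"a": 1}', parsed={"a": 1}),
        RecordSketch(text='{"a": "x"}', error=("'x' is not an integer", "schema")),
    ]
)
```

Because each view filters the same list in input order, invalid_records and errors stay aligned index by index.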

Attributes:

Name Type Description
records list[ParsedRecord]

Per-record extraction + validation outcomes, in input order.

tokenization_time_sec float

Wall-clock seconds spent tokenizing records in this response.

prompt_number int | None

Index of the prompt within the batch (set by the processor call).

valid_records list[dict]

Parsed dicts for records that passed validation.

invalid_records list[str]

Original text for records that failed validation.

errors list[tuple[str, str]]

(detailed_msg, validator) tuples for each invalid record.

records = field(default_factory=list) class-attribute instance-attribute

Per-record extraction + validation outcomes, in input order.

tokenization_time_sec = 0.0 class-attribute instance-attribute

Wall-clock seconds spent tokenizing records in this response.

prompt_number = None class-attribute instance-attribute

Index of the prompt within the batch (set by the processor call).

valid_records property

Parsed dicts for records that passed validation.

invalid_records property

Original text for records that failed validation.

errors property

(detailed_msg, validator) tuples for each invalid record.

Processor(schema, config, tokenizer=None)

Bases: ABC

Abstract class for processing text generation results from the LLM.

Parameters:

Name Type Description Default
schema dict[str, Any]

JSON schema as a dictionary.

required
config ValidationParameters

Validation parameters.

required
tokenizer PreTrainedTokenizerBase | None

Optional tokenizer for exact per-record token counting. None is supported for tests and the training eval callback path when the tokenizer isn't readily available; in that case ParsedRecord.token_count is 0.

None

Attributes:

Name Type Description
name str

Human-readable processor name (words separated by spaces), used in log messages.

Source code in src/nemo_safe_synthesizer/generation/processors.py
def __init__(
    self,
    schema: dict[str, Any],
    config: ValidationParameters,
    tokenizer: PreTrainedTokenizerBase | None = None,
):
    self.schema = schema
    self.config = config
    self._tokenizer = tokenizer
    logger.debug(
        f"Initialized processor with schema={schema}, config={config}, tokenizer_present={tokenizer is not None}"
    )
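
The tokenizer=None fallback described above can be sketched as follows. WhitespaceTokenizer and count_tokens are hypothetical names for this example only; the sketch assumes nothing about the real tokenizer beyond an encode method that returns a sequence of tokens.

```python
class WhitespaceTokenizer:
    """Hypothetical toy tokenizer: one token per whitespace-separated chunk."""

    def encode(self, text: str) -> list:
        return text.split()


def count_tokens(text: str, tokenizer=None) -> int:
    # Without a tokenizer there is nothing to count with, so report 0,
    # matching the documented ParsedRecord.token_count default.
    if tokenizer is None:
        return 0
    return len(tokenizer.encode(text))


without_tokenizer = count_tokens('{"age": 41}')
with_tokenizer = count_tokens('{"age": 41}', WhitespaceTokenizer())
```

A count of 0 therefore means "not measured" rather than "empty record", which matters if downstream code sums token counts.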

name property

Human-readable processor name (words separated by spaces), used in log messages.

TabularDataProcessor(schema, config, tokenizer=None)

Bases: Processor

Processor for standard (non-grouped, non-time-series) tabular data.

Source code in src/nemo_safe_synthesizer/generation/processors.py
def __init__(
    self,
    schema: dict[str, Any],
    config: ValidationParameters,
    tokenizer: PreTrainedTokenizerBase | None = None,
):
    self.schema = schema
    self.config = config
    self._tokenizer = tokenizer
    logger.debug(
        f"Initialized processor with schema={schema}, config={config}, tokenizer_present={tokenizer is not None}"
    )

TimeSeriesDataProcessor(schema, config, time_column, interval_seconds, time_format, tokenizer=None)

Bases: Processor

Processor for time-series data generation tasks.

Validates chronological ordering and timestamp intervals in addition to the standard schema checks.

Parameters:

Name Type Description Default
schema dict[str, Any]

JSON schema as a dictionary.

required
config ValidationParameters

Validation parameters.

required
time_column str | None

Name of the timestamp column.

required
interval_seconds int | None

Expected interval between consecutive timestamps, or None if intervals vary.

required
time_format str | None

Timestamp format string (strptime), or "elapsed_seconds" for numeric elapsed time.

required
tokenizer PreTrainedTokenizerBase | None

Optional tokenizer for exact per-record token counting.

None

Raises:

Type Description
ValueError

If time_column or time_format is None.
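
To make the chronological-ordering and interval checks concrete, here is a minimal sketch of how such a validation could work. The helper names (to_seconds, check_timestamps) are hypothetical, and the sketch assumes naive (timezone-free) timestamps and treats equal consecutive timestamps as acceptable when no interval is given. The processor's actual rules may differ.

```python
from datetime import datetime
from typing import Optional

_EPOCH = datetime(1970, 1, 1)


def to_seconds(value, time_format: str) -> float:
    """Convert one timestamp value to seconds so values can be compared."""
    if time_format == "elapsed_seconds":
        return float(value)
    # Assumes time_format yields a naive datetime (no %z directive).
    return (datetime.strptime(value, time_format) - _EPOCH).total_seconds()


def check_timestamps(
    values: list, time_format: str, interval_seconds: Optional[int] = None
) -> list:
    """Return a list of problems; an empty list means the sequence passed."""
    seconds = [to_seconds(v, time_format) for v in values]
    problems = []
    for i in range(1, len(seconds)):
        delta = seconds[i] - seconds[i - 1]
        if delta < 0:
            problems.append(f"row {i}: timestamp goes backwards")
        elif interval_seconds is not None and delta != interval_seconds:
            problems.append(
                f"row {i}: expected {interval_seconds}s gap, got {delta:g}s"
            )
    return problems


ok = check_timestamps(
    ["2024-01-01 00:00:00", "2024-01-01 00:01:00", "2024-01-01 00:02:00"],
    "%Y-%m-%d %H:%M:%S",
    interval_seconds=60,
)
bad = check_timestamps([0, 60, 180, 120], "elapsed_seconds", interval_seconds=60)
```

Here ok is empty, while bad reports a wrong gap at row 2 and a backwards step at row 3. Passing interval_seconds=None skips the gap check and keeps only the ordering check.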

Source code in src/nemo_safe_synthesizer/generation/processors.py
def __init__(
    self,
    schema: dict[str, Any],
    config: ValidationParameters,
    time_column: str | None,
    interval_seconds: int | None,
    time_format: str | None,
    tokenizer: PreTrainedTokenizerBase | None = None,
):
    super().__init__(schema=schema, config=config, tokenizer=tokenizer)
    if time_column is None:
        raise ValueError(
            "time_column is required for TimeSeriesDataProcessor but was None. "
            "Ensure config.time_series.timestamp_column is set."
        )
    if time_format is None:
        raise ValueError(
            "time_format is required for TimeSeriesDataProcessor but was None. "
            "This should have been inferred during training preprocessing. "
            "Check that process_timeseries_data() was called during training and "
            "that the inferred timestamp_format was saved to the config."
        )
    self.time_column: str = time_column
    self.interval_seconds = interval_seconds
    self.time_format: str = time_format

GroupedDataProcessor(schema, config, bos_token, eos_token, group_by, order_by=None, tokenizer=None)

Bases: Processor

Processor for grouped data generation tasks.

Used when training examples are grouped (and optionally ordered) by a column. Validates that each group has a unique group_by value and respects the order_by ordering.

Parameters:

Name Type Description Default
schema dict[str, Any]

JSON schema as a dictionary.

required
config ValidationParameters

Validation parameters controlling tolerance for invalid records, non-unique group values, etc.

required
bos_token str

Token delimiting the beginning of a group sequence.

required
eos_token str

Token delimiting the end of a group sequence.

required
group_by str

Column name that defines groups.

required
order_by str | None

Column name to enforce ordering within a group, or None if ordering is not required.

None
tokenizer PreTrainedTokenizerBase | None

Optional tokenizer for exact per-record token counting.

None
Source code in src/nemo_safe_synthesizer/generation/processors.py
def __init__(
    self,
    schema: dict[str, Any],
    config: ValidationParameters,
    bos_token: str,
    eos_token: str,
    group_by: str,
    order_by: str | None = None,
    tokenizer: PreTrainedTokenizerBase | None = None,
):
    super().__init__(schema=schema, config=config, tokenizer=tokenizer)
    self.group_by: list[str] = [group_by]
    self.order_by = order_by
    self.bos_token = bos_token
    self.eos_token = eos_token
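
The group-level checks can be sketched as below. This is one reading of the description, not the library's implementation: it assumes "unique group_by value" means all records in a group share a single value, that order_by is checked for ascending order, and that the values are hashable and mutually comparable. check_group and the column names are hypothetical.

```python
from typing import Optional


def check_group(
    records: list, group_by: str, order_by: Optional[str] = None
) -> list:
    """Return a list of problems for one generated group of records."""
    problems = []

    # Every record in the group should carry the same group_by value.
    group_values = {record.get(group_by) for record in records}
    if len(group_values) > 1:
        problems.append(
            f"{len(group_values)} distinct values in group column {group_by!r}"
        )

    # If ordering is required, records should already be sorted by order_by.
    if order_by is not None:
        keys = [record[order_by] for record in records]
        if keys != sorted(keys):
            problems.append(f"records are not ordered by {order_by!r}")

    return problems


good = check_group(
    [{"patient": "p1", "visit": 1}, {"patient": "p1", "visit": 2}],
    group_by="patient",
    order_by="visit",
)
mixed = check_group(
    [{"patient": "p1", "visit": 2}, {"patient": "p2", "visit": 1}],
    group_by="patient",
    order_by="visit",
)
```

In this sketch a failing group yields one message per violated rule, which a caller could turn into (detailed_msg, validator) tuples when invalidating the group's records.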

create_processor(schema, metadata, config, tokenizer=None)

Create the appropriate record processor for the current pipeline mode.

Selects TimeSeriesDataProcessor, GroupedDataProcessor, or TabularDataProcessor based on the pipeline configuration.

Parameters:

Name Type Description Default
schema dict[str, Any]

JSON schema describing the expected record format.

required
metadata ModelMetadata

Model metadata (prompt template, BOS/EOS tokens, etc.).

required
config SafeSynthesizerParameters

Pipeline configuration determining the generation mode.

required
tokenizer PreTrainedTokenizerBase | None

Optional tokenizer for exact token counting during record parsing. When None, token counts are not tracked.

None

Returns:

Type Description
Processor

Processor instance matching the configured generation mode.

Source code in src/nemo_safe_synthesizer/generation/processors.py
def create_processor(
    schema: dict[str, Any],
    metadata: ModelMetadata,
    config: SafeSynthesizerParameters,
    tokenizer: PreTrainedTokenizerBase | None = None,
) -> Processor:
    """Create the appropriate record processor for the current pipeline mode.

    Selects ``TimeSeriesDataProcessor``, ``GroupedDataProcessor``, or
    ``TabularDataProcessor`` based on the pipeline configuration.

    Args:
        schema: JSON schema describing the expected record format.
        metadata: Model metadata (prompt template, BOS/EOS tokens, etc.).
        config: Pipeline configuration determining the generation mode.
        tokenizer: Optional tokenizer for exact token counting during
            record parsing.  When ``None``, token counts are not tracked.

    Returns:
        Processor instance matching the configured generation mode.
    """
    if config.time_series.is_timeseries:
        processor = TimeSeriesDataProcessor(
            schema,
            config=config.generation.validation,
            time_column=config.time_series.timestamp_column,
            interval_seconds=config.time_series.timestamp_interval_seconds,
            time_format=config.time_series.timestamp_format,
            tokenizer=tokenizer,
        )
    elif config.data.group_training_examples_by:
        processor = GroupedDataProcessor(
            schema,
            config=config.generation.validation,
            group_by=config.data.group_training_examples_by,
            order_by=config.data.order_training_examples_by,
            bos_token=metadata.prompt_config.bos_token,
            eos_token=metadata.prompt_config.eos_token,
            tokenizer=tokenizer,
        )
    else:
        processor = TabularDataProcessor(schema, config=config.generation.validation, tokenizer=tokenizer)

    logger.info(f"Initialized the {processor.name}")
    return processor
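
The branch order in create_processor implies a precedence: a time-series configuration wins even when a grouping column is also set, and an empty grouping value falls through to the tabular processor. The sketch below reproduces only that selection logic with SimpleNamespace stand-ins; select_mode and make_config are hypothetical helpers, not part of the library.

```python
from types import SimpleNamespace


def select_mode(config) -> str:
    """Mirror the if/elif/else order used by create_processor."""
    if config.time_series.is_timeseries:
        return "time_series"
    if config.data.group_training_examples_by:
        return "grouped"
    return "tabular"


def make_config(is_timeseries: bool = False, group_by=None):
    # Stand-in exposing only the two fields the selection reads.
    return SimpleNamespace(
        time_series=SimpleNamespace(is_timeseries=is_timeseries),
        data=SimpleNamespace(group_training_examples_by=group_by),
    )


plain = select_mode(make_config())
grouped = select_mode(make_config(group_by="patient_id"))
both = select_mode(make_config(is_timeseries=True, group_by="patient_id"))
empty_group = select_mode(make_config(group_by=""))
```

Because the grouping test is a truthiness check, both None and an empty string select the tabular path.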