# processors

Processors that parse raw LLM text into validated records.

Classes:

| Name | Description |
| --- | --- |
| `ParsedResponse` | Parsed result of a single LLM prompt response. |
| `Processor` | Abstract class for processing text generation results from the LLM. |
| `TabularDataProcessor` | Processor for standard (non-grouped, non-time-series) tabular data. |
| `TimeSeriesDataProcessor` | Processor for time-series data generation tasks. |
| `GroupedDataProcessor` | Processor for grouped data generation tasks. |

Functions:

| Name | Description |
| --- | --- |
| `create_processor` | Create the appropriate record processor for the current pipeline mode. |

## ParsedResponse(valid_records, invalid_records, errors, prompt_number=None) *(dataclass)*

Parsed result of a single LLM prompt response.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `valid_records` | `list[dict]` | Records that passed schema validation (as dicts). |
| `invalid_records` | `list[str]` | Raw text of records that failed validation. |
| `errors` | `list[tuple[str, str]]` | `(detailed_msg, summary_msg)` tuples for each invalid record. |
| `prompt_number` | `int \| None` | Index of the prompt in the batch. |

## Processor(schema, config)

Bases: ABC

Abstract class for processing text generation results from the LLM.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `schema` | `dict` | JSON schema as a dictionary. | required |
| `config` | `ValidationParameters` | Validation parameters. | required |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `name` | `str` | The processor's name with spaces, for logging. |

Source code in `src/nemo_safe_synthesizer/generation/processors.py`:

```python
def __init__(self, schema: dict, config: ValidationParameters):
    self.schema = schema
    self.config = config
    logger.debug(f"Initialized processor with schema={schema} and config={config}")
```

### `name` *(property)*

The processor's name with spaces, for logging.
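A common way to derive such a name is to split the CamelCase class name into lower-cased, space-separated words. The sketch below illustrates that idea; it is an assumption about the behavior, not the library's actual implementation:

```python
import re


class Processor:
    @property
    def name(self) -> str:
        # Hypothetical: insert a space before each interior capital letter,
        # then lower-case, e.g. "TabularDataProcessor" -> "tabular data processor".
        return re.sub(r"(?<!^)(?=[A-Z])", " ", type(self).__name__).lower()


class TabularDataProcessor(Processor):
    pass
```

Under this assumption, the log line in `create_processor` would read, e.g., "Initialized the tabular data processor".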

## TabularDataProcessor(schema, config)

Bases: Processor

Processor for standard (non-grouped, non-time-series) tabular data.

Source code in `src/nemo_safe_synthesizer/generation/processors.py`:

```python
def __init__(self, schema: dict, config: ValidationParameters):
    self.schema = schema
    self.config = config
    logger.debug(f"Initialized processor with schema={schema} and config={config}")
```

## TimeSeriesDataProcessor(schema, config, time_column, interval_seconds, time_format)

Bases: Processor

Processor for time-series data generation tasks.

Validates chronological ordering and timestamp intervals in addition to the standard schema checks.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `schema` | `dict` | JSON schema as a dictionary. | required |
| `config` | `ValidationParameters` | Validation parameters. | required |
| `time_column` | `str \| None` | Name of the timestamp column. | required |
| `interval_seconds` | `int \| None` | Expected interval between consecutive timestamps, or `None` if intervals vary. | required |
| `time_format` | `str \| None` | Timestamp format string (strptime), or `"elapsed_seconds"` for numeric elapsed time. | required |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If `time_column` or `time_format` is `None`. |

Source code in `src/nemo_safe_synthesizer/generation/processors.py`:

```python
def __init__(
    self,
    schema: dict,
    config: ValidationParameters,
    time_column: str | None,
    interval_seconds: int | None,
    time_format: str | None,
):
    super().__init__(schema=schema, config=config)
    if time_column is None:
        raise ValueError(
            "time_column is required for TimeSeriesDataProcessor but was None. "
            "Ensure config.time_series.timestamp_column is set."
        )
    if time_format is None:
        raise ValueError(
            "time_format is required for TimeSeriesDataProcessor but was None. "
            "This should have been inferred during training preprocessing. "
            "Check that process_timeseries_data() was called during training and "
            "that the inferred timestamp_format was saved to the config."
        )
    self.time_column: str = time_column
    self.interval_seconds = interval_seconds
    self.time_format: str = time_format
```

## GroupedDataProcessor(schema, config, bos_token, eos_token, group_by, order_by=None)

Bases: Processor

Processor for grouped data generation tasks.

Used when training examples are grouped (and optionally ordered) by one or more columns. Validates that each group has a unique group_by value and respects the order_by ordering.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `schema` | `dict` | JSON schema as a dictionary. | required |
| `config` | `ValidationParameters` | Validation parameters controlling tolerance for invalid records, non-unique group values, etc. | required |
| `bos_token` | `str` | Token delimiting the beginning of a group sequence. | required |
| `eos_token` | `str` | Token delimiting the end of a group sequence. | required |
| `group_by` | `str \| list[str]` | Column name, or list of names, that defines groups. | required |
| `order_by` | `str \| None` | Column name to enforce ordering within a group, or `None` if ordering is not required. | `None` |
Source code in `src/nemo_safe_synthesizer/generation/processors.py`:

```python
def __init__(
    self,
    schema: dict,
    config: ValidationParameters,
    bos_token: str,
    eos_token: str,
    group_by: str | list[str],
    order_by: str | None = None,
):
    super().__init__(schema=schema, config=config)
    if isinstance(group_by, str):
        group_by = [group_by]
    self.group_by = group_by
    self.order_by = order_by
    self.bos_token = bos_token
    self.eos_token = eos_token
```
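To show how BOS/EOS delimiters might be used to carve raw generated text into per-group record sequences, here is a sketch; `extract_groups` and the `<s>`/`</s>` tokens below are illustrative assumptions, not the library's API:

```python
def extract_groups(text: str, bos_token: str, eos_token: str) -> list[list[str]]:
    """Split raw LLM output into groups of record lines delimited by BOS/EOS tokens."""
    groups = []
    start = text.find(bos_token)
    while start != -1:
        end = text.find(eos_token, start)
        if end == -1:
            break  # unterminated group: discard the trailing fragment
        body = text[start + len(bos_token):end]
        records = [line for line in body.splitlines() if line.strip()]
        groups.append(records)
        start = text.find(bos_token, end + len(eos_token))
    return groups


raw = "<s>\na,1\na,2\n</s>\n<s>\nb,3\n</s>"
groups = extract_groups(raw, "<s>", "</s>")
```

Each inner list would then be validated as one group: all records sharing a single `group_by` value, with `order_by` (if set) checked within the group.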

## create_processor(schema, metadata, config)

Create the appropriate record processor for the current pipeline mode.

Selects TimeSeriesDataProcessor, GroupedDataProcessor, or TabularDataProcessor based on the pipeline configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `schema` | `dict` | JSON schema describing the expected record format. | required |
| `metadata` | `ModelMetadata` | Model metadata (prompt template, BOS/EOS tokens, etc.). | required |
| `config` | `SafeSynthesizerParameters` | Pipeline configuration determining the generation mode. | required |

Returns:

| Type | Description |
| --- | --- |
| `Processor` | Processor instance matching the configured generation mode. |

Source code in `src/nemo_safe_synthesizer/generation/processors.py`:

```python
def create_processor(schema: dict, metadata: ModelMetadata, config: SafeSynthesizerParameters) -> Processor:
    """Create the appropriate record processor for the current pipeline mode.

    Selects ``TimeSeriesDataProcessor``, ``GroupedDataProcessor``, or
    ``TabularDataProcessor`` based on the pipeline configuration.

    Args:
        schema: JSON schema describing the expected record format.
        metadata: Model metadata (prompt template, BOS/EOS tokens, etc.).
        config: Pipeline configuration determining the generation mode.

    Returns:
        Processor instance matching the configured generation mode.
    """
    if config.time_series.is_timeseries:
        processor = TimeSeriesDataProcessor(
            schema,
            config=config.generation.validation,
            time_column=config.time_series.timestamp_column,
            interval_seconds=config.time_series.timestamp_interval_seconds,
            time_format=config.time_series.timestamp_format,
        )
    elif config.data.group_training_examples_by:
        processor = GroupedDataProcessor(
            schema,
            config=config.generation.validation,
            group_by=config.data.group_training_examples_by,
            order_by=config.data.order_training_examples_by,
            bos_token=metadata.prompt_config.bos_token,
            eos_token=metadata.prompt_config.eos_token,
        )
    else:
        processor = TabularDataProcessor(schema, config=config.generation.validation)

    logger.info(f"Initialized the {processor.name}")
    return processor
```