record_utils

Utilities for extracting, validating, and converting JSONL records.

Provides regex-based JSONL extraction, JSON-schema validation (including time-series interval checks), DataFrame normalization, and JSONL serialization.

Classes:

Name Description
ParsedRecord

A single record extracted from an LLM completion.

ParsedResponse

Parsed result of a single LLM prompt response.

Functions:

Name Description
is_safe_for_float_conversion

Check if a value can be safely converted to float64 without overflow.

check_record_for_large_numbers

Check if a record contains any numbers that would cause float64 overflow.

check_if_records_are_ordered

Check if the records are in ascending order based on the given order_by column.

extract_records_from_jsonl_string

Extract and return tabular records from the given JSONL string.

extract_groups_from_jsonl_string

Extract groups of records from the given JSONL string.

timed_encode

Wrap an encode callable with timing, or return a no-op.

extract_and_validate_records

Extract and validate records from the given JSONL string.

extract_and_validate_timeseries_records

Extract and validate sequential records with time-interval constraints.

normalize_dataframe

Normalize a DataFrame of generated records via a CSV round-trip.

records_to_jsonl

Convert list of records to a JSONL string.

ParsedRecord(text, parsed=None, error=None, token_count=0) dataclass

A single record extracted from an LLM completion.

Validity is tracked by the invariant that exactly one of parsed and error is non-None: a valid record has parsed set and error as None, an invalid record has error set and parsed as None. is_valid is the canonical accessor.

text and token_count are captured at extraction time and remain invariant even if the record is reclassified later (e.g. by group-level checks or data-fidelity filters) via invalidate.

Methods:

Name Description
invalidate

Reclassify this record as invalid.

Attributes:

Name Type Description
text str

Original regex-matched JSON string (invariant under reclassification).

parsed dict | None

Parsed dict when validation succeeded, None when invalid.

error tuple[str, str] | None

(detailed_msg, validator) when invalid, None when valid.

token_count int

Number of tokens in text; 0 when no tokenizer was provided.

is_valid bool

Return True when this record passed validation.

text instance-attribute

Original regex-matched JSON string (invariant under reclassification).

parsed = None class-attribute instance-attribute

Parsed dict when validation succeeded, None when invalid.

error = None class-attribute instance-attribute

(detailed_msg, validator) when invalid, None when valid.

token_count = 0 class-attribute instance-attribute

Number of tokens in text; 0 when no tokenizer was provided.

is_valid property

Return True when this record passed validation.

invalidate(error)

Reclassify this record as invalid.

text and token_count are kept intact; parsed is cleared so downstream consumers don't accidentally use a stale dict.

Parameters:

Name Type Description Default
error tuple[str, str]

(detailed_msg, validator) tuple describing the reason for invalidation.

required
Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def invalidate(self, error: tuple[str, str]) -> None:
    """Reclassify this record as invalid.

    ``text`` and ``token_count`` are kept intact; ``parsed`` is
    cleared so downstream consumers don't accidentally use a stale
    dict.

    Args:
        error: ``(detailed_msg, validator)`` tuple describing the
            reason for invalidation.
    """
    self.error = error
    self.parsed = None
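
The invariant described above (exactly one of parsed and error is set) can be illustrated with a small stand-in class. This is a minimal sketch, not the library class: RecordSketch is a hypothetical name, the is_valid implementation is an assumption (the source listing does not show it), and the "GroupCheck" validator label is made up for the example.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RecordSketch:
    """Hypothetical stand-in that mirrors the documented ParsedRecord fields."""

    text: str
    parsed: dict | None = None
    error: tuple[str, str] | None = None
    token_count: int = 0

    @property
    def is_valid(self) -> bool:
        # Assumption: validity is derived from the absence of an error.
        return self.error is None

    def invalidate(self, error: tuple[str, str]) -> None:
        # Same two assignments as the documented invalidate():
        # record the reason and clear the stale parsed dict.
        self.error = error
        self.parsed = None


record = RecordSketch(text='{"id": 1}', parsed={"id": 1}, token_count=5)
was_valid = record.is_valid

# A later group-level check reclassifies the record.
record.invalidate(("duplicate id within group", "GroupCheck"))
```

After invalidate, text and token_count still hold the values captured at extraction time, so token accounting is unaffected by the reclassification.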

ParsedResponse(records=list(), tokenization_time_sec=0.0, prompt_number=None) dataclass

Parsed result of a single LLM prompt response.

Holds a flat list of ParsedRecord objects (in input order) plus aggregated tokenization timing. valid_records / invalid_records / errors are convenience views that project the record list into the shapes expected by downstream aggregation code (parsed dicts, original text, (msg, validator) tuples respectively).

Attributes:

Name Type Description
records list[ParsedRecord]

Per-record extraction + validation outcomes, in input order.

tokenization_time_sec float

Wall-clock seconds spent tokenizing records in this response.

prompt_number int | None

Index of the prompt within the batch (set by the processor call).

valid_records list[dict]

Parsed dicts for records that passed validation.

invalid_records list[str]

Original text for records that failed validation.

errors list[tuple[str, str]]

(detailed_msg, validator) tuples for each invalid record.

records = field(default_factory=list) class-attribute instance-attribute

Per-record extraction + validation outcomes, in input order.

tokenization_time_sec = 0.0 class-attribute instance-attribute

Wall-clock seconds spent tokenizing records in this response.

prompt_number = None class-attribute instance-attribute

Index of the prompt within the batch (set by the processor call).

valid_records property

Parsed dicts for records that passed validation.

invalid_records property

Original text for records that failed validation.

errors property

(detailed_msg, validator) tuples for each invalid record.
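
The three convenience views can be pictured as simple projections over the record list. The sketch below uses hypothetical stand-in classes (RecordSketch, ResponseSketch); the filtering conditions are an assumption based on the attribute descriptions, since the property bodies are not shown on this page.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class RecordSketch:
    """Hypothetical stand-in for ParsedRecord (subset of fields)."""

    text: str
    parsed: dict | None = None
    error: tuple[str, str] | None = None


@dataclass
class ResponseSketch:
    """Hypothetical stand-in for ParsedResponse (subset of fields)."""

    records: list[RecordSketch] = field(default_factory=list)

    @property
    def valid_records(self) -> list[dict]:
        # Parsed dicts for records that passed validation.
        return [r.parsed for r in self.records if r.parsed is not None]

    @property
    def invalid_records(self) -> list[str]:
        # Original text for records that failed validation.
        return [r.text for r in self.records if r.error is not None]

    @property
    def errors(self) -> list[tuple[str, str]]:
        # (detailed_msg, validator) tuples, one per invalid record.
        return [r.error for r in self.records if r.error is not None]


response = ResponseSketch(
    records=[
        RecordSketch(text='{"id": 1}', parsed={"id": 1}),
        RecordSketch(text='{"id": }', error=("malformed JSON", "JSON")),
        RecordSketch(text='{"id": 3}', parsed={"id": 3}),
    ]
)
```

Because all three views are derived from the same list, invalid_records and errors stay aligned index by index.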

is_safe_for_float_conversion(value)

Check if a value can be safely converted to float64 without overflow.

Only int values can cause overflow; all other types are considered safe. Lists and dicts are not inspected, so a large int nested inside one is reported as safe.

Parameters:

Name Type Description Default
value str | int | float | None | list | dict

The value to check.

required

Returns:

Type Description
bool

True if the value can be safely converted to float64, False otherwise.

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def is_safe_for_float_conversion(value: str | int | float | None | list | dict) -> bool:
    """Check if a value can be safely converted to float64 without overflow.

    Only ``int`` values can cause overflow; all other types are considered safe.

    Args:
        value: The value to check.

    Returns:
        True if the value can be safely converted to float64, False otherwise.
    """
    # not considering Decimal because the input of this validation
    # is coming from converting a jsonl string to JSON object.
    # JSON object only supports int or float for numeric numbers

    # only int could have overflow error
    if isinstance(value, int):
        try:
            float(value)
            return True
        except (OverflowError, ValueError):
            return False
    return True

check_record_for_large_numbers(record)

Check if a record contains any numbers that would cause float64 overflow.

Parameters:

Name Type Description Default
record dict

Dictionary of field names to values.

required

Returns:

Type Description
str | None

An error message describing the first unsafe value found, or None if all values are safe.

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def check_record_for_large_numbers(record: dict) -> str | None:
    """Check if a record contains any numbers that would cause float64 overflow.

    Args:
        record: Dictionary of field names to values.

    Returns:
        An error message describing the first unsafe value found,
        or None if all values are safe.
    """
    for key, value in record.items():
        if not is_safe_for_float_conversion(value):
            # If a column contains a value that is too large to convert to float64,
            # then the entire record is invalid
            return f"Value {value} in field '{key}' is too large to convert to float64"

    return None
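
To make the overflow boundary concrete, the sketch below copies the two functions from the listings above into a standalone script and runs them on a few sample records. The sample records and variable names are illustrative only.

```python
from __future__ import annotations


def is_safe_for_float_conversion(value) -> bool:
    # Only Python ints can exceed the float64 range when converted.
    if isinstance(value, int):
        try:
            float(value)
            return True
        except (OverflowError, ValueError):
            return False
    return True


def check_record_for_large_numbers(record: dict) -> str | None:
    # Return a message for the first top-level value that overflows.
    for key, value in record.items():
        if not is_safe_for_float_conversion(value):
            return f"Value {value} in field '{key}' is too large to convert to float64"
    return None


# 10**308 is below the float64 maximum (about 1.8e308); 10**309 is above it.
fits = check_record_for_large_numbers({"id": 7, "balance": 10**308})
overflows = check_record_for_large_numbers({"id": 7, "balance": 10**309})

# Values nested inside a list are not inspected, so this record passes.
nested = check_record_for_large_numbers({"values": [10**400]})
```

The nested case is worth keeping in mind if records may carry list or dict fields.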

check_if_records_are_ordered(records, order_by)

Check if the records are in ascending (non-decreasing) order based on the given order_by column. Equal adjacent values count as ordered.

Parameters:

Name Type Description Default
records list[dict]

List of JSONL records.

required
order_by str

Column to check for ordering.

required

Returns:

Type Description
bool

True if the records are ordered by the given column, otherwise False.

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def check_if_records_are_ordered(records: list[dict], order_by: str) -> bool:
    """Check if the records are in ascending order based on the given `order_by` column.

    Args:
        records: List of JSONL records.
        order_by: Column to check for ordering.

    Returns:
        True if the records are ordered by the given column, otherwise False.
    """
    order_by_values = [rec[order_by] for rec in records]
    sorted_values = sorted(order_by_values)
    return order_by_values == sorted_values
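
A short usage sketch, with the function body copied into a standalone script, shows the edge cases that follow from comparing the column against its sorted copy. The sample records are illustrative.

```python
def check_if_records_are_ordered(records: list, order_by: str) -> bool:
    # Compare the column as given against a sorted copy of itself.
    order_by_values = [rec[order_by] for rec in records]
    return order_by_values == sorted(order_by_values)


ascending = [{"t": 1}, {"t": 2}, {"t": 2}, {"t": 5}]
shuffled = [{"t": 1}, {"t": 5}, {"t": 2}]

ascending_ok = check_if_records_are_ordered(ascending, "t")
shuffled_ok = check_if_records_are_ordered(shuffled, "t")
empty_ok = check_if_records_are_ordered([], "t")

# A record without the order_by key raises KeyError rather than returning False.
try:
    check_if_records_are_ordered([{"t": 1}, {"other": 2}], "t")
    missing_key_raises = False
except KeyError:
    missing_key_raises = True
```

Repeated values pass the check, an empty list is trivially ordered, and callers that cannot guarantee the key is present may want to guard against KeyError.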

extract_records_from_jsonl_string(jsonl_string)

Extract and return tabular records from the given JSONL string.

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def extract_records_from_jsonl_string(jsonl_string: str) -> list[str]:
    """Extract and return tabular records from the given JSONL string."""
    return re.findall(RECORD_REGEX_PATTEN_LOOKAHEAD, jsonl_string)
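
The module's record regex is not shown on this page. As a rough illustration of regex-based extraction, the sketch below uses a hypothetical pattern (FLAT_OBJECT_PATTERN) that matches flat, non-nested JSON objects; the real pattern may differ, for example in how it handles braces inside string values.

```python
import re

# Hypothetical stand-in for the module's record pattern: one flat {...} object.
FLAT_OBJECT_PATTERN = r"\{[^{}]*\}"

completion = 'Sure! {"id": 1, "name": "a"}\n{"id": 2, "name": "b"} trailing {"id": 3'

# findall returns the matched substrings, not parsed dicts.
matches = re.findall(FLAT_OBJECT_PATTERN, completion)
```

Surrounding chatter and the truncated final object are skipped, which is the main reason to extract with a regex instead of splitting on newlines.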

extract_groups_from_jsonl_string(jsonl_string, bos, eos)

Extract groups of records from the given JSONL string.

This function assumes that the complete group of records is enclosed by the given beginning-of-sequence (bos) and end-of-sequence (eos) tokens.

Parameters:

Name Type Description Default
jsonl_string str

Single JSONL string containing grouped tabular records.

required
bos str

Beginning-of-sequence token used to identify the start of a group.

required
eos str

End-of-sequence token used to identify the end of a group.

required

Returns:

Type Description
list[str]

Substrings matching complete bos/eos-delimited record groups.

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def extract_groups_from_jsonl_string(jsonl_string: str, bos: str, eos: str) -> list[str]:
    """Extract groups of records from the given JSONL string.

    This function assumes that the complete group of records
    is enclosed by the given beginning-of-sequence (bos) and
    end-of-sequence (eos) tokens.

    Args:
        jsonl_string: Single JSONL string containing grouped tabular records.
        bos: Beginning-of-sequence token used to identify the start of a group.
        eos: End-of-sequence token used to identify the end of a group.

    Returns:
        Substrings matching complete bos/eos-delimited record groups.
    """
    bos_re = re.escape(rf"{bos}")
    eos_re = re.escape(rf"{eos}")
    return re.findall(rf"{bos_re}\s?(?:{RECORD_REGEX_PATTERN}\s?)+\s?{eos_re}", jsonl_string)
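
The group pattern can be tried out in isolation. In the sketch below, RECORD is a hypothetical stand-in for the module's RECORD_REGEX_PATTERN (which is not shown on this page), and the delimiter tokens are examples only.

```python
import re

# Hypothetical stand-in for RECORD_REGEX_PATTERN: one flat {...} object.
RECORD = r"\{[^{}]*\}"


def extract_groups(jsonl_string: str, bos: str, eos: str) -> list:
    # Escape the delimiters so characters like "[" are matched literally.
    bos_re = re.escape(bos)
    eos_re = re.escape(eos)
    return re.findall(rf"{bos_re}\s?(?:{RECORD}\s?)+\s?{eos_re}", jsonl_string)


text = '<s> {"a": 1}\n{"a": 2} </s> noise <s> {"a": 3}'
groups = extract_groups(text, "<s>", "</s>")

# Delimiters containing regex metacharacters still work because of re.escape.
bracketed = extract_groups('[G] {"a": 1} [/G]', "[G]", "[/G]")
```

The second group in text has no closing token, so it is dropped: only complete bos/eos-delimited groups are returned.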

timed_encode(encode)

Wrap an encode callable with timing, or return a no-op.

Returns a function timed(text) that returns (n_tokens, elapsed_seconds). When encode is None the returned function always returns (0, 0.0).

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def timed_encode(
    encode: Callable[[str], list[int]] | None,
) -> Callable[[str], tuple[int, float]]:
    """Wrap an encode callable with timing, or return a no-op.

    Returns a function ``timed(text)`` that returns ``(n_tokens,
    elapsed_seconds)``.  When *encode* is ``None`` the returned
    function always returns ``(0, 0.0)``.
    """
    if encode is None:

        def _noop(_text: str) -> tuple[int, float]:
            return 0, 0.0

        return _noop

    def _timed(text: str) -> tuple[int, float]:
        t0 = time.monotonic()
        n = len(encode(text))
        return n, time.monotonic() - t0

    return _timed
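
A usage sketch, with the wrapper copied into a standalone script. The toy_encode tokenizer is hypothetical and stands in for a real tokenizer's encode method.

```python
from __future__ import annotations

import time
from typing import Callable


def timed_encode(
    encode: Callable[[str], list[int]] | None,
) -> Callable[[str], tuple[int, float]]:
    if encode is None:

        def _noop(_text: str) -> tuple[int, float]:
            return 0, 0.0

        return _noop

    def _timed(text: str) -> tuple[int, float]:
        t0 = time.monotonic()
        n = len(encode(text))
        return n, time.monotonic() - t0

    return _timed


def toy_encode(text: str) -> list[int]:
    # Hypothetical tokenizer: one "token id" per whitespace-separated chunk.
    return [len(chunk) for chunk in text.split()]


timed = timed_encode(toy_encode)
n_tokens, elapsed = timed('{"id": 1, "name": "a"}')

# Without a tokenizer the wrapper degrades to a constant no-op.
noop_result = timed_encode(None)("anything")
```

Returning a callable in both cases lets callers use the same loop body whether or not a tokenizer was supplied.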

extract_and_validate_records(jsonl_string, schema, encode=None)

Extract and validate records from the given JSONL string.

Each regex-matched JSON string is tokenized (when encode is provided) before validation so that exact token counts are available for every record regardless of later reclassification.

Parameters:

Name Type Description Default
jsonl_string str

Single JSONL string containing tabular records.

required
schema dict

JSON schema as a dictionary.

required
encode Callable[[str], list[int]] | None

Optional tokenizer encode callable. When provided, each matched record string is tokenized and its token count is stored on the corresponding ParsedRecord.

None

Returns:

Type Description
ParsedResponse

A ParsedResponse whose records list is in input order, with parsed set for valid records and error set for invalid ones.

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def extract_and_validate_records(
    jsonl_string: str,
    schema: dict,
    encode: Callable[[str], list[int]] | None = None,
) -> ParsedResponse:
    """Extract and validate records from the given JSONL string.

    Each regex-matched JSON string is tokenized (when *encode* is
    provided) before validation so that exact token counts are
    available for every record regardless of later reclassification.

    Args:
        jsonl_string: Single JSONL string containing tabular records.
        schema: JSON schema as a dictionary.
        encode: Optional tokenizer encode callable.  When provided,
            each matched record string is tokenized and its token count
            is stored on the corresponding
            [`ParsedRecord`][nemo_safe_synthesizer.data_processing.record_utils.ParsedRecord].

    Returns:
        A
        [`ParsedResponse`][nemo_safe_synthesizer.data_processing.record_utils.ParsedResponse]
        whose ``records`` list is in input order, with ``parsed`` set
        for valid records and ``error`` set for invalid ones.
    """
    records: list[ParsedRecord] = []
    tokenization_time = 0.0
    timed = timed_encode(encode)

    for matched_json in extract_records_from_jsonl_string(jsonl_string):
        n_tokens, dt = timed(matched_json)
        tokenization_time += dt

        parsed, error = _parse_and_validate_json(matched_json, schema)
        records.append(ParsedRecord(text=matched_json, parsed=parsed, error=error, token_count=n_tokens))

    return ParsedResponse(records=records, tokenization_time_sec=tokenization_time)
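
The overall flow (match, tokenize, then validate) can be sketched with standard-library pieces only. Everything named here is a stand-in: FLAT_OBJECT_PATTERN replaces the module's regex, parse_and_check replaces the private _parse_and_validate_json helper with a simple required-keys check instead of real JSON-schema validation, and the "JSON" and "Schema" validator labels are made up.

```python
import json
import re

# Hypothetical stand-in for the module's record pattern.
FLAT_OBJECT_PATTERN = r"\{[^{}]*\}"


def parse_and_check(text: str, required: list):
    """Hypothetical validator: JSON parse plus a required-keys check."""
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as exc:
        return None, (str(exc), "JSON")
    missing = [key for key in required if key not in parsed]
    if missing:
        return None, (f"missing required fields: {missing}", "Schema")
    return parsed, None


def extract_and_check(completion: str, required: list, encode=None) -> list:
    results = []
    for text in re.findall(FLAT_OBJECT_PATTERN, completion):
        # Tokenize first, so invalid records also get a token count.
        n_tokens = len(encode(text)) if encode is not None else 0
        parsed, error = parse_and_check(text, required)
        results.append(
            {"text": text, "parsed": parsed, "error": error, "token_count": n_tokens}
        )
    return results


completion = '{"id": 1, "name": "a"}\n{"id": 2}\n{"id": 3, "name": }'
results = extract_and_check(completion, ["id", "name"], encode=str.split)
```

Counting tokens before validation is what makes the counts available for every matched string, including the two that fail here.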

extract_and_validate_timeseries_records(jsonl_string, schema, time_column, interval_seconds, time_format, encode=None)

Extract and validate sequential records with time-interval constraints.

Each regex-matched JSON string is tokenized (when encode is provided) before validation so that exact token counts are captured for both validated and cascade-invalidated records.

Parameters:

Name Type Description Default
jsonl_string str

JSONL string containing series data.

required
schema dict

JSON schema describing the records.

required
time_column str

Column containing the timestamp used for interval validation.

required
interval_seconds int | None

Expected interval in seconds between consecutive timestamps. When None, no interval check is performed.

required
time_format str

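Format of the timestamp column (required). The literal "elapsed_seconds" and formats with no date directives (such as %Y, %m, or %d) are treated as time-only, which allows day rollover during interval validation.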

required
encode Callable[[str], list[int]] | None

Optional tokenizer encode callable. When provided, each matched record string is tokenized and its token count is stored on the corresponding ParsedRecord.

None

Returns:

Type Description
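ParsedResponse

A ParsedResponse in input order. When a record fails the interval check, every subsequent record is marked invalid with a cascade error. A parse, schema, or timestamp failure instead stops processing at the failing record, and later records are not included.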

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def extract_and_validate_timeseries_records(
    jsonl_string: str,
    schema: dict,
    time_column: str,
    interval_seconds: int | None,
    time_format: str,
    encode: Callable[[str], list[int]] | None = None,
) -> ParsedResponse:
    """Extract and validate sequential records with time-interval constraints.

    Each regex-matched JSON string is tokenized (when *encode* is
    provided) before validation so that exact token counts are captured
    for both validated and cascade-invalidated records.

    Args:
        jsonl_string: JSONL string containing series data.
        schema: JSON schema describing the records.
        time_column: Column containing the timestamp used for interval
            validation.
        interval_seconds: Expected interval in seconds between
            consecutive timestamps.  When ``None``, no interval check
            is performed.
        time_format: Format of the timestamp column (required).
        encode: Optional tokenizer encode callable.  When provided,
            each matched record string is tokenized and its token count
            is stored on the corresponding
            [`ParsedRecord`][nemo_safe_synthesizer.data_processing.record_utils.ParsedRecord].

    Returns:
        A
        [`ParsedResponse`][nemo_safe_synthesizer.data_processing.record_utils.ParsedResponse]
        in input order. Once a record fails, every subsequent record is
        marked invalid with a cascade error.
    """
    records: list[ParsedRecord] = []
    tokenization_time = 0.0
    timed = timed_encode(encode)

    last_absolute_seconds: int | None = None
    day_offset = 0

    # Allow rollover only for time-only formats (no date components)
    # If time_format is "elapsed_seconds", treat as time-only (allow rollover)
    date_tokens = ("%Y", "%y", "%m", "%b", "%B", "%d", "%j", "%U", "%W", "%V", "%x", "%c")
    if time_format == "elapsed_seconds":
        allow_rollover = True
    else:
        has_date = any(tok in time_format for tok in date_tokens)
        allow_rollover = not has_date

    all_json_records = list(extract_records_from_jsonl_string(jsonl_string))
    cascade_error = ("Invalid due to previous record error", "TimeSeries")

    for idx, matched_json in enumerate(all_json_records):
        n_tokens, dt = timed(matched_json)
        tokenization_time += dt

        # Step 1: Parse and validate JSON/schema.
        parsed, error = _parse_and_validate_json(matched_json, schema)
        if error or parsed is None:
            records.append(ParsedRecord(text=matched_json, error=error, token_count=n_tokens))
            # Parse/schema errors stop validation without cascading to later records.
            break

        # Step 2: Extract and parse timestamp.
        timestamp_seconds, error = _extract_timestamp_seconds(parsed, time_column, time_format)
        if error or timestamp_seconds is None:
            records.append(ParsedRecord(text=matched_json, error=error, token_count=n_tokens))
            # Missing timestamp stops validation without cascading to later records.
            break

        # Step 3: Validate time interval (if interval_seconds is specified).
        if interval_seconds is not None:
            absolute_seconds, day_offset, error = _validate_time_interval(
                timestamp_seconds,
                last_absolute_seconds,
                day_offset,
                interval_seconds,
                time_column,
                allow_rollover,
            )
            if error:
                records.append(ParsedRecord(text=matched_json, error=error, token_count=n_tokens))
                # Interval errors cascade: mark all remaining records invalid so the
                # caller can report how many were affected.
                for remaining in all_json_records[idx + 1 :]:
                    rem_tokens, rem_dt = timed(remaining)
                    tokenization_time += rem_dt
                    records.append(ParsedRecord(text=remaining, error=cascade_error, token_count=rem_tokens))
                break
            last_absolute_seconds = absolute_seconds

        records.append(ParsedRecord(text=matched_json, parsed=parsed, token_count=n_tokens))

    return ParsedResponse(records=records, tokenization_time_sec=tokenization_time)
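
The rollover decision at the top of the function depends only on time_format, so it can be isolated and tested on its own. The helper name allows_rollover is hypothetical; the token list and logic are taken from the listing above.

```python
# Directives that indicate the format carries a date component.
DATE_TOKENS = ("%Y", "%y", "%m", "%b", "%B", "%d", "%j", "%U", "%W", "%V", "%x", "%c")


def allows_rollover(time_format: str) -> bool:
    # "elapsed_seconds" is always treated as time-only.
    if time_format == "elapsed_seconds":
        return True
    # Otherwise rollover is allowed only when no date directive is present.
    return not any(tok in time_format for tok in DATE_TOKENS)


time_only = allows_rollover("%H:%M:%S")
with_date = allows_rollover("%Y-%m-%d %H:%M:%S")
elapsed = allows_rollover("elapsed_seconds")
day_only = allows_rollover("%d %H:%M")
```

The check is case-sensitive, so the minutes directive %M in a time-only format is not mistaken for the month directive %m.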

normalize_dataframe(dataframe)

Normalize a DataFrame of generated records via a CSV round-trip.

Serializes to CSV and reads back to standardize missing-value representations (NaN/None/NA) across mixed-type columns. Falls back to ignoring encoding errors if the initial round-trip fails.

Parameters:

Name Type Description Default
dataframe DataFrame

DataFrame to normalize.

required

Returns:

Type Description
DataFrame

DataFrame with missing values normalized and invalid UTF-8 characters dropped.

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def normalize_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Normalize a DataFrame of generated records via a CSV round-trip.

    Serializes to CSV and reads back to standardize missing-value
    representations (NaN/None/NA) across mixed-type columns. Falls back
    to ignoring encoding errors if the initial round-trip fails.

    Args:
        dataframe: DataFrame to normalize.

    Returns:
        DataFrame with missing values normalized and invalid UTF-8 characters
        dropped.
    """
    # HACK: Handle NaN/None/NA values with mixed types by
    # normalizing through pandas csv io format, which will match
    # the format in reports generated via the nss client.
    try:
        # try without trying to resolve utf-8 issues first
        return pd.read_csv(StringIO(dataframe.to_csv(index=False, quoting=QUOTE_NONNUMERIC)))
    except Exception as exc_info:
        msg = (
            "An exception was raised while normalizing the pandas dataframe with records generated for Safe Synth. "
            "Retrying with flags to ignore encoding errors."
        )
        logger.error(msg, exc_info=exc_info)
        return pd.read_csv(
            StringIO(dataframe.to_csv(index=False, quoting=QUOTE_NONNUMERIC)),
            encoding="utf-8",
            encoding_errors="ignore",
        )
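
The effect of the CSV round-trip on missing values can be seen on a small frame. This sketch assumes QUOTE_NONNUMERIC comes from the standard csv module and repeats only the first (non-fallback) branch of the function; the column names and values are illustrative.

```python
from csv import QUOTE_NONNUMERIC
from io import StringIO

import pandas as pd

# An object column that mixes None and NaN, plus a numeric column with a gap.
df = pd.DataFrame(
    {
        "label": ["a", None, float("nan")],
        "score": [1.5, None, 3.0],
    }
)

# Same round-trip as the try branch: write CSV text, then read it back.
csv_text = df.to_csv(index=False, quoting=QUOTE_NONNUMERIC)
normalized = pd.read_csv(StringIO(csv_text))

label_missing = normalized["label"].isna().tolist()
score_missing = normalized["score"].isna().tolist()
```

Both None and NaN are written as empty fields and come back as pandas missing values, which is the normalization the function relies on.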

records_to_jsonl(records)

Convert a DataFrame, list of records, or dict to a JSONL string.

Parameters:

Name Type Description Default
records DataFrame | list[dict] | dict

DataFrame, list of records, or dict.

required

Returns:

Type Description
str

The JSONL string.

Source code in src/nemo_safe_synthesizer/data_processing/record_utils.py
def records_to_jsonl(records: pd.DataFrame | list[dict] | dict) -> str:
    """Convert list of records to a JSONL string.

    Args:
        records: DataFrame, list of records, or dict.

    Returns:
        The JSONL string.
    """
    if isinstance(records, pd.DataFrame):
        return records.to_json(orient="records", lines=True, force_ascii=False)
    elif isinstance(records, (list, dict)):
        return pd.DataFrame(records).to_json(orient="records", lines=True, force_ascii=False)
    else:
        raise ValueError(f"Unsupported type: {type(records)}")
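
A usage sketch with the function copied into a standalone script. The sample rows are illustrative. Because a dict is passed straight to pd.DataFrame, a column-oriented dict of lists works, while a dict of plain scalars raises ValueError from pandas.

```python
from __future__ import annotations

import json

import pandas as pd


def records_to_jsonl(records: pd.DataFrame | list[dict] | dict) -> str:
    if isinstance(records, pd.DataFrame):
        return records.to_json(orient="records", lines=True, force_ascii=False)
    elif isinstance(records, (list, dict)):
        return pd.DataFrame(records).to_json(orient="records", lines=True, force_ascii=False)
    else:
        raise ValueError(f"Unsupported type: {type(records)}")


rows = [{"id": 1, "city": "Zürich"}, {"id": 2, "city": "Lyon"}]
jsonl = records_to_jsonl(rows)

# Each non-empty line is one JSON object.
decoded = [json.loads(line) for line in jsonl.splitlines() if line]

# A column-oriented dict yields one line per row.
columnar_lines = [line for line in records_to_jsonl({"id": [1, 2]}).splitlines() if line]

# A dict of scalars cannot be turned into a DataFrame without an index.
try:
    records_to_jsonl({"id": 1})
    scalar_dict_ok = True
except ValueError:
    scalar_dict_ok = False
```

With force_ascii=False, non-ASCII text such as "Zürich" is written as-is instead of being escaped.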