record_utils

Utilities for extracting, validating, and converting JSONL records.

Provides regex-based JSONL extraction, JSON-schema validation (including time-series interval checks), DataFrame normalization, and JSONL serialization.

Functions:

| Name | Description |
| --- | --- |
| `is_safe_for_float_conversion` | Check if a value can be safely converted to float64 without overflow. |
| `check_record_for_large_numbers` | Check if a record contains any numbers that would cause float64 overflow. |
| `check_if_records_are_ordered` | Check if the records are in ascending order based on the given `order_by` column. |
| `extract_records_from_jsonl_string` | Extract and return tabular records from the given JSONL string. |
| `extract_groups_from_jsonl_string` | Extract groups of records from the given JSONL string. |
| `extract_and_validate_records` | Extract and validate records from the given JSONL string. |
| `extract_and_validate_timeseries_records` | Extract and validate sequential records with enforced time interval constraints. |
| `normalize_dataframe` | Normalize a DataFrame of generated records via a CSV round-trip. |
| `records_to_jsonl` | Convert a list of records to a JSONL string. |

is_safe_for_float_conversion(value)

Check if a value can be safely converted to float64 without overflow.

Only int values can cause overflow; all other types are considered safe.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `value` | `str \| int \| float \| None \| list \| dict` | The value to check. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the value can be safely converted to float64, False otherwise. |

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def is_safe_for_float_conversion(value: str | int | float | None | list | dict) -> bool:
    """Check if a value can be safely converted to float64 without overflow.

    Only ``int`` values can cause overflow; all other types are considered safe.

    Args:
        value: The value to check.

    Returns:
        True if the value can be safely converted to float64, False otherwise.
    """
    # Decimal is not considered here because the input to this validation comes
    # from converting a JSONL string to a JSON object, and JSON only supports
    # int and float for numeric values.

    # Only int can overflow float64.
    if isinstance(value, int):
        try:
            float(value)
            return True
        except (OverflowError, ValueError):
            return False
    return True
```

check_record_for_large_numbers(record)

Check if a record contains any numbers that would cause float64 overflow.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `record` | `dict` | Dictionary of field names to values. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str \| None` | An error message describing the first unsafe value found, or None if all values are safe. |

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def check_record_for_large_numbers(record: dict) -> str | None:
    """Check if a record contains any numbers that would cause float64 overflow.

    Args:
        record: Dictionary of field names to values.

    Returns:
        An error message describing the first unsafe value found,
        or None if all values are safe.
    """
    for key, value in record.items():
        if not is_safe_for_float_conversion(value):
            # If a column contains a value that is too large to convert to float64,
            # then the entire record is invalid
            return f"Value {value} in field '{key}' is too large to convert to float64"

    return None
```

check_if_records_are_ordered(records, order_by)

Check if the records are in ascending order based on the given order_by column.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `records` | `list[dict]` | List of JSONL records. | *required* |
| `order_by` | `str` | Column to check for ordering. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the records are ordered by the given column, otherwise False. |

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def check_if_records_are_ordered(records: list[dict], order_by: str) -> bool:
    """Check if the records are in ascending order based on the given `order_by` column.

    Args:
        records: List of JSONL records.
        order_by: Column to check for ordering.

    Returns:
        True if the records are ordered by the given column, otherwise False.
    """
    order_by_values = [rec[order_by] for rec in records]
    return order_by_values == sorted(order_by_values)
```
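Because the comparison is against `sorted()`, ties count as ordered, i.e. the check accepts any non-decreasing sequence. A quick self-contained illustration:

```python
def check_if_records_are_ordered(records: list[dict], order_by: str) -> bool:
    values = [rec[order_by] for rec in records]
    return values == sorted(values)

recs = [{"ts": 1}, {"ts": 2}, {"ts": 2}, {"ts": 5}]
print(check_if_records_are_ordered(recs, "ts"))        # True: repeated values allowed
print(check_if_records_are_ordered(recs[::-1], "ts"))  # False: descending order
```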

extract_records_from_jsonl_string(jsonl_string)

Extract and return tabular records from the given JSONL string.

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def extract_records_from_jsonl_string(jsonl_string: str) -> list[str]:
    """Extract and return tabular records from the given JSONL string."""
    return re.findall(RECORD_REGEX_PATTEN_LOOKAHEAD, jsonl_string)
```
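The module-level `RECORD_REGEX_PATTEN_LOOKAHEAD` constant is not shown on this page, so the sketch below substitutes a deliberately simplified stand-in pattern (one single-line JSON object with no nested braces) to show the extraction idea; the real pattern is presumably more robust.

```python
import re

# Hypothetical stand-in for RECORD_REGEX_PATTEN_LOOKAHEAD (not shown in this
# module): matches one single-line JSON object without nested braces.
RECORD_PATTERN = r"\{[^{}\n]*\}"

def extract_records_from_jsonl_string(jsonl_string: str) -> list[str]:
    # Non-matching lines (e.g. malformed generator output) are simply skipped.
    return re.findall(RECORD_PATTERN, jsonl_string)

text = '{"a": 1}\n{"a": 2}\ngarbage line\n{"a": 3}'
print(extract_records_from_jsonl_string(text))  # ['{"a": 1}', '{"a": 2}', '{"a": 3}']
```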

extract_groups_from_jsonl_string(jsonl_string, bos, eos)

Extract groups of records from the given JSONL string.

This function assumes that the complete group of records is enclosed by the given beginning-of-sequence (bos) and end-of-sequence (eos) tokens.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `jsonl_string` | `str` | Single JSONL string containing grouped tabular records. | *required* |
| `bos` | `str` | Beginning-of-sequence token used to identify the start of a group. | *required* |
| `eos` | `str` | End-of-sequence token used to identify the end of a group. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `list[str]` | Substrings matching complete bos/eos-delimited record groups. |

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def extract_groups_from_jsonl_string(jsonl_string: str, bos: str, eos: str) -> list[str]:
    """Extract groups of records from the given JSONL string.

    This function assumes that the complete group of records
    is enclosed by the given beginning-of-sequence (bos) and
    end-of-sequence (eos) tokens.

    Args:
        jsonl_string: Single JSONL string containing grouped tabular records.
        bos: Beginning-of-sequence token used to identify the start of a group.
        eos: End-of-sequence token used to identify the end of a group.

    Returns:
        Substrings matching complete bos/eos-delimited record groups.
    """
    bos_re = re.escape(bos)
    eos_re = re.escape(eos)
    return re.findall(rf"{bos_re}\s?(?:{RECORD_REGEX_PATTERN}\s?)+\s?{eos_re}", jsonl_string)
```
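Since `RECORD_REGEX_PATTERN` is not shown on this page, the runnable sketch below uses the same simplified stand-in (a single-line JSON object without nested braces) to demonstrate the bos/eos grouping; the escaping of the tokens via `re.escape` is the part to note, as it lets tokens contain regex metacharacters safely.

```python
import re

# Hypothetical stand-in for RECORD_REGEX_PATTERN (not shown in this module).
RECORD_REGEX_PATTERN = r"\{[^{}\n]*\}"

def extract_groups_from_jsonl_string(jsonl_string: str, bos: str, eos: str) -> list[str]:
    # Escape the tokens so metacharacters like '<' or '|' are matched literally.
    bos_re = re.escape(bos)
    eos_re = re.escape(eos)
    return re.findall(rf"{bos_re}\s?(?:{RECORD_REGEX_PATTERN}\s?)+\s?{eos_re}", jsonl_string)

text = '<s>{"a": 1} {"a": 2}</s> noise <s>{"a": 3}</s>'
groups = extract_groups_from_jsonl_string(text, "<s>", "</s>")
print(groups)  # two complete groups; the stray 'noise' text is ignored
```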

extract_and_validate_records(jsonl_string, schema)

Extract and validate records from the given JSONL string.

The records are validated against the given schema using jsonschema.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `jsonl_string` | `str` | Single JSONL string containing tabular records. | *required* |
| `schema` | `dict` | JSON schema as a dictionary. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `valid_records` | `list[dict]` | List of valid records. |
| `invalid_records` | `list[str]` | List of invalid records. |
| `invalid_record_errors` | `list[tuple[str, str]]` | List of errors for invalid records, each a (message, validator) tuple. |

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def extract_and_validate_records(
    jsonl_string: str, schema: dict
) -> tuple[list[dict], list[str], list[tuple[str, str]]]:
    """Extract and validate records from the given JSONL string.

    The records are validated against the given schema using jsonschema.

    Args:
        jsonl_string: Single JSONL string containing tabular records.
        schema: JSON schema as a dictionary.

    Returns:
        valid_records (list[dict]): List of valid records.
        invalid_records (list[str]): List of invalid records.
        invalid_record_errors (list[tuple[str, str]]): List of errors for invalid records, each a (message, validator) tuple.
    """
    valid_records = []
    invalid_records = []
    invalid_record_errors = []

    for matched_json in extract_records_from_jsonl_string(jsonl_string):
        matched_dict, error = _parse_and_validate_json(matched_json, schema)
        if error:
            invalid_records.append(matched_json)
            invalid_record_errors.append(error)
        else:
            valid_records.append(matched_dict)

    return valid_records, invalid_records, invalid_record_errors
```
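The private `_parse_and_validate_json` helper is not shown on this page. As a rough illustration of what such a helper does, here is a simplified stand-in that parses a JSON string and checks a tiny subset of JSON-schema semantics (required keys and primitive types), returning the same `(record, error)` shape where `error` is a (message, validator) tuple. The real module uses the jsonschema library for full validation.

```python
import json

# Minimal type map covering only the schema keywords used in this sketch.
TYPES = {"string": str, "integer": int, "number": (int, float)}

def parse_and_validate(matched_json: str, schema: dict):
    # Hypothetical stand-in for the private _parse_and_validate_json helper.
    try:
        record = json.loads(matched_json)
    except json.JSONDecodeError as exc:
        return None, (str(exc), "json")
    for key, spec in schema.get("properties", {}).items():
        if key in schema.get("required", []) and key not in record:
            return None, (f"missing required field '{key}'", "required")
        if key in record and not isinstance(record[key], TYPES[spec["type"]]):
            return None, (f"field '{key}' has wrong type", "type")
    return record, None

schema = {"properties": {"id": {"type": "integer"}}, "required": ["id"]}
print(parse_and_validate('{"id": 7}', schema))    # valid: ({'id': 7}, None)
print(parse_and_validate('{"id": "x"}', schema))  # type error
```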

extract_and_validate_timeseries_records(jsonl_string, schema, time_column, interval_seconds, time_format)

Extract and validate sequential records with enforced time interval constraints.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `jsonl_string` | `str` | JSONL string containing series data. | *required* |
| `schema` | `dict` | JSON schema describing the records. | *required* |
| `time_column` | `str` | Column containing the timestamp used for interval validation. | *required* |
| `interval_seconds` | `int \| None` | Expected interval in seconds between consecutive timestamps. If None, no time interval validation is performed. | *required* |
| `time_format` | `str` | Format of the timestamp column (required, should be set from config). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `tuple[list[dict], list[str], list[tuple[str, str]]]` | Tuple of valid records, invalid record strings, and their associated errors. |

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def extract_and_validate_timeseries_records(
    jsonl_string: str,
    schema: dict,
    time_column: str,
    interval_seconds: int | None,
    time_format: str,
) -> tuple[list[dict], list[str], list[tuple[str, str]]]:
    """Extract and validate sequential records with enforced time interval constraints.

    Args:
        jsonl_string: JSONL string containing series data.
        schema: JSON schema describing the records.
        time_column: Column containing the timestamp used for interval validation.
        interval_seconds: (Optional) Expected interval in seconds between consecutive
            timestamps. If not provided, no time interval validation is performed.
        time_format: Format of the timestamp column (required, should be set from config).

    Returns:
        Tuple of valid records, invalid record strings, and their associated errors.
    """
    valid_records: list[dict] = []
    invalid_records: list[str] = []
    invalid_record_errors: list[tuple[str, str]] = []

    last_absolute_seconds: int | None = None
    day_offset = 0

    # Allow rollover only for time-only formats (no date components)
    # If time_format is "elapsed_seconds", treat as time-only (allow rollover)
    date_tokens = ("%Y", "%y", "%m", "%b", "%B", "%d", "%j", "%U", "%W", "%V", "%x", "%c")
    if time_format == "elapsed_seconds":
        allow_rollover = True
    else:
        has_date = any(tok in time_format for tok in date_tokens)
        allow_rollover = not has_date

    all_json_records = list(extract_records_from_jsonl_string(jsonl_string))

    for idx, matched_json in enumerate(all_json_records):
        # Step 1: Parse and validate JSON/schema
        matched_dict, error = _parse_and_validate_json(matched_json, schema)
        if error or matched_dict is None:
            invalid_records.append(matched_json)
            if error:
                invalid_record_errors.append(error)
            break

        # Step 2: Extract and parse timestamp
        timestamp_seconds, error = _extract_timestamp_seconds(matched_dict, time_column, time_format)
        if error or timestamp_seconds is None:
            invalid_records.append(matched_json)
            if error:
                invalid_record_errors.append(error)
            break

        # Step 3: Validate time interval (if interval_seconds is specified)
        if interval_seconds is not None:
            absolute_seconds, day_offset, error = _validate_time_interval(
                timestamp_seconds,
                last_absolute_seconds,
                day_offset,
                interval_seconds,
                time_column,
                allow_rollover,
            )
            if error:
                # Mark current record with the specific error, and remaining records with cascade error
                invalid_records.append(matched_json)
                invalid_record_errors.append(error)
                # Mark remaining records (after current) as invalid due to previous error
                remaining_records = all_json_records[idx + 1 :]
                cascade_error = ("Invalid due to previous record error", "TimeSeries")
                invalid_records.extend(remaining_records)
                invalid_record_errors.extend([cascade_error] * len(remaining_records))
                break
            last_absolute_seconds = absolute_seconds

        valid_records.append(matched_dict)

    return valid_records, invalid_records, invalid_record_errors
```
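The core of step 3 is comparing consecutive timestamps against the expected gap. A simplified, self-contained sketch of that pairwise check (the real private `_validate_time_interval` also handles midnight rollover for time-only formats and day offsets, which this sketch omits):

```python
from datetime import datetime

def interval_ok(prev: str, curr: str, time_format: str, interval_seconds: int) -> bool:
    # Illustrative check for one pair of consecutive timestamps; parse both
    # with the configured strptime format and compare the gap in seconds.
    delta = datetime.strptime(curr, time_format) - datetime.strptime(prev, time_format)
    return delta.total_seconds() == interval_seconds

fmt = "%Y-%m-%d %H:%M:%S"
print(interval_ok("2024-01-01 00:00:00", "2024-01-01 00:05:00", fmt, 300))  # True
print(interval_ok("2024-01-01 00:00:00", "2024-01-01 00:06:00", fmt, 300))  # False
```

One consequence of the cascade logic above: once a single record breaks the interval, every later record in the batch is rejected with the "Invalid due to previous record error" marker, since the series can no longer be trusted to be contiguous.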

normalize_dataframe(dataframe)

Normalize a DataFrame of generated records via a CSV round-trip.

Serializes to CSV and reads back to standardize missing-value representations (NaN/None/NA) across mixed-type columns. Falls back to ignoring encoding errors if the initial round-trip fails.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataframe` | `DataFrame` | DataFrame to normalize. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | DataFrame with missing values normalized and invalid UTF-8 characters dropped. |

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def normalize_dataframe(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Normalize a DataFrame of generated records via a CSV round-trip.

    Serializes to CSV and reads back to standardize missing-value
    representations (NaN/None/NA) across mixed-type columns. Falls back
    to ignoring encoding errors if the initial round-trip fails.

    Args:
        dataframe: DataFrame to normalize.

    Returns:
        DataFrame with missing values normalized and invalid UTF-8 characters
        dropped.
    """
    # HACK: Handle NaN/None/NA values with mixed types by
    # normalizing through pandas csv io format, which will match
    # the format in reports generated via the nss client.
    try:
        # try without trying to resolve utf-8 issues first
        return pd.read_csv(StringIO(dataframe.to_csv(index=False, quoting=QUOTE_NONNUMERIC)))
    except Exception as exc_info:
        msg = (
            "An exception was raised while normalizing the pandas dataframe with records generated for Safe Synth. "
            "Retrying with flags to ignore encoding errors."
        )
        logger.error(msg, exc_info=exc_info)
        return pd.read_csv(
            StringIO(dataframe.to_csv(index=False, quoting=QUOTE_NONNUMERIC)),
            encoding="utf-8",
            encoding_errors="ignore",
        )
```
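A small demonstration of the round-trip, assuming only pandas and the stdlib: mixed `None`/`NaN` markers in different column types come back as a uniform `NaN` after writing to CSV and reading it back, which is what `read_csv` produces for empty fields by default.

```python
from csv import QUOTE_NONNUMERIC
from io import StringIO

import pandas as pd

df = pd.DataFrame({"a": [1.0, None, 3.0], "b": ["x", None, "z"]})

# The same round-trip the function performs on the happy path.
normalized = pd.read_csv(StringIO(df.to_csv(index=False, quoting=QUOTE_NONNUMERIC)))

print(normalized.isna().sum().to_dict())  # one missing value per column
```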

records_to_jsonl(records)

Convert a list of records to a JSONL string.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `records` | `DataFrame \| list[dict] \| dict` | DataFrame, list of records, or dict. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `str` | The JSONL string. |

Source code in `src/nemo_safe_synthesizer/data_processing/record_utils.py`:

```python
def records_to_jsonl(records: pd.DataFrame | list[dict] | dict) -> str:
    """Convert a list of records to a JSONL string.

    Args:
        records: DataFrame, list of records, or dict.

    Returns:
        The JSONL string.
    """
    if isinstance(records, pd.DataFrame):
        return records.to_json(orient="records", lines=True, force_ascii=False)
    elif isinstance(records, (list, dict)):
        return pd.DataFrame(records).to_json(orient="records", lines=True, force_ascii=False)
    else:
        raise ValueError(f"Unsupported type: {type(records)}")
```