timeseries_backend

Time-series generation backend with chronological validation.

Classes:

Name Description
ProgressSnapshot

Snapshot configuration for saving partial generation results at progress milestones.

GroupState

Mutable state for tracking a single group during parallel generation.

GroupProcessingResult

Result of processing a generation batch for a single group.

TimeseriesBackend

Time-series aware generator that enforces chronological constraints.

ProgressSnapshot(label, threshold, path, saved=False) dataclass

Snapshot configuration for saving partial generation results at progress milestones.

Attributes:

Name Type Description
label str

Human-readable label for the milestone (e.g. "50").

threshold int

Record or group count that triggers this snapshot.

path Path

File path where the snapshot CSV will be written.

saved bool

Whether this snapshot has already been written to disk.

label instance-attribute

Human-readable label for the milestone (e.g. "50").

threshold instance-attribute

Record or group count that triggers this snapshot.

path instance-attribute

File path where the snapshot CSV will be written.

saved = field(default=False) class-attribute instance-attribute

Whether this snapshot has already been written to disk.
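The save-once semantics implied by `threshold` and `saved` can be sketched as follows. This is a minimal illustration, not the backend's actual implementation; `maybe_save` is a hypothetical helper, and the real backend writes the accumulated records to `path` as CSV where the comment indicates.

```python
from dataclasses import dataclass, field
from pathlib import Path


@dataclass
class ProgressSnapshot:
    # Mirrors the documented fields; the real class lives in timeseries_backend.
    label: str
    threshold: int
    path: Path
    saved: bool = field(default=False)


def maybe_save(snapshot: ProgressSnapshot, records_so_far: int) -> bool:
    """Hypothetical helper: write at most once, when the threshold is crossed."""
    if snapshot.saved or records_so_far < snapshot.threshold:
        return False
    # The real backend would write the accumulated records to snapshot.path here.
    snapshot.saved = True
    return True


snap = ProgressSnapshot(label="50", threshold=500, path=Path("snapshot_50.csv"))
maybe_save(snap, 250)  # below threshold: nothing written
maybe_save(snap, 600)  # threshold crossed: written, saved flips to True
maybe_save(snap, 700)  # already saved: no-op
```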

GroupState(group_id, initial_prefill, current_prefill, recent_records=list(), expected_records=0, last_timestamp_seconds=None, low_valid_fraction_count=0, completed=False, failed=False, total_valid_records=0, total_invalid_records=0) dataclass

Mutable state for tracking a single group during parallel generation.

Each group maintains its own sliding-window context, timestamp cursor, and retry counters so that multiple groups can be generated in parallel while tracking progress independently.

Attributes:

Name Type Description
group_id str

Unique identifier for this group (e.g., device ID, customer ID).

initial_prefill str

Original prefill string (first few records) used to seed generation. Preserved for potential resets.

current_prefill str

Current prefill string, updated as generation progresses to include recently generated records.

recent_records list[dict]

Sliding window of recently generated records used to build the next prompt context.

expected_records int

Target record count, calculated from (stop_timestamp - start_timestamp) / interval_seconds.

last_timestamp_seconds int | None

Timestamp (in seconds) of the most recently generated record, used for chronological validation.

low_valid_fraction_count int

Number of consecutive batches with a high invalid fraction. Triggers group failure once patience is exceeded.

completed bool

Whether this group has reached the stop timestamp.

failed bool

Whether this group failed (e.g., too many retries without progress).

total_valid_records int

Cumulative count of valid records generated for this group.

total_invalid_records int

Cumulative count of invalid records generated for this group.

group_id instance-attribute

Unique identifier for this group (e.g., device ID, customer ID).

initial_prefill instance-attribute

Original prefill string (first few records) used to seed generation. Preserved for potential resets.

current_prefill instance-attribute

Current prefill string, updated as generation progresses to include recently generated records.

recent_records = field(default_factory=list) class-attribute instance-attribute

Sliding window of recently generated records used to build the next prompt context.

expected_records = 0 class-attribute instance-attribute

Target record count, calculated from (stop_timestamp - start_timestamp) / interval_seconds.

last_timestamp_seconds = None class-attribute instance-attribute

Timestamp (in seconds) of the most recently generated record, used for chronological validation.

low_valid_fraction_count = 0 class-attribute instance-attribute

Number of consecutive batches with a high invalid fraction. Triggers group failure once patience is exceeded.

completed = False class-attribute instance-attribute

Whether this group has reached the stop timestamp.

failed = False class-attribute instance-attribute

Whether this group failed (e.g., too many retries without progress).

total_valid_records = 0 class-attribute instance-attribute

Cumulative count of valid records generated for this group.

total_invalid_records = 0 class-attribute instance-attribute

Cumulative count of invalid records generated for this group.

GroupProcessingResult

Bases: Enum

Result of processing a generation batch for a single group.

Used by _process_group_result to signal whether a group should remain active, be marked complete, or be removed due to failure.

Attributes:

Name Type Description
IN_PROGRESS

Group continues; batch should be added to the accumulator.

COMPLETED

Group reached the stop timestamp; remove from active processing.

FAILED

Group failed (e.g., too many retries); remove from active, no batch added.

IN_PROGRESS = auto() class-attribute instance-attribute

Group continues; batch should be added to the accumulator.

COMPLETED = auto() class-attribute instance-attribute

Group reached the stop timestamp; remove from active processing.

FAILED = auto() class-attribute instance-attribute

Group failed (e.g., too many retries); remove from active, no batch added.
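How a caller might dispatch on these enum members can be sketched as below. `apply_result` is a hypothetical stand-in for `_process_group_result`'s caller; the "batch added to the accumulator" side of `IN_PROGRESS`/`COMPLETED` is not modeled.

```python
from enum import Enum, auto


class GroupProcessingResult(Enum):
    IN_PROGRESS = auto()
    COMPLETED = auto()
    FAILED = auto()


def apply_result(active: list[str], group_id: str,
                 result: GroupProcessingResult) -> None:
    """Hypothetical dispatch: COMPLETED and FAILED both leave active
    processing; IN_PROGRESS keeps the group in the active list."""
    if result in (GroupProcessingResult.COMPLETED, GroupProcessingResult.FAILED):
        active.remove(group_id)


active = ["device-1", "device-2"]
apply_result(active, "device-1", GroupProcessingResult.COMPLETED)
# active is now ["device-2"]
```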

TimeseriesBackend(config, model_metadata, **kwargs)

Bases: VllmBackend

Time-series aware generator that enforces chronological constraints.

This backend extends VllmBackend to generate synthetic time-series data with strict chronological ordering. It uses a sliding window approach where recently generated records are used as context (prefill) for subsequent generation, ensuring temporal continuity.

Key Concepts
  • Time-Range Based Generation: The number of records generated is determined by the configured time range and interval, not by a target count. Specifically: (stop_timestamp - start_timestamp) / interval_seconds. The config.generation.num_records parameter is used only for progress tracking, not to limit output.
  • Sliding Window: The backend maintains a window of recent records (controlled by _prefill_context_size) that are included in each prompt to provide context for the LLM, ensuring generated records follow the established patterns and timestamps.
  • Parallel Group Generation: Multiple time-series groups (e.g., different devices, customers) are processed in parallel batches for efficiency. Even single-sequence data uses this path (treated as 1 group via a pseudo-group column added during preprocessing). Groups are the same as those seen during training (from model_metadata.initial_prefill).
  • Chronological Validation: Each generated record must continue from the previous timestamp at the expected interval. Out-of-order records are marked invalid.
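The time-range formula above determines output size directly. A minimal worked example (floor division is an assumption; the backend may round differently for ranges that are not an exact multiple of the interval):

```python
def expected_records(start_ts: int, stop_ts: int, interval_seconds: int) -> int:
    """Records implied by the configured time range, per the formula above."""
    return (stop_ts - start_ts) // interval_seconds


# One day of minute-level data: 86,400 seconds at a 60-second interval.
expected_records(0, 86_400, 60)  # -> 1440
```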

Generation Flow (parallel group mode):
1. Initialize GroupState for each group with its prefill context
2. While groups remain pending or active:
   a. Fill active slots with pending groups (up to max_groups_per_batch)
   b. Build prompts for all active groups using their current prefill
   c. Generate completions for all prompts in a single LLM batch call
   d. Process LLM outputs into per-group Batch objects
   e. For each group:
      - Validate chronological order against the group's last timestamp
      - Retain the response with the most valid records (discard the others)
      - Update group state with the new records (prefill, last_timestamp)
      - Check whether the stop timestamp was reached (marks the group complete)
      - Track low valid fraction; fail the group after max retries
   f. Remove completed/failed groups from the active list
   g. Save progress snapshots if thresholds are met
   h. Log a per-group progress summary
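The outer loop of this flow can be sketched as a skeleton. This is an illustration only: `process_group` stands in for the prompt-building, LLM-call, validation, and state-update steps, and the string results stand in for the real per-group result signaling.

```python
def generate_parallel_groups(pending, max_groups_per_batch, process_group):
    """Skeleton of the documented loop. `process_group` is a hypothetical
    callback covering steps b-e; it returns "in_progress", "completed",
    or "failed" for a group."""
    active, finished = [], []
    while pending or active:
        # a. Fill active slots with pending groups
        while pending and len(active) < max_groups_per_batch:
            active.append(pending.pop(0))
        # b-e. One batched generation step for every active group
        for group in list(active):
            result = process_group(group)
            # f. Remove completed/failed groups from the active list
            if result in ("completed", "failed"):
                active.remove(group)
                finished.append((group, result))
    return finished


# With a callback that completes every group immediately, all three groups
# finish even though only two slots are active at a time.
generate_parallel_groups(["a", "b", "c"], 2, lambda g: "completed")
```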

Stopping Conditions

Generation stops when all groups finish (either completed or failed). Individual groups and the overall generation can stop for different reasons:

Per-Group Stopping:
  • Completion (success): A group completes when any generated record has a timestamp >= _stop_timestamp_value. The group is marked as completed and removed from active processing.
  • Failure (low valid fraction): A group fails after config.generation.patience consecutive batches where the invalid record fraction >= config.generation.invalid_fraction_threshold. This prevents infinite loops when the model consistently produces bad output for a particular group. Failed groups are not retried and produce no synthetic data for that group ID. The failure is reflected in all_groups_succeeded returning False.

Global Stopping:
  • Natural completion: Generation ends when both the pending groups queue and the active groups list are empty (all groups processed).
  • No records: If GenerationBatches detects too many consecutive batches with no valid records globally, it signals STOP_NO_RECORDS.
  • Target reached: If the target number of records is reached, GenerationBatches signals STOP_METRIC_REACHED.

When global stopping occurs before all groups complete, all_groups_succeeded returns False, and the final generation status reflects partial completion.
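The per-group failure rule (patience over consecutive bad batches) can be restated as a small counter update. This is a sketch of the documented rule, not the backend's code; `update_failure_counter` is a hypothetical helper, and the assumption that a good batch resets the counter follows from "consecutive" in the description.

```python
def update_failure_counter(count: int, invalid_fraction: float,
                           threshold: float, patience: int) -> tuple[int, bool]:
    """The counter grows on each batch whose invalid fraction meets the
    threshold and resets on a good batch; the group fails once `patience`
    consecutive bad batches have been seen."""
    count = count + 1 if invalid_fraction >= threshold else 0
    return count, count >= patience


count, failed = 0, False
for frac in (0.9, 0.95, 0.2, 0.9, 0.9, 0.9):
    count, failed = update_failure_counter(count, frac, threshold=0.8, patience=3)
# The good batch (0.2) resets the streak, so the group only fails after the
# final three consecutive bad batches.
```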

Attributes:

Name Type Description
_schema_fragment str

JSON schema template with column placeholders, e.g., '"col1":<unk>,"col2":<unk>' (see the __init__ source below). Used in prompt formatting.

_samples_per_prompt int

Number of completion samples to generate per prompt. Multiple samples increase chances of getting valid records. Default: 5.

_max_prompts_per_batch int

Maximum number of prompts to include in a single LLM generation call. Controls parallelism. Default: 100.

_prefill_context_size int

Number of recent records to include in the sliding window prefill context. Default: 3.

_time_column str

Name of the timestamp column in the data.

_time_format str

Format string for parsing timestamps (strptime format), or "elapsed_seconds" for numeric elapsed time.

_is_elapsed_time bool

True if timestamps are numeric elapsed seconds.

_start_timestamp_value

Starting timestamp for generation range.

_stop_timestamp_value

Ending timestamp for generation range. Generation stops when a record reaches or exceeds this timestamp.

_timestamp_interval_seconds int | None

Expected interval between consecutive timestamps. Used for chronological validation.

_group_column str | None

Column name used to group time-series data. If None or PSEUDO_GROUP_COLUMN, treated as single-sequence.

_group_prefills dict[str, str]

Mapping of group_id -> initial prefill string. Prefills are the first few records from training data used to seed generation for each group.

_groups list[str]

List of all group IDs to generate.

Methods:

Name Description
generate

Generate time-series tabular data using Nemo Safe Synthesizer.

Source code in src/nemo_safe_synthesizer/generation/timeseries_backend.py
def __init__(self, config: SafeSynthesizerParameters, model_metadata: ModelMetadata, **kwargs):
    super().__init__(config, model_metadata, **kwargs)

    self._schema_fragment = ",".join([f'"{c}":<unk>' for c in self.columns])
    self._samples_per_prompt = 5  # num of samples per prompt
    self._max_prompts_per_batch = 100  # max prompts per batch for parallel group generation
    self._prefill_context_size = 3  # number of records to prefill
    self._time_column = config.time_series.timestamp_column
    self._time_format = config.time_series.timestamp_format
    self._is_elapsed_time = self._time_format == "elapsed_seconds"
    self._start_timestamp_value = config.time_series.start_timestamp
    self._stop_timestamp_value = config.time_series.stop_timestamp
    self._timestamp_interval_seconds = config.time_series.timestamp_interval_seconds

    # Grouped generation support
    # Note: Since time series preprocessing adds a pseudo-group column when no group
    # is specified, we always have grouped mode (even single-sequence is 1 group).
    self._group_column = config.data.group_training_examples_by
    initial_prefill_value = self.model_metadata.initial_prefill

    if not isinstance(initial_prefill_value, dict):
        raise ValueError(
            "TimeseriesBackend requires initial_prefill to be a dict mapping group -> prefill string. "
            "This should be set by SequentialExampleAssembler during training."
        )

    # Prefills is a dict mapping group -> prefill string
    self._group_prefills: dict[str, str] = initial_prefill_value
    self._groups: list[str] = list(self._group_prefills.keys())

generate(data_actions_fn=None)

Generate time-series tabular data using Nemo Safe Synthesizer.

All time series are processed as groups (single-sequence is treated as 1 group via pseudo-group column added during preprocessing).

Note

Generation is time-range based, not count-based. The number of records generated is determined by (stop_timestamp - start_timestamp) / interval_seconds for each group. The config.generation.num_records parameter is used for progress tracking but does not limit output. Groups are the same as those seen during training (from model_metadata.initial_prefill).

Parameters:

Name Type Description Default
data_actions_fn DataActionsFn | None

Optional function that takes a DataFrame and returns a modified DataFrame.

None

Returns:

Type Description
GenerateJobResults

Generation results object, which includes a DataFrame of generated records.

Source code in src/nemo_safe_synthesizer/generation/timeseries_backend.py
def generate(
    self,
    data_actions_fn: utils.DataActionsFn | None = None,
) -> GenerateJobResults:
    """Generate time-series tabular data using Nemo Safe Synthesizer.

    All time series are processed as groups (single-sequence is treated as 1 group
    via pseudo-group column added during preprocessing).

    Note:
        Generation is time-range based, not count-based. The number of records
        generated is determined by (stop_timestamp - start_timestamp) / interval_seconds
        for each group. The config.generation.num_records parameter is used for
        progress tracking but does not limit output. Groups are the same as those
        seen during training (from model_metadata.initial_prefill).

    Args:
        data_actions_fn: Optional function that takes a DataFrame and returns a modified DataFrame.

    Returns:
        Generation results object, which includes a DataFrame of generated records.
    """
    generation_start = time.monotonic()
    num_records = self.config.generation.num_records

    sampling_params = SamplingParams(
        temperature=self.config.generation.temperature,
        repetition_penalty=self.config.generation.repetition_penalty,
        top_p=self.config.generation.top_p,
        top_k=FIXED_RUNTIME_GENERATE_ARGS["top_k"],
        min_p=FIXED_RUNTIME_GENERATE_ARGS["min_p"],
        max_tokens=self.model_metadata.max_seq_length,
        skip_special_tokens=True,
        include_stop_str_in_output=False,
        ignore_eos=False,
    )

    batches = GenerationBatches(
        target_num_records=num_records,
        patience=self.config.generation.patience,
        invalid_fraction_threshold=self.config.generation.invalid_fraction_threshold,
        data_actions_fn=data_actions_fn,
    )

    # Use parallel group generation (single-sequence is just 1 group)
    num_groups = len(self._groups)

    # Compute total expected records across all groups for snapshot thresholds
    total_expected_records = self._compute_total_expected_records()
    progress_snapshots = self._build_progress_snapshots(total_expected_records, is_group_based=False)

    logger.info(
        f"Generating for {num_groups} groups using parallel generation "
        f"(total expected records: {total_expected_records})",
    )
    all_groups_completed = self._generate_parallel_groups(
        batches=batches,
        sampling_params=sampling_params,
        progress_snapshots=progress_snapshots,
    )

    if all_groups_completed and batches.status == GenerationStatus.IN_PROGRESS:
        batches.status = GenerationStatus.COMPLETE

    batches.job_complete()
    batches.log_status()

    generation_time_sec = time.monotonic() - generation_start
    self.elapsed_time = generation_time_sec
    self.gen_results = GenerateJobResults.from_batches(
        batches=batches,
        columns=self.columns,
        max_num_records=None,  # Time-range based, not count-based
        elapsed_time=self.elapsed_time,
    )

    # Sort by group and timestamp for consistent output (also removes pseudo-group column)
    self.gen_results.df = self._sort_dataframe(self.gen_results.df)

    return self.gen_results