library_builder

Executable pipeline for Safe Synthesizer.

Classes:

| Name | Description |
| --- | --- |
| SafeSynthesizer | Fluent builder and runner for Safe Synthesizer workflows. |

SafeSynthesizer(config=None, workdir=None, save_path=None, emit_telemetry=None, deployment_type=None)

Bases: ConfigBuilder

Fluent builder and runner for Safe Synthesizer workflows.

Extends ConfigBuilder with artifact management and stepwise pipeline execution. Run all at once via run(), or step by step:

builder = SafeSynthesizer().with_data_source(df)
builder.process_data().train().generate().evaluate()
builder.save_results()
results = builder.results

train() uses HuggingFaceBackend. generate() chooses TimeseriesBackend when config.time_series.is_timeseries is true and VllmBackend otherwise. Stepwise callers must call save_results() themselves after evaluate(); run() does this automatically.
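
The difference between the two execution styles can be seen in a toy model. The sketch below is not the library's class: ToyPipeline is a hypothetical stand-in that only records which stages ran, assuming each stage returns self as the real builder does.

```python
class ToyPipeline:
    """Hypothetical stand-in for SafeSynthesizer; it only records stage names."""

    def __init__(self):
        self.calls = []

    def _step(self, name):
        self.calls.append(name)
        return self  # returning self is what allows the stages to be chained

    def process_data(self):
        return self._step("process_data")

    def train(self):
        return self._step("train")

    def generate(self):
        return self._step("generate")

    def evaluate(self):
        return self._step("evaluate")

    def save_results(self):
        return self._step("save_results")

    def run(self):
        # run() is the stepwise chain plus the save step
        self.process_data().train().generate().evaluate()
        self.save_results()


stepwise = ToyPipeline()
stepwise.process_data().train().generate().evaluate()
saved_before_explicit_call = "save_results" in stepwise.calls  # nothing saved yet
stepwise.save_results()

all_at_once = ToyPipeline()
all_at_once.run()

print(saved_before_explicit_call)
print(stepwise.calls == all_at_once.calls)
```

Both objects end up with the same call history; the stepwise one only gets there because save_results() was called by hand.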

Parameters:

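| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | `SafeSynthesizerParameters \| None` | Optional pre-built parameters that seed every config section. | None |
| workdir | `Workdir \| None` | Explicit artifact directory layout. When None, a default Workdir is created under save_path. | None |
| save_path | `Path \| str \| None` | Root directory for artifacts when workdir is not provided. Defaults to "safe-synthesizer-artifacts". | None |
| emit_telemetry | `bool \| None` | Whether to emit telemetry. When None, the value comes from _config_emit_telemetry(). | None |
| deployment_type | `DeploymentTypeEnum \| None` | Deployment type. When None, the value comes from _deployment_type(). | None |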
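
The fallback for the artifact root can be sketched in isolation. This mirrors the expression used in __init__ (see the source listing); resolve_base_path and DEFAULT_ARTIFACT_ROOT are hypothetical names introduced here for illustration only.

```python
from __future__ import annotations

from pathlib import Path

# Hypothetical constant; the value is the default named in the parameter docs.
DEFAULT_ARTIFACT_ROOT = "safe-synthesizer-artifacts"


def resolve_base_path(save_path: Path | str | None = None) -> Path:
    """Return the artifact root, falling back to the default when save_path is falsy."""
    return Path(save_path) if save_path else Path(DEFAULT_ARTIFACT_ROOT)


default_root = resolve_base_path()
custom_root = resolve_base_path("runs/exp-1")
print(default_root)
print(custom_root)
```

Because the check is a truthiness test, an empty string also falls back to the default.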

Example:

builder = (
    SafeSynthesizer()
    .with_data_source(df)
    .with_replace_pii()
    .with_train(learning_rate=0.0001)
    .with_generate(num_records=10000)
)
builder.run()
results = builder.results

Methods:

| Name | Description |
| --- | --- |
| load_from_save_path | Load the Safe Synthesizer configuration from the save path. |
| process_data | Perform train/test split, auto-config resolution, and optional PII replacement. |
| train | Fine-tune the base model on the processed training data. |
| generate | Generate synthetic data using the trained model. |
| evaluate | Run quality and privacy evaluations and populate results. |
| run | Run the full pipeline and save results. |
| save_results | Save synthetic data, evaluation report, and metrics to the workdir. |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| trainer | TrainingBackend | Training backend instance, populated after train(). |
| generator | GeneratorBackend | Generation backend instance, populated after generate(). |
| evaluator | Evaluator | Evaluator instance, populated after evaluate(). |
| results | SafeSynthesizerResults | Final pipeline results, populated after evaluate() or run(). |

Source code in src/nemo_safe_synthesizer/sdk/library_builder.py
def __init__(
    self,
    config: SafeSynthesizerParameters | None = None,
    workdir: Workdir | None = None,
    save_path: Path | str | None = None,
    emit_telemetry: bool | None = None,
    deployment_type: DeploymentTypeEnum | None = None,
):
    super().__init__(config=config)
    self._workdir = workdir
    if self._workdir is None:
        # Create a default workdir when none provided
        # Use "default" for config_name and "data" for dataset_name as fallbacks
        self._workdir = Workdir(
            base_path=Path(save_path) if save_path else Path("safe-synthesizer-artifacts"),
            config_name="default",
            dataset_name="data",
        )
    # Initialize state for pipeline stages
    self._training_df: pd.DataFrame | None = (
        None  # The active training df that might go through transformation, e.g. PII replacement
    )
    self._original_training_df: pd.DataFrame | None = (
        None  # The original training df that we save for evaluation at the end
    )
    self._test_df: pd.DataFrame | None = None
    self._column_statistics: dict | None = None
    self._pii_replacer_time: float | None = None
    self._llm_metadata: ModelMetadata | None = None
    self._total_start: float | None = None
    self._loaded_from_save_path: bool = False
    self.preflight_report: PreflightReport | None = None
    self._data_processed: bool = False
    self._preflight_config_path: Path | None = None
    self._emit_telemetry: bool = emit_telemetry if emit_telemetry is not None else self._config_emit_telemetry()
    self._deployment_type: DeploymentTypeEnum = (
        deployment_type if deployment_type is not None else _deployment_type()
    )

trainer instance-attribute

Training backend instance, populated after train().

generator instance-attribute

Generation backend instance, populated after generate().

evaluator instance-attribute

Evaluator instance, populated after evaluate().

results instance-attribute

Final pipeline results, populated after evaluate() or run().

load_from_save_path()

Load the Safe Synthesizer configuration from the save path.

Loads the configuration from the source run directory's config file. When resuming from a trained model for generation, the source paths point to the parent workdir that contains the trained adapter.

Always prefers cached train/test splits from the training run to ensure evaluation metrics are consistent and privacy guarantees are maintained. Falls back to with_data_source() data only if cached files are missing.

Returns:

| Type | Description |
| --- | --- |
| SafeSynthesizer | Self for method chaining. |
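
The preference order described above can be sketched with plain files. pick_split_source is a hypothetical helper, not part of the library; it reports which source would be used and whether a test split would be loaded, assuming the same existence and size checks as the source listing.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def pick_split_source(training_path: Path, test_path: Path, has_data_source: bool) -> tuple[str, bool]:
    """Return (source, load_test) following the documented preference order."""
    if training_path.exists():
        # A missing or empty test file means holdout was disabled for that run.
        load_test = test_path.exists() and test_path.stat().st_size > 0
        return "cached", load_test
    if has_data_source:
        # A fresh split will be made later, which may differ from the original one.
        return "data_source", False
    raise ValueError("Cached train/test split not found and no data source provided.")


with tempfile.TemporaryDirectory() as tmp:
    training = Path(tmp) / "training.csv"
    test = Path(tmp) / "test.csv"

    missing_case = pick_split_source(training, test, has_data_source=True)

    training.write_text("a,b\n1,2\n")
    test.write_text("")  # empty file, as an old run with holdout=0 might leave
    empty_test_case = pick_split_source(training, test, has_data_source=True)

    test.write_text("a,b\n3,4\n")
    full_case = pick_split_source(training, test, has_data_source=False)

print(missing_case, empty_test_case, full_case)
```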

Source code in src/nemo_safe_synthesizer/sdk/library_builder.py
@traced("SafeSynthesizer.load_from_save_path", category=LogCategory.RUNTIME)
def load_from_save_path(self) -> SafeSynthesizer:
    """Load the Safe Synthesizer configuration from the save path.

    Loads the configuration from the source run directory's config file.
    When resuming from a trained model for generation, the source paths
    point to the parent workdir that contains the trained adapter.

    Always prefers cached train/test splits from the training run to ensure
    evaluation metrics are consistent and privacy guarantees are maintained.
    Falls back to with_data_source() data only if cached files are missing.

    Returns:
        Self for method chaining.
    """
    self._ensure_observability()
    assert self._workdir is not None
    # Use source paths which point to parent workdir when resuming for generation
    config_file = self._workdir.source_config

    self._nss_config = SafeSynthesizerParameters.from_json(config_file)

    # Load model metadata from saved file (contains initial_prefill for timeseries)
    # rather than creating new metadata from config
    metadata_file = self._workdir.metadata_file
    if not metadata_file.exists():
        raise FileNotFoundError(f"Metadata file not found: {metadata_file}")
    logger.info(f"Loading model metadata from: {metadata_file}")
    self._llm_metadata = ModelMetadata.from_metadata_json(metadata_file, workdir=self._workdir)

    # Always prefer cached train/test splits to preserve the exact split from training.
    # This ensures evaluation metrics are consistent and privacy guarantees are maintained.
    # Only fall back to with_data_source() data if cached files are missing.
    training_path = self._workdir.source_dataset.training
    test_path = self._workdir.source_dataset.test
    assert isinstance(training_path, Path) and isinstance(test_path, Path)
    if training_path.exists():
        logger.info("Loading cached train/test split from training run")
        # training_path persists the original training split for evaluation.
        self._original_training_df = pd.read_csv(training_path)
        # test.csv may not exist (holdout=0) or may be empty (old runs with holdout=0).
        if test_path.exists() and test_path.stat().st_size > 0:
            self._test_df = pd.read_csv(test_path)
        else:
            logger.info("No test split loaded (holdout was disabled for this run)")
            self._test_df = None
        # Mark that we have fully loaded from the saved run, including cached splits.
        self._loaded_from_save_path = True
    elif self._data_source is not None:
        logger.warning(
            "Cached dataset not found, will use provided data source. "
            "Note: A new train/test split will be created which may differ from the original training split."
        )
        # process_data() will handle the split using self._data_source
    else:
        raise ValueError(
            "Cached train/test split not found and no data source provided. "
            "Call with_data_source() before load_from_save_path(), or ensure the cached dataset exists."
        )
    return self

process_data(check_only=False)

Perform train/test split, auto-config resolution, and optional PII replacement.

Validates configured grouping/ordering columns against the input dataset, splits the data via Holdout, runs AutoConfigResolver to resolve "auto" parameters, applies PII replacement to the training set when enabled, and persists the splits to the workdir.

When check_only is True (the --validate path), PII replacement is intentionally skipped and CSV writes are elided; a resolved config YAML is written instead. Preflight therefore sees the pre-replacement training split, which is a known gap: PII replacement can change token lengths, so a clean --validate does not guarantee a full run will pass token-budget checks. See the "--validate is best-effort" callout in docs/user-guide/running.md.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| check_only | bool | If True, run preflight checks only (validation mode). | False |

Returns:

| Type | Description |
| --- | --- |
| SafeSynthesizer | Self for method chaining. |
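
The validate-then-run lifecycle can be modelled with a few flags. ToyProcessor below is a hypothetical, heavily simplified stand-in: it assumes only what the source listing shows, namely that check_only skips PII replacement and CSV writes and leaves the "already processed" flag unset so that a full run can follow on the same instance.

```python
class ToyProcessor:
    """Hypothetical stand-in that tracks only the flags relevant to check_only."""

    def __init__(self):
        self._data_processed = False
        self.pii_replaced = False
        self.csv_written = False
        self.preflight_runs = 0

    def process_data(self, check_only=False):
        if self._data_processed:
            return self  # already processed in this instance; nothing to do
        if not check_only:
            self.pii_replaced = True  # replacement is skipped on the validate path
        self.preflight_runs += 1
        if check_only:
            return self  # flag deliberately left unset so a full run can follow
        self._data_processed = True
        self.csv_written = True
        return self


p = ToyProcessor()
p.process_data(check_only=True)
after_validate = (p.pii_replaced, p.csv_written, p.preflight_runs)

p.process_data()
after_full = (p.pii_replaced, p.csv_written, p.preflight_runs)

p.process_data()  # no-op: the data is already processed
after_repeat = p.preflight_runs

print(after_validate, after_full, after_repeat)
```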

Source code in src/nemo_safe_synthesizer/sdk/library_builder.py
@traced("SafeSynthesizer.process_data", category=LogCategory.RUNTIME)
def process_data(self, check_only: bool = False) -> SafeSynthesizer:
    """Perform train/test split, auto-config resolution, and optional PII replacement.

    Validates configured grouping/ordering columns against the input
    dataset, splits the data via ``Holdout``, runs
    ``AutoConfigResolver`` to resolve ``"auto"`` parameters, applies
    PII replacement to the training set when enabled, and persists the
    splits to the workdir.

    When ``check_only`` is ``True`` (the ``--validate`` path), PII
    replacement is intentionally skipped and CSV writes are elided; a
    resolved config YAML is written instead. Preflight therefore sees
    the *pre-replacement* training split, which is a known gap: PII
    replacement can change token lengths, so a clean ``--validate``
    does not guarantee a full run will pass token-budget checks. See
    the "``--validate`` is best-effort" callout in
    ``docs/user-guide/running.md``.

    Args:
        check_only: If ``True``, run preflight checks only (validation mode).

    Returns:
        Self for method chaining.
    """
    self._total_start = time.monotonic()
    if not os.environ.get("NSS_PHASE"):
        os.environ["NSS_PHASE"] = "process_data"

    self._ensure_observability()

    if self._loaded_from_save_path or getattr(self, "_data_processed", False):
        # Resume path or already-processed data in this builder instance; nothing to do.
        return self

    self._resolve_nss_config()
    self._resolve_datasource()

    if TYPE_CHECKING:
        assert self._nss_config is not None
        assert isinstance(self._data_source, pd.DataFrame)

    # Run the config/dataframe stages before holdout so invalid column
    # settings produce structured preflight issues instead of downstream
    # pandas/sklearn errors. The later full preflight run still uses the
    # final training split and real metadata for split-dependent checks.
    preflight = run_preflight(
        self._data_source,
        self._nss_config,
        ModelMetadata.stub(self._nss_config),
        stages=frozenset({PreflightStage.CONFIG, PreflightStage.DATAFRAME}),
    )
    self.preflight_report = preflight
    if preflight.errors:
        summary = "\n".join(f"  {e.code}: {e.message}" for e in preflight.errors)
        raise ParameterError(f"Pre-flight check failed with {len(preflight.errors)} error(s):\n{summary}")

    holdout = Holdout(self._nss_config)
    original_training_df, self._test_df = holdout.train_test_split(self._data_source)

    self._original_training_df = (
        original_training_df  # The original training df that we use for evaluation at the end
    )
    self._training_df = original_training_df  # The active training df that might go through transformation
    self._column_statistics = None

    resolver = AutoConfigResolver(self._training_df, self._nss_config)
    resolved_config = resolver()
    self._nss_config = resolved_config

    # PII replacement is skipped on the validate path (``check_only=True``).
    # Rationale: the replacer makes network calls to the PII classifier
    # and can take minutes on large datasets -- incompatible with the
    # fast fail-fast semantics of ``--validate``. The consequence is
    # that preflight sees the pre-replacement training split; replacement
    # text can shift token lengths, so ``--validate`` is documented as
    # best-effort rather than a guarantee (see user-guide/running.md).
    if not check_only and self._nss_config.replace_pii is not None:
        replacer = NemoPII(self._nss_config.replace_pii)
        replacer.transform_df(original_training_df)
        assert replacer.result is not None
        self._training_df = replacer.result.transformed_df
        self._column_statistics = replacer.result.column_statistics
        self._pii_replacer_time = replacer.elapsed_time
        # We explicitly do not replace PII in the test set so that the
        # privacy metrics are valid.

    # Only create new metadata if not already loaded (e.g., from load_from_save_path)
    metadata_for_preflight = self._llm_metadata
    if metadata_for_preflight is None:
        if check_only:
            try:
                metadata_for_preflight = ModelMetadata.from_config(self._nss_config, workdir=self._workdir)
                self._llm_metadata = metadata_for_preflight
            except Exception:
                logger.user.warning(
                    "Could not load model metadata (network/cache); token budget checks will be skipped."
                )
                metadata_for_preflight = ModelMetadata.stub(self._nss_config)
        else:
            metadata_for_preflight = ModelMetadata.from_config(self._nss_config, workdir=self._workdir)
            self._llm_metadata = metadata_for_preflight

    # Persist the resolved config before running preflight so that on
    # preflight failure the CLI error report can still point at the
    # config YAML.  ``_preflight_config_path`` is set here (not after
    # ``run_preflight``) so the error path has a valid location.
    if check_only:
        assert self._workdir is not None
        self._workdir.ensure_directories()
        config_path = self._workdir.run_dir / "safe-synthesizer-config.yaml"
        self._nss_config.to_yaml(config_path, exclude_unset=False)
        self._preflight_config_path = config_path

    preflight = run_preflight(self._training_df, self._nss_config, metadata_for_preflight)
    self.preflight_report = preflight
    for issue in preflight.warnings:
        logger.user.warning(issue.message, extra={"preflight_code": issue.code, "preflight_check": issue.check})
    if preflight.errors:
        summary = "\n".join(f"  {e.code}: {e.message}" for e in preflight.errors)
        raise ParameterError(f"Pre-flight check failed with {len(preflight.errors)} error(s):\n{summary}")

    # If we're in check-only mode, we don't need to process the data further and we'll end the program.
    # ``_data_processed`` is intentionally *not* set here: the validate →
    # full-run pattern calls ``process_data(check_only=True)`` followed
    # by ``process_data()`` on the same instance, and the second call
    # must rebuild real metadata and apply PII replacement (see
    # ``TestProcessDataMetadataLifecycle.test_check_only_stub_metadata_not_persisted_for_followup_run``).
    # Callers who repeat ``process_data(check_only=True)`` pay the
    # (cheap) preflight cost twice on purpose.
    if check_only:
        return self

    self._data_processed = True

    # Always persist the original training split -- this is the version
    # reloaded by load_from_save_path and used for evaluation metrics.
    assert self._workdir is not None
    self._workdir.ensure_directories()
    # ``training.csv`` is the canonical persisted original training split.
    self._original_training_df.to_csv(self._workdir.dataset.training, index=False)
    if not self._training_df.equals(self._original_training_df):
        # The transformed (e.g. PII-replaced) training data is saved for
        # inspection only -- we don't need it in the generation or evaluation phase.
        self._training_df.to_csv(self._workdir.dataset.transformed_training, index=False)
    if self._test_df is not None:
        self._test_df.to_csv(self._workdir.dataset.test, index=False)
    return self

train()

Fine-tune the base model on the processed training data.

Creates the HuggingFace training backend, loads the base model, and runs fine-tuning. Requires process_data() to have been called first.

Returns:

| Type | Description |
| --- | --- |
| SafeSynthesizer | Self for method chaining. |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If called after load_from_save_path() or before process_data(). |
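
The guard against training on the resume path can be illustrated with a toy class. ResumeGuard is hypothetical and simplified: here load_from_save_path() always sets the flag, whereas in the source listing the real method sets it only when the cached training split is found.

```python
class ResumeGuard:
    """Hypothetical stand-in showing only the resume-path check."""

    def __init__(self):
        self._loaded_from_save_path = False

    def load_from_save_path(self):
        self._loaded_from_save_path = True
        return self

    def train(self):
        if self._loaded_from_save_path:
            raise RuntimeError("train() cannot be called after load_from_save_path().")
        return self


fresh = ResumeGuard()
fresh_ok = fresh.train() is fresh  # a fresh instance may train

try:
    ResumeGuard().load_from_save_path().train()
except RuntimeError as exc:
    guard_message = str(exc)
else:
    guard_message = None

print(fresh_ok, guard_message)
```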

Source code in src/nemo_safe_synthesizer/sdk/library_builder.py
@traced("SafeSynthesizer.train", category=LogCategory.RUNTIME)
def train(self) -> SafeSynthesizer:
    """Fine-tune the base model on the processed training data.

    Creates the HuggingFace training backend, loads the base model,
    and runs fine-tuning.  Requires ``process_data()`` to have been
    called first.

    Returns:
        Self for method chaining.

    Raises:
        RuntimeError: If called after ``load_from_save_path()`` or
            before ``process_data()``.
    """
    if self._loaded_from_save_path:
        raise RuntimeError(
            "train() cannot be called after load_from_save_path(). "
            "The resume path is for generation and evaluation only: "
            ".load_from_save_path().generate().evaluate()"
        )

    # these are for ty
    if TYPE_CHECKING:
        assert self._training_df is not None
        assert self._nss_config is not None
        assert self._llm_metadata is not None

    if self._total_start is None:
        self._total_start = time.monotonic()
    if not os.environ.get("NSS_PHASE"):
        os.environ["NSS_PHASE"] = "train"

    self.trainer = HuggingFaceBackend(
        params=self._nss_config,
        model_metadata=self._llm_metadata,
        training_dataset=Dataset.from_pandas(self._training_df),
        action_executor=None,
        verbose_logging=True,
        maybe_split_dataset=True,
        artifact_path=None,
        workdir=self._workdir,
    )
    self.trainer.load_model()
    self.trainer.train()

    # Propagate config changes from training (e.g., inferred timestamp_format) to generation
    self._nss_config = self.trainer.params

    return self

generate()

Generate synthetic data using the trained model.

Selects the appropriate backend (VllmBackend or TimeseriesBackend), initializes it, and generates synthetic records.

Returns:

| Type | Description |
| --- | --- |
| SafeSynthesizer | Self for method chaining. |
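
Backend selection reduces to one condition on the time-series settings. A minimal sketch, assuming a hypothetical TimeSeriesSettings stand-in for config.time_series and returning the backend name as a string rather than constructing a real backend:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TimeSeriesSettings:
    """Hypothetical stand-in for config.time_series."""

    is_timeseries: bool = False


def select_generation_backend(time_series: TimeSeriesSettings | None) -> str:
    """Return the name of the backend that generate() would pick."""
    if time_series and time_series.is_timeseries:
        return "TimeseriesBackend"
    return "VllmBackend"


print(select_generation_backend(None))
print(select_generation_backend(TimeSeriesSettings(is_timeseries=False)))
print(select_generation_backend(TimeSeriesSettings(is_timeseries=True)))
```

VllmBackend is the default in this sketch: it is chosen both when the time-series section is absent and when is_timeseries is false.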

Source code in src/nemo_safe_synthesizer/sdk/library_builder.py
@traced("SafeSynthesizer.generate", category=LogCategory.RUNTIME)
def generate(self) -> SafeSynthesizer:
    """Generate synthetic data using the trained model.

    Selects the appropriate backend (``VllmBackend`` or
    ``TimeseriesBackend``), initializes it, and generates
    synthetic records.

    Returns:
        Self for method chaining.
    """
    if not os.environ.get("NSS_PHASE"):
        os.environ["NSS_PHASE"] = "generate"
    if TYPE_CHECKING:
        assert self._nss_config is not None
        assert self._llm_metadata is not None
    if self._total_start is None:
        self._total_start = time.monotonic()

    # Clean up trainer model if it exists (only present when train->generate in same session)
    trainer = getattr(self, "trainer", None)
    if trainer is not None:
        trainer.teardown()

    assert self._workdir is not None
    # Select backend based on time_series configuration
    if self._nss_config.time_series and self._nss_config.time_series.is_timeseries:
        self.generator = TimeseriesBackend(
            config=self._nss_config, model_metadata=self._llm_metadata, workdir=self._workdir
        )
    else:
        self.generator = VllmBackend(
            config=self._nss_config, model_metadata=self._llm_metadata, workdir=self._workdir
        )

    try:
        self.generator.initialize()
        self.generator.generate()
    finally:
        self.generator.teardown()
    self._generated = True
    return self

evaluate()

Run quality and privacy evaluations and populate results.

Returns:

| Type | Description |
| --- | --- |
| SafeSynthesizer | Self for method chaining. |

Source code in src/nemo_safe_synthesizer/sdk/library_builder.py
@traced("SafeSynthesizer.evaluate", category=LogCategory.RUNTIME)
def evaluate(self) -> SafeSynthesizer:
    """Run quality and privacy evaluations and populate ``results``.

    Returns:
        Self for method chaining.
    """
    if not os.environ.get("NSS_PHASE"):
        os.environ["NSS_PHASE"] = "evaluate"
    if TYPE_CHECKING:
        assert self._nss_config is not None
        assert self._original_training_df is not None
        assert self._test_df is not None
        assert self._total_start is not None
        if self._nss_config.replace_pii is not None:
            assert self._pii_replacer_time is not None
            assert self._column_statistics is not None

    self.evaluator = Evaluator(
        config=self._nss_config,
        generate_results=self.generator.gen_results,
        pii_replacer_time=self._pii_replacer_time,
        column_statistics=self._column_statistics,
        training_df=self._original_training_df,
        test_df=self._test_df,
        workdir=self._workdir,
    )
    self.evaluator.evaluate()

    training_time = None
    if trainer := getattr(self, "trainer", {}):
        if res := getattr(trainer, "results", None):
            training_time = res.elapsed_time
    generation_time = None
    if generator := getattr(self, "generator", {}):
        if res := getattr(generator, "gen_results", None):
            generation_time = res.elapsed_time

    self.results = make_nss_results(
        total_time=time.monotonic() - self._total_start,
        training_time=training_time,
        generation_time=generation_time,
        evaluation_time=self.evaluator.evaluation_time,
        report=self.evaluator.report,
        generate_results=self.generator.gen_results,
    )
    return self

run(output_file=None)

Run the full pipeline and save results.

Executes process_data -> train -> generate -> evaluate -> save_results. For step-by-step control, call the individual methods instead.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_file | `Path \| str \| None` | Explicit output path for the synthetic data CSV. Falls back to workdir.output_file when None. | None |

Raises:

| Type | Description |
| --- | --- |
| RuntimeError | If called after load_from_save_path(). Use .generate().evaluate() for the resume path. |
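
In the source listing, run() reports a task status whether the pipeline finishes, is interrupted, or fails, and re-raises in the latter two cases. A minimal sketch of that pattern, where run_with_status and the emit callback are hypothetical stand-ins for the library's internal telemetry helper, and the status strings stand in for TaskStatusEnum members:

```python
def run_with_status(pipeline, emit):
    """Call pipeline() and report exactly one status through emit()."""
    try:
        pipeline()
        emit("COMPLETED")
    except KeyboardInterrupt:
        emit("CANCELED")
        raise  # the interrupt still propagates to the caller
    except Exception:
        emit("ERROR")
        raise  # the original error is not swallowed


def failing():
    raise ValueError("boom")


def interrupted():
    raise KeyboardInterrupt


statuses = []
run_with_status(lambda: None, statuses.append)

try:
    run_with_status(failing, statuses.append)
except ValueError:
    pass

try:
    run_with_status(interrupted, statuses.append)
except KeyboardInterrupt:
    pass

print(statuses)
```

KeyboardInterrupt needs its own handler because it does not derive from Exception, so the generic handler alone would never see it.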

Source code in src/nemo_safe_synthesizer/sdk/library_builder.py
def run(self, output_file: Path | str | None = None) -> None:
    """Run the full pipeline and save results.

    Executes ``process_data`` -> ``train`` -> ``generate`` ->
    ``evaluate`` -> ``save_results``.  For step-by-step control,
    call the individual methods instead.

    Args:
        output_file: Explicit output path for the synthetic data CSV.
            Falls back to ``workdir.output_file`` when ``None``.

    Raises:
        RuntimeError: If called after ``load_from_save_path()``.
            Use ``.generate().evaluate()`` for the resume path.
    """
    if self._loaded_from_save_path:
        raise RuntimeError(
            "run() cannot be called after load_from_save_path(). "
            "The resume path is for generation and evaluation only: "
            ".load_from_save_path().generate().evaluate()"
        )

    if TYPE_CHECKING:
        assert self._nss_config is not None
        assert isinstance(self._data_source, pd.DataFrame)

    try:
        self.process_data().train().generate().evaluate()
        self.save_results(output_file=output_file)
        _emit_nss_telemetry(self, TaskStatusEnum.COMPLETED)
    except KeyboardInterrupt:
        _emit_nss_telemetry(self, TaskStatusEnum.CANCELED)
        raise
    except Exception:
        _emit_nss_telemetry(self, TaskStatusEnum.ERROR)
        raise

save_results(output_file=None)

Save synthetic data, evaluation report, and metrics to the workdir.

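Writes synthetic_data.csv into the generate directory (or to output_file when given). When an HTML evaluation report is available, evaluation_report.html and evaluation_metrics.json are written as well; in the source below, the metrics file is only written together with the report. Called automatically by run(). Call it explicitly after stepwise execution (process_data().train().generate().evaluate()). Returns self, so it can be chained.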

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| output_file | `Path \| str \| None` | Explicit output path for the CSV. Falls back to workdir.output_file when None. | None |

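
The output_file fallback can be sketched on its own. resolve_output_file is a hypothetical helper that uses isinstance checks in place of the match statement in the source listing; the default argument stands in for workdir.output_file.

```python
from __future__ import annotations

from pathlib import Path


def resolve_output_file(output_file: Path | str | None, default: Path) -> Path:
    """Normalize output_file to a Path, using default when it is None."""
    if isinstance(output_file, Path):
        return output_file
    if isinstance(output_file, str):
        return Path(output_file)
    return default


# Hypothetical default location, for illustration only.
default_csv = Path("generate") / "synthetic_data.csv"

from_none = resolve_output_file(None, default_csv)
from_str = resolve_output_file("out/custom.csv", default_csv)
from_path = resolve_output_file(Path("out/other.csv"), default_csv)
print(from_none, from_str, from_path)
```
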
Source code in src/nemo_safe_synthesizer/sdk/library_builder.py
@traced("SafeSynthesizer.save_results", category=LogCategory.RUNTIME, level="INFO")
def save_results(self, output_file: Path | str | None = None) -> SafeSynthesizer:
    """Save synthetic data, evaluation report, and metrics to the workdir.

    Writes ``synthetic_data.csv``, ``evaluation_report.html`` (when
    available), and ``evaluation_metrics.json`` into the generate
    directory.  Called automatically by ``run()``.  Call explicitly
    after stepwise execution
    (``process_data().train().generate().evaluate()``).

    Args:
        output_file: Explicit output path for the CSV.  Falls back
            to ``workdir.output_file`` when ``None``.
    """
    if TYPE_CHECKING:
        assert self.results is not None
        assert isinstance(self.results.synthetic_data, pd.DataFrame)

    assert self._workdir is not None
    match output_file:
        case Path() as p:
            output_file = p
        case str() as s:
            output_file = Path(s)
        case _:
            output_file = self._workdir.output_file

    output_file.parent.mkdir(parents=True, exist_ok=True)
    self.results.synthetic_data.to_csv(str(output_file), index=False)
    logger.info(f"Saved synthetic data to {output_file}")

    if self.results.evaluation_report_html:
        report_path = self._workdir.evaluation_report
        report_path.parent.mkdir(parents=True, exist_ok=True)
        report_path.write_text(self.results.evaluation_report_html)
        logger.info(f"Saved evaluation report to {report_path}")

        # we only get non-empty results summary when evaluation is run
        metrics_path = self._workdir.evaluation_metrics
        metrics_path.parent.mkdir(parents=True, exist_ok=True)
        metrics_path.write_text(self.results.summary.model_dump_json(indent=2))
        logger.info(f"Saved evaluation metrics and runtimes to {metrics_path}")

    return self