wandb_setup

WandB integration for Safe Synthesizer.

This module provides WandB (Weights & Biases) integration for experiment tracking, including run initialization, configuration logging, and failure reporting.

Classes:

WandbMode: WandB run mode.
WandbPhase: Phase of the Safe Synthesizer pipeline.
WandbSettings: WandB configuration for Safe Synthesizer.
WandbLoggable: Structural type for observability events that can be logged to wandb.

Functions:

resolve_wandb_run_id: Resolve a wandb run ID from a string or file path.
log_failure_to_wandb: Log failure to wandb before exiting.
update_wandb_config: Update the wandb config with the given configuration.
initialize_wandb_run: Initialize or resume a wandb run with consistent configuration.
log_observability_event: Log an observability event to the currently active wandb run.

WandbMode

Bases: str, Enum

WandB run mode.

WandbPhase

Bases: str, Enum

Phase of the Safe Synthesizer pipeline.

WandbSettings

Bases: BaseSettings

WandB configuration for Safe Synthesizer.

All settings can be configured via environment variables.

Methods:

validate_wandb_mode: Coerce string or None to WandbMode enum, defaulting to DISABLED.
validate_phase: Coerce string or None to WandbPhase, defaulting to UNKNOWN.

Attributes:

wandb_mode (WandbMode): Run mode, one of online, offline, or disabled (env variable: WANDB_MODE or NSS_WANDB_MODE).
wandb_project (str | None): WandB project name override (env variable: WANDB_PROJECT or NSS_WANDB_PROJECT).
exp_name (str): Fallback project name when wandb_project is not set.
phase (WandbPhase): Current pipeline phase for WandB grouping.
effective_wandb_project (str): Effective wandb project name, falling back to exp_name.

wandb_mode = Field(default=(WandbMode.DISABLED), description='Run mode, one of online, offline, or disabled.', validation_alias=(AliasChoices('WANDB_MODE', 'NSS_WANDB_MODE'))) class-attribute instance-attribute

Run mode, one of online, offline, or disabled (env variable: WANDB_MODE or NSS_WANDB_MODE).

wandb_project = Field(default=None, description='WandB project name override.', validation_alias=(AliasChoices('WANDB_PROJECT', 'NSS_WANDB_PROJECT'))) class-attribute instance-attribute

WandB project name override (env variable: WANDB_PROJECT or NSS_WANDB_PROJECT).

exp_name = Field(default='nss_experiments', description='Fallback project name when ``wandb_project`` is not set.') class-attribute instance-attribute

Fallback project name when wandb_project is not set.

phase = Field(default=(WandbPhase.UNKNOWN), description='Current pipeline phase for WandB grouping.') class-attribute instance-attribute

Current pipeline phase for WandB grouping.

effective_wandb_project property

Effective wandb project name, falling back to exp_name.
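The fallback described above can be sketched without pydantic. This is only an illustration: the helper name effective_project is hypothetical, the property's real implementation is not shown on this page, and the sketch assumes the aliases are tried in the order they are listed in AliasChoices.

```python
from __future__ import annotations

import os


def effective_project(env: dict[str, str], exp_name: str = "nss_experiments") -> str:
    """Hypothetical stand-in for WandbSettings.effective_wandb_project."""
    # Assumption: the first alias present in the environment wins.
    for key in ("WANDB_PROJECT", "NSS_WANDB_PROJECT"):
        if key in env:
            return env[key]
    # No override set: fall back to exp_name (default taken from the Field above).
    return exp_name


if __name__ == "__main__":
    # Resolve against the real process environment.
    print(effective_project(dict(os.environ)))
```

With neither variable set, the sketch returns the documented exp_name default, nss_experiments.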

validate_wandb_mode(v) classmethod

Coerce string or None to WandbMode enum, defaulting to DISABLED.

Source code in src/nemo_safe_synthesizer/cli/wandb_setup.py
@field_validator("wandb_mode", mode="before")
@classmethod
def validate_wandb_mode(cls, v: str | WandbMode | None) -> WandbMode:
    """Coerce string or None to ``WandbMode`` enum, defaulting to DISABLED."""
    if v is None:
        return WandbMode.DISABLED
    if isinstance(v, WandbMode):
        return v
    return WandbMode(v)
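
The coercion rules can be tried outside pydantic with a standalone copy of the validator body. In this sketch only DISABLED is a member name confirmed by the source; ONLINE and OFFLINE, and all three string values, are assumptions based on the documented "online, offline, or disabled" choices. The helpers coerce_mode and is_rejected are hypothetical.

```python
from __future__ import annotations

from enum import Enum


class WandbMode(str, Enum):
    # Assumed members and values; only DISABLED is named in the source.
    ONLINE = "online"
    OFFLINE = "offline"
    DISABLED = "disabled"


def coerce_mode(v: str | WandbMode | None) -> WandbMode:
    """Same branches as validate_wandb_mode, minus the pydantic decorators."""
    if v is None:
        return WandbMode.DISABLED
    if isinstance(v, WandbMode):
        return v
    # Enum lookup is by value, so unknown or differently cased strings raise.
    return WandbMode(v)


def is_rejected(value: str) -> bool:
    """Return True when the string is not a valid mode value."""
    try:
        coerce_mode(value)
    except ValueError:
        return True
    return False
```

Because the lookup is by value, a string such as "Online" would be rejected under these assumed values rather than normalized.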

validate_phase(v) classmethod

Coerce string or None to WandbPhase, defaulting to UNKNOWN.

Source code in src/nemo_safe_synthesizer/cli/wandb_setup.py
@field_validator("phase", mode="before")
@classmethod
def validate_phase(cls, v: str | WandbPhase | None) -> WandbPhase:
    """Coerce string or None to ``WandbPhase``, defaulting to UNKNOWN."""
    if v is None:
        return WandbPhase.UNKNOWN
    if isinstance(v, WandbPhase):
        return v
    return WandbPhase(v)

WandbLoggable

Bases: Protocol

Structural type for observability events that can be logged to wandb.

Any event exposing to_wandb_payload(prefix) -> dict satisfies this protocol, for example TrainingObservability and the generation-side GenerationObservability. Using a Protocol keeps log_observability_event decoupled from the concrete event types, so this CLI module does not need to import the training or generation subpackages.

Methods:

to_wandb_payload: Return wandb metrics for this event, namespaced under prefix.

to_wandb_payload(prefix='')

Return wandb metrics for this event, namespaced under prefix.
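
As a rough illustration of structural typing here, any class with a matching to_wandb_payload method satisfies the protocol without inheriting from it. The StepTiming event and the "prefix/metric" key layout are hypothetical, and runtime_checkable is added only so isinstance works in the demo; the source does not say whether the real protocol uses it.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Protocol, runtime_checkable


@runtime_checkable  # demo-only assumption, so isinstance() can be used below
class WandbLoggable(Protocol):
    def to_wandb_payload(self, prefix: str = "") -> dict[str, Any]: ...


@dataclass
class StepTiming:  # hypothetical event type, not part of the library
    step: int
    seconds: float

    def to_wandb_payload(self, prefix: str = "") -> dict[str, Any]:
        # Assumed namespacing convention: "<prefix>/<metric>".
        ns = f"{prefix}/" if prefix else ""
        return {f"{ns}step": self.step, f"{ns}seconds": self.seconds}


event = StepTiming(step=3, seconds=1.5)
payload = event.to_wandb_payload(prefix="training")
```

StepTiming never references WandbLoggable, yet a static type checker would accept it wherever a WandbLoggable is expected.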

Source code in src/nemo_safe_synthesizer/cli/wandb_setup.py
def to_wandb_payload(self, prefix: str = "") -> dict[str, Any]:
    """Return wandb metrics for this event, namespaced under ``prefix``."""

resolve_wandb_run_id(id_or_path)

Resolve a wandb run ID from a string or file path.

Parameters:

id_or_path (str, required): Either a wandb run ID string, or a path to a file containing the ID.

Returns:

str: The resolved wandb run ID.

Source code in src/nemo_safe_synthesizer/cli/wandb_setup.py
def resolve_wandb_run_id(id_or_path: str) -> str:
    """Resolve a wandb run ID from a string or file path.

    Args:
        id_or_path: Either a wandb run ID string, or a path to a file containing the ID.

    Returns:
        The resolved wandb run ID.
    """
    path = Path(id_or_path)
    if path.exists() and path.is_file():
        return path.read_text().strip()
    return id_or_path
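
A small usage sketch of both branches, with the function body copied from the listing above so the snippet runs on its own. The file name and the run ID value are made up for the demo.

```python
from __future__ import annotations

import tempfile
from pathlib import Path


def resolve_wandb_run_id(id_or_path: str) -> str:
    """Copy of the function above, repeated so the example is self-contained."""
    path = Path(id_or_path)
    if path.exists() and path.is_file():
        return path.read_text().strip()
    return id_or_path


with tempfile.TemporaryDirectory() as tmp:
    id_file = Path(tmp) / "wandb_run_id"  # hypothetical file name
    id_file.write_text("run-id-7f3k2q\n", encoding="utf-8")

    # An existing file: its contents are returned, with whitespace stripped.
    from_file = resolve_wandb_run_id(str(id_file))

# Anything that is not an existing file is returned unchanged.
literal = resolve_wandb_run_id("run-id-7f3k2q")
```

One consequence of this design is that a literal run ID that happens to match the name of an existing file in the working directory is read as a file.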

log_failure_to_wandb(error, phase)

Log failure to wandb before exiting.

Parameters:

error (Exception, required): The exception that caused the failure.
phase (str, required): The phase where the failure occurred (e.g., "train", "generation", "end_to_end").

Source code in src/nemo_safe_synthesizer/cli/wandb_setup.py
def log_failure_to_wandb(error: Exception, phase: str) -> None:
    """Log failure to wandb before exiting.

    Args:
        error: The exception that caused the failure
        phase: The phase where failure occurred (e.g., "train", "generation", "end_to_end")
    """
    try:
        if wandb.run is not None:
            wandb.log(
                {
                    "eval/success": 0,
                    f"{phase}/error_type": type(error).__name__,
                    f"{phase}/error_message": str(error),
                }
            )
            logger.info(f"Logged failure to wandb for {phase} phase")
    except Exception as e:
        logger.warning(f"Failed to log error to wandb: {e}")
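
To make the logged keys concrete, the dictionary passed to wandb.log can be built in isolation. The helper failure_payload is hypothetical and exists only for this illustration; the keys and values mirror the listing above.

```python
from __future__ import annotations

from typing import Any


def failure_payload(error: Exception, phase: str) -> dict[str, Any]:
    """Build the same dictionary that log_failure_to_wandb passes to wandb.log."""
    return {
        "eval/success": 0,  # fixed key, not namespaced by phase
        f"{phase}/error_type": type(error).__name__,
        f"{phase}/error_message": str(error),
    }


payload = failure_payload(ValueError("bad column"), "train")
```

Note that only the error fields are namespaced by phase; eval/success uses the same key regardless of where the failure occurred.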

update_wandb_config(cfg=None, additional_configs=None)

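Update the wandb config with the given configuration. The call is a no-op when no wandb run is active. As written, nothing is logged when cfg is None, even if additional_configs is provided.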

Parameters:

cfg (SafeSynthesizerParameters | None, default None): SafeSynthesizerParameters to log.
additional_configs (dict[str, Any] | None, default None): Additional key-value pairs to log.

Source code in src/nemo_safe_synthesizer/cli/wandb_setup.py
def update_wandb_config(
    cfg: SafeSynthesizerParameters | None = None,
    additional_configs: dict[str, Any] | None = None,
) -> None:
    """Update the wandb config with the given configuration.

    Args:
        cfg: SafeSynthesizerParameters to log
        additional_configs: Additional key-value pairs to log
    """
    if wandb.run is None:
        return

    if additional_configs is None:
        additional_configs = {}

    if cfg is not None:
        config_dict = cfg.model_dump()
        config_dict.update(additional_configs)
        wandb.config.update(config_dict, allow_val_change=True)
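
The merge semantics of the listing above can be sketched with plain dictionaries. The helper merged_config is hypothetical, and cfg_dict stands in for the result of cfg.model_dump(); returning None here represents the case where wandb.config.update is never called.

```python
from __future__ import annotations

from typing import Any


def merged_config(
    cfg_dict: dict[str, Any] | None,
    additional_configs: dict[str, Any] | None = None,
) -> dict[str, Any] | None:
    """Return the dict that would be sent to wandb.config.update, or None."""
    if additional_configs is None:
        additional_configs = {}
    if cfg_dict is None:
        # Mirrors the listing: without cfg, nothing is written at all.
        return None
    merged = dict(cfg_dict)
    # Keys in additional_configs override same-named keys from cfg.
    merged.update(additional_configs)
    return merged
```

Under this reading, a key such as phase supplied in additional_configs takes precedence over a same-named field in the dumped parameters.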

initialize_wandb_run(workdir, resume_job_id=None, cfg=None)

Initialize or resume a wandb run with consistent configuration.

This function handles four cases (in priority order):

1. WandB already initialized - just save the run ID
2. Explicit resume_job_id provided - resume that run (ID or file path)
3. Resume existing run from saved run_id file in workdir
4. Create new run

Parameters:

workdir (Workdir, required): Workdir structure containing paths for run ID files.
resume_job_id (str | None, default None): Optional wandb run ID or path to file containing the ID.
cfg (SafeSynthesizerParameters | None, default None): Optional SafeSynthesizerParameters to log to wandb config.

Source code in src/nemo_safe_synthesizer/cli/wandb_setup.py
def initialize_wandb_run(
    workdir: Workdir,
    resume_job_id: str | None = None,
    cfg: SafeSynthesizerParameters | None = None,
) -> None:
    """Initialize or resume a wandb run with consistent configuration.

    This function handles four cases (in priority order):
    1. WandB already initialized - just save the run ID
    2. Explicit resume_job_id provided - resume that run (ID or file path)
    3. Resume existing run from saved run_id file in workdir
    4. Create new run

    Args:
        workdir: Workdir structure containing paths for run ID files
        resume_job_id: Optional wandb run ID or path to file containing the ID
        cfg: Optional SafeSynthesizerParameters to log to wandb config
    """
    settings = WandbSettings()

    logger.info(f"WANDB_MODE: {settings.wandb_mode}")
    if settings.wandb_mode == WandbMode.DISABLED:
        return

    wandb_project = settings.effective_wandb_project
    logger.info(f"WANDB_PROJECT: {wandb_project}")

    phase = settings.phase
    run_id_file = workdir.wandb_run_id_file

    if TYPE_CHECKING:
        assert isinstance(run_id_file, Path)

    # WandB settings to prevent console log issues
    wandb_settings = wandb.Settings(
        console="wrap",  # Wrap console output instead of redirecting
    )

    # Make a dictionary of additional configs to log to wandb
    additional_configs = {
        "dataset_name": workdir.dataset_name,
        "config_name": workdir.config_name,
        "dataset_name-config_name": f"{workdir.dataset_name}-{workdir.config_name}",  # wandb charts can only group by one variable
        "run_name": workdir.run_name,
        "phase": phase,
    }

    # Case 1: WandB already initialized
    if wandb.run is not None:
        run_id_file.parent.mkdir(parents=True, exist_ok=True)
        run_id_file.write_text(wandb.run.id, encoding="utf-8")

    # Case 2: Explicit resume_job_id provided (ID or file path)
    elif resume_job_id is not None:
        resolved_run_id = resolve_wandb_run_id(resume_job_id)
        logger.info(f"Resuming wandb run: {resolved_run_id} (from --wandb-resume-job-id)")
        wandb.init(
            project=wandb_project,
            id=resolved_run_id,
            resume="allow",
            mode=settings.wandb_mode.value,
            settings=wandb_settings,
            dir=workdir.run_dir,
        )
        if wandb.run is not None:
            run_id_file.parent.mkdir(parents=True, exist_ok=True)
            run_id_file.write_text(wandb.run.id, encoding="utf-8")

    # Case 3: Resume existing run from saved run_id file in workdir
    elif run_id_file.exists():
        saved_run_id = run_id_file.read_text().strip()
        logger.info(f"Resuming wandb run: {saved_run_id} (from {run_id_file.name})")
        wandb.init(
            project=wandb_project,
            id=saved_run_id,
            resume="allow",
            mode=settings.wandb_mode.value,
            settings=wandb_settings,
            dir=workdir.run_dir,
        )
        if wandb.run is not None:
            run_id_file.write_text(wandb.run.id, encoding="utf-8")

    # Case 4: Create new run
    else:
        logger.info(f"Creating new wandb run: {workdir.run_name}")
        run_id_file.parent.mkdir(parents=True, exist_ok=True)
        wandb.init(
            project=wandb_project,
            name=workdir.run_name,
            mode=settings.wandb_mode.value,
            settings=wandb_settings,
            dir=workdir.run_dir,
        )
        if wandb.run is not None:
            run_id_file.write_text(wandb.run.id, encoding="utf-8")
        logger.info(f"Saved wandb run ID to {workdir.wandb_run_id_file}")

        # Log config to wandb (only for new runs - resumed runs already have config)
        update_wandb_config(cfg, additional_configs=additional_configs)

    # Log run info
    logger.info(f"Wandb run name: {wandb.run.name if wandb.run else 'None'}")
    logger.info(f"Wandb run id: {wandb.run.id if wandb.run else 'None'}")
    if settings.wandb_mode != WandbMode.DISABLED:
        logger.info(f"Wandb run url: {wandb.run.url if wandb.run else 'None'}")
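
The priority order of the four cases, plus the early return for disabled mode, can be condensed into a pure function. This is a sketch only: select_case and its string labels are hypothetical, and the inputs stand in for settings.wandb_mode, wandb.run, resume_job_id, and run_id_file.exists().

```python
from __future__ import annotations


def select_case(
    mode: str,
    run_active: bool,
    resume_job_id: str | None,
    run_id_file_exists: bool,
) -> str:
    """Return a label for the branch initialize_wandb_run would take."""
    if mode == "disabled":  # assumed string value of WandbMode.DISABLED
        return "skip"
    if run_active:  # Case 1: only the run ID file is rewritten
        return "reuse_active_run"
    if resume_job_id is not None:  # Case 2: explicit ID or path wins over the saved file
        return "resume_explicit"
    if run_id_file_exists:  # Case 3: resume from the workdir's saved run ID
        return "resume_saved"
    return "create_new"  # Case 4: the only branch that logs cfg to wandb
```

For example, when both an explicit resume_job_id and a saved run ID file are present, the explicit value is used and the file is then overwritten with the resumed run's ID.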

log_observability_event(event, prefix)

Log an observability event to the currently active wandb run.

Generic sink shared by the training and generation observability paths. No-op when no wandb run is active (WANDB_MODE=disabled or the pipeline hasn't called initialize_wandb_run). Errors during wandb.log are swallowed and reported at warning level: observability is best-effort, and a wandb failure must not break the run.

Parameters:

event (WandbLoggable, required): Any object exposing to_wandb_payload(prefix) -> dict (see WandbLoggable).
prefix (str, required): wandb key namespace for this event's metrics (e.g. "training" or "vllm_gen").

Source code in src/nemo_safe_synthesizer/cli/wandb_setup.py
def log_observability_event(event: WandbLoggable, prefix: str) -> None:
    """Log an observability event to the currently active wandb run.

    Generic sink shared by the training and generation observability paths.
    No-op when no wandb run is active (``WANDB_MODE=disabled`` or the pipeline
    hasn't called :func:`initialize_wandb_run`). Errors during ``wandb.log`` are
    swallowed at warning level -- observability is best-effort and a wandb
    failure must not break the run.

    Args:
        event: Any object exposing ``to_wandb_payload(prefix) -> dict`` (see
            :class:`WandbLoggable`).
        prefix: wandb key namespace for this event's metrics (e.g. ``"training"``
            or ``"vllm_gen"``).
    """
    if wandb.run is None:
        return
    try:
        wandb.log(event.to_wandb_payload(prefix=prefix))
    except Exception as exc:  # noqa: BLE001 -- degraded mode
        logger.warning(f"failed to log observability event ({prefix!r}) to wandb: {exc}")
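
The best-effort behavior can be exercised without wandb by injecting the sink. In this sketch, log_event_best_effort, DemoEvent, and the sink parameter are all hypothetical: sink stands in for wandb.log, and passing None stands in for "no active run". The boolean return value is added for the demo; the real function returns None.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

logger = logging.getLogger(__name__)


class DemoEvent:  # hypothetical event satisfying the WandbLoggable shape
    def to_wandb_payload(self, prefix: str = "") -> dict[str, Any]:
        return {f"{prefix}/loss": 0.5}


def log_event_best_effort(
    event: DemoEvent,
    prefix: str,
    sink: Callable[[dict[str, Any]], None] | None,
) -> bool:
    """Send the event's payload to sink; never raise. Returns True on success."""
    if sink is None:  # stands in for `wandb.run is None`
        return False
    try:
        sink(event.to_wandb_payload(prefix=prefix))
    except Exception as exc:  # degraded mode: log and carry on
        logger.warning(f"failed to log observability event ({prefix!r}): {exc}")
        return False
    return True


def failing_sink(payload: dict[str, Any]) -> None:
    """Simulate a wandb outage."""
    raise RuntimeError("network down")


received: list[dict[str, Any]] = []
ok = log_event_best_effort(DemoEvent(), "training", received.append)
failed = log_event_best_effort(DemoEvent(), "training", failing_sink)
```

The failing sink produces a warning but no exception, which matches the stated goal that an observability failure must not break the run.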