observability

Observability for Safe Synthesizer.

Provides structured logging with category support for batch/CLI operations.

Log categories:

  • RUNTIME: internal operational details (memory, timings, debug info)
  • USER: user-relevant progress and results
  • SYSTEM: system-level events (startup, shutdown, config)
  • BACKEND: logs from dependencies

Configure via environment variables:

  • NSS_LOG_FORMAT: "json" or "plain" (default: auto-detect from tty)
  • NSS_LOG_LEVEL: "INFO", "WARNING", "ERROR", "CRITICAL", "DEBUG_DEPENDENCIES", or "DEBUG" (default: "INFO")
  • NSS_LOG_FILE: path to file for JSON logs (optional)
  • OTEL_SERVICE_NAME: OpenTelemetry service name (default: "nemo-safe-synthesizer")

Logging is NOT auto-initialized on import. Entry points (CLI, scripts) must call initialize_observability() first. When used as a library, get_logger() returns basic stdlib loggers that integrate with the parent application's logging configuration.

Classes:

  • NSSObservabilitySettings: Logging configuration read from environment variables or CLI flags.
  • LogCategory: Categories for log messages.
  • DiscardSensitiveMessages: Discards messages marked as sensitive via the sensitive flag.
  • CategoryFilter: Filter logs by category.
  • CategoryLogger: Logger wrapper that adds category support.
  • TracedContext: Traced context usable as both a decorator and a context manager.

Functions:

  • initialize_observability: Initialize observability for Safe Synthesizer.
  • configure_logging_from_workdir: Configure observability settings from a Workdir before initialization.
  • get_logger: Return a category logger for structured logging.
  • traced: Create a traced context for logging operation entry/exit.
  • traced_user: Log a user-relevant operation (progress, results).
  • traced_runtime: Log a runtime/internal operation.
  • traced_system: Log a system-level operation.
  • traced_backend: Log a backend operation.
  • heartbeat: Context manager that logs a periodic heartbeat during a long-running operation.

NSSObservabilitySettings

Bases: BaseSettings

Logging configuration read from environment variables or CLI flags.

Methods:

  • set_log_format_default: Set nss_log_format default based on whether stdout is a tty or notebook.
  • set_log_color_default: Set nss_log_color default based on whether stdout is a tty at instantiation time.

set_log_format_default(value) classmethod

Set nss_log_format default based on whether stdout is a tty or notebook.

Source code in src/nemo_safe_synthesizer/observability.py
@field_validator("nss_log_format", mode="before")
@classmethod
def set_log_format_default(cls, value: Any) -> Literal["json", "plain"]:
    """Set nss_log_format default based on whether stdout is a tty or notebook."""
    match value:
        case str():
            return value.lower()
        case _ if sys.stdout.isatty():
            return "plain"
        case _:
            try:
                from IPython import get_ipython

                if get_ipython().__class__.__name__ == "ZMQInteractiveShell":
                    return "plain"
            except (ImportError, AttributeError):
                pass
            return "json"

set_log_color_default(value) classmethod

Set nss_log_color default based on whether stdout is a tty at instantiation time.

Source code in src/nemo_safe_synthesizer/observability.py
@field_validator("nss_log_color", mode="before")
@classmethod
def set_log_color_default(cls, value: Any) -> bool:
    """Set nss_log_color default based on whether stdout is a tty at instantiation time."""
    match value:
        case str():
            return value.lower() == "true"
        case _ if sys.stdout.isatty():
            warnings.warn("stdout is a tty, setting nss_log_color to True", UserWarning)
            return True
        case _:
            warnings.warn("stdout is not a tty, setting nss_log_color to False", UserWarning)
            return False

LogCategory

Bases: str, Enum

Categories for log messages.

DiscardSensitiveMessages

Bases: Filter

Discards messages marked as sensitive via the sensitive flag.

CategoryFilter(include_categories=None)

Bases: Filter

Filter logs by category.

Source code in src/nemo_safe_synthesizer/observability.py
def __init__(self, include_categories: set[LogCategory] | None = None):
    super().__init__()
    self.include_categories = include_categories

CategoryLogger(base_logger)

Bases: Logger

Logger wrapper that adds category support.

Usage::

logger = get_logger(__name__)

# Runtime logs (internal details)
logger.runtime.debug("Memory allocated", extra={"bytes": 1024})
logger.runtime.info("Cache hit rate", extra={"rate": 0.95})

# User-relevant logs (progress, results)
logger.user.info("Training started", extra={"epochs": 10})
logger.user.info("Generation complete", extra={"records": 1000})

# Backend logs
logger.backend.info("Configuration loaded")

# Default (runtime)
logger.info("Some message")
Source code in src/nemo_safe_synthesizer/observability.py
def __init__(self, base_logger: logging.Logger):
    self._logger = base_logger
    self.runtime = _CategoryLogAdapter(base_logger, LogCategory.RUNTIME)
    self.user = _CategoryLogAdapter(base_logger, LogCategory.USER)
    self.system = _CategoryLogAdapter(base_logger, LogCategory.SYSTEM)
    self.backend = _CategoryLogAdapter(base_logger, LogCategory.BACKEND)
    self.default = self.runtime
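
The `_CategoryLogAdapter` referenced above is not reproduced on this page. One way such an adapter could work, sketched with a stdlib `LoggerAdapter` that stamps every record with a `category` attribute (names and behavior are assumptions):

```python
import logging


class CategoryAdapterSketch(logging.LoggerAdapter):
    """Illustrative stand-in for _CategoryLogAdapter."""

    def __init__(self, logger: logging.Logger, category: str):
        super().__init__(logger, {"category": category})

    def process(self, msg, kwargs):
        # Merge the fixed category into the caller's extra fields so
        # a category filter downstream can read record.category.
        extra = kwargs.get("extra") or {}
        kwargs["extra"] = {**extra, "category": self.extra["category"]}
        return msg, kwargs
```

With this shape, `logger.user.info(...)` is just `CategoryAdapterSketch(base, "user").info(...)`, and the caller's own `extra` fields survive alongside the category.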

TracedContext(name, category=LogCategory.RUNTIME, log_entry=True, log_exit=True, record_duration=True, logger=None, level='DEBUG')

Traced context usable as both a decorator and a context manager.

As a decorator::

@traced("operation_name", category=LogCategory.USER)
def my_function(): ...

As a context manager::

with traced("operation_name", category=LogCategory.USER):
    ...
Source code in src/nemo_safe_synthesizer/observability.py
def __init__(
    self,
    name: str | None,
    category: LogCategory = LogCategory.RUNTIME,
    log_entry: bool = True,
    log_exit: bool = True,
    record_duration: bool = True,
    logger: CategoryLogger | None = None,
    level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG",
):
    if not name:
        raise ValueError("TracedContext name is required")
    self.name: str = name
    self.category = category
    self.log_entry = log_entry
    self.log_exit = log_exit
    self.record_duration = record_duration
    self._start_time: float | None = None
    self._log_adapter: _CategoryLogAdapter | None = None
    self.level = level

initialize_observability()

Initialize observability for Safe Synthesizer.

Central entry point for all observability setup -- currently initializes logging only. Must be called explicitly by entry points (CLI, scripts); not called automatically on import. Idempotent.

Source code in src/nemo_safe_synthesizer/observability.py
def initialize_observability():
    """Initialize observability for Safe Synthesizer.

    Central entry point for all observability setup -- currently initializes
    logging only. Must be called explicitly by entry points (CLI, scripts);
    not called automatically on import. Idempotent.
    """
    global _INITIALIZED_OBSERVABILITY
    if _INITIALIZED_OBSERVABILITY:
        return

    _initialize_logging()
    _INITIALIZED_OBSERVABILITY = True

configure_logging_from_workdir(workdir, log_level='INFO', log_format=None, log_color=True)

Configure observability settings from a Workdir before initialization.

This should be called BEFORE initialize_observability() to set the log file path and other settings based on the workdir structure. The workdir's log_file path will be used for file logging.

Parameters:

  • workdir (Workdir, required): The Workdir that defines artifact paths.
  • log_level (Literal['INFO', 'WARNING', 'ERROR', 'CRITICAL', 'DEBUG_DEPENDENCIES', 'DEBUG'], default 'INFO'): Log level.
  • log_format (Literal['json', 'plain'] | None, default None): Log format, 'json' or 'plain'; auto-detected from the tty when None.
  • log_color (bool, default True): Whether to colorize console output.

Returns:

  • Path: The configured log file path.

Example

    workdir = Workdir(base_path=Path("artifacts"), config_name="default", dataset_name="mydata")
    log_file = configure_logging_from_workdir(workdir, log_level="DEBUG")
    initialize_observability()
    logger = get_logger(__name__)
    logger.info("Logs will be written to", extra={"log_file": str(log_file)})

Source code in src/nemo_safe_synthesizer/observability.py
def configure_logging_from_workdir(
    workdir: "Workdir",
    log_level: Literal["INFO", "WARNING", "ERROR", "CRITICAL", "DEBUG_DEPENDENCIES", "DEBUG"] = "INFO",
    log_format: Literal["json", "plain"] | None = None,
    log_color: bool = True,
) -> Path:
    """Configure observability settings from a Workdir before initialization.

    This should be called BEFORE initialize_observability() to set the log file path
    and other settings based on the workdir structure. The workdir's log_file path
    will be used for file logging.

    Args:
        workdir: The Workdir that defines artifact paths
        log_level: Log level (default: INFO)
        log_format: Log format - 'json' or 'plain' (default: auto-detect from tty)
        log_color: Whether to colorize console output (default: True)

    Returns:
        The configured log file path

    Example:
        workdir = Workdir(base_path=Path("artifacts"), config_name="default", dataset_name="mydata")
        log_file = configure_logging_from_workdir(workdir, log_level="DEBUG")
        initialize_observability()
        logger = get_logger(__name__)
        logger.info("Logs will be written to", extra={"log_file": str(log_file)})
    """
    # Import here to avoid circular imports
    from .cli.artifact_structure import Workdir as WS

    if not isinstance(workdir, WS):
        raise TypeError(f"Expected Workdir, got {type(workdir)}")

    # Ensure the logs directory exists
    log_file = workdir.log_file
    log_file.parent.mkdir(parents=True, exist_ok=True)

    # Configure via environment variables (read by NSSObservabilitySettings)
    os.environ["NSS_LOG_FILE"] = str(log_file)
    os.environ["NSS_LOG_LEVEL"] = log_level
    if log_format:
        os.environ["NSS_LOG_FORMAT"] = log_format
    if not log_color:
        os.environ["NSS_LOG_COLOR"] = "false"

    return log_file
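
The core of this function is an env-var handoff: it writes `NSS_*` variables that the settings object reads later. That pattern can be exercised standalone with a plain directory in place of a Workdir (the `logs/run.jsonl` file name below is an assumption for illustration):

```python
import os
import tempfile
from pathlib import Path


def configure_from_dir_sketch(base: Path, log_level: str = "INFO") -> Path:
    """Illustrative replica of the env-var handoff."""
    log_file = base / "logs" / "run.jsonl"
    # Create the logs directory up front so file logging can start
    # immediately once observability is initialized.
    log_file.parent.mkdir(parents=True, exist_ok=True)
    os.environ["NSS_LOG_FILE"] = str(log_file)
    os.environ["NSS_LOG_LEVEL"] = log_level
    return log_file
```

Because the handoff goes through the process environment, it must happen before initialization reads the settings, which is why the real function is documented as "call BEFORE initialize_observability()".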

get_logger(name=None)

Return a category logger for structured logging.

Always pass __name__ as the argument. After initialize_observability() is called, returns a structlog-based logger with full formatting. Before initialization (e.g. when imported as a library), returns a basic stdlib logger that integrates with the parent application's logging configuration.

Source code in src/nemo_safe_synthesizer/observability.py
def get_logger(name: str | None = None) -> CategoryLogger:
    """Return a category logger for structured logging.

    Always pass ``__name__`` as the argument. After
    ``initialize_observability()`` is called, returns a structlog-based
    logger with full formatting. Before initialization (e.g. when imported
    as a library), returns a basic stdlib logger that integrates with the
    parent application's logging configuration.
    """
    if _INITIALIZED_OBSERVABILITY:
        return CategoryLogger(structlog.get_logger(name))

    # Return basic stdlib logger when logging hasn't been initialized
    # This allows the package to be used as a library without taking over
    # the parent application's logging configuration
    return CategoryLogger(logging.getLogger(name))

traced(name=None, category=LogCategory.RUNTIME, log_entry=True, log_exit=True, record_duration=True, level='DEBUG')

Create a traced context for logging operation entry/exit.

Parameters:

  • name (str | None, default None): Operation name (defaults to function qualname when used as a decorator).
  • category (LogCategory, default RUNTIME): Log category for entry/exit messages.
  • log_entry (bool, default True): Whether to log function entry.
  • log_exit (bool, default True): Whether to log function exit.
  • record_duration (bool, default True): Whether to record duration in the exit log.
  • level (Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], default 'DEBUG'): Log level for entry/exit messages.

Example::

    # Usage as a decorator
    @traced("training.epoch", category=LogCategory.USER)
    def train_epoch(self, epoch_num: int): ...


    @traced(category=LogCategory.RUNTIME)  # Internal operation
    def _compute_gradients(self): ...


    # Usage as a context manager
    with traced("data_loading", category=LogCategory.USER):
        data = load_data()
        process(data)
Source code in src/nemo_safe_synthesizer/observability.py
def traced(
    name: str | None = None,
    category: LogCategory = LogCategory.RUNTIME,
    log_entry: bool = True,
    log_exit: bool = True,
    record_duration: bool = True,
    level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "DEBUG",
) -> TracedContext:
    """Create a traced context for logging operation entry/exit.

    Args:
        name: Operation name (defaults to function qualname when used as
            a decorator).
        category: Log category for entry/exit messages.
        log_entry: Whether to log function entry.
        log_exit: Whether to log function exit.
        record_duration: Whether to record duration in the exit log.
        level: Log level for entry/exit messages.

    Example::
        # Usage as a decorator
        @traced("training.epoch", category=LogCategory.USER)
        def train_epoch(self, epoch_num: int): ...


        @traced(category=LogCategory.RUNTIME)  # Internal operation
        def _compute_gradients(self): ...


        # Usage as a context manager
        with traced("data_loading", category=LogCategory.USER):
            data = load_data()
            process(data)
    """
    return TracedContext(
        name=name,
        category=category,
        log_entry=log_entry,
        log_exit=log_exit,
        record_duration=record_duration,
        level=level,
    )
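
The dual decorator/context-manager behavior comes from `TracedContext` implementing both protocols. A minimal standalone sketch of that shape (using `print` in place of the real logger; all names here are illustrative):

```python
import functools
import time


class TracedSketch:
    """Usable as `with TracedSketch("op"): ...` and as `@TracedSketch("op")`."""

    def __init__(self, name: str):
        self.name = name
        self._start: float | None = None

    def __enter__(self):
        self._start = time.monotonic()
        print(f"{self.name} started")
        return self

    def __exit__(self, exc_type, exc, tb):
        duration = time.monotonic() - self._start
        status = "failed" if exc_type else "completed"
        print(f"{self.name} {status} in {duration:.3f}s")
        return False  # never swallow exceptions

    def __call__(self, func):
        # Decorator path: wrap the call in a fresh context per invocation.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            with self.__class__(self.name or func.__qualname__):
                return func(*args, **kwargs)

        return wrapper
```

Creating a fresh context per invocation in `__call__` keeps the decorator re-entrant, which a single shared `_start_time` would not be.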

traced_user(name=None, **kwargs)

Log a user-relevant operation (progress, results).

Source code in src/nemo_safe_synthesizer/observability.py
def traced_user(name: str | None = None, **kwargs):
    """Log a user-relevant operation (progress, results)."""
    return traced(name=name, category=LogCategory.USER, **kwargs)

traced_runtime(name=None, **kwargs)

Log a runtime/internal operation.

Source code in src/nemo_safe_synthesizer/observability.py
def traced_runtime(name: str | None = None, **kwargs):
    """Log a runtime/internal operation."""
    return traced(name=name, category=LogCategory.RUNTIME, **kwargs)

traced_system(name=None, **kwargs)

Log a system-level operation.

Source code in src/nemo_safe_synthesizer/observability.py
def traced_system(name: str | None = None, **kwargs):
    """Log a system-level operation."""
    return traced(name=name, category=LogCategory.SYSTEM, **kwargs)

traced_backend(name=None, **kwargs)

Log a backend operation.

Source code in src/nemo_safe_synthesizer/observability.py
def traced_backend(name: str | None = None, **kwargs):
    """Log a backend operation."""
    return traced(name=name, category=LogCategory.BACKEND, **kwargs)

heartbeat(message, interval=60.0, *, logger_name=None, progress_note=None, **extra_fields)

Context manager that logs a periodic heartbeat during a long-running operation.

Parameters:

  • message (str, required): Description of the operation (e.g. "Model loading", "Generation").
  • interval (float, default 60.0): Seconds between heartbeat log messages.
  • logger_name (str | None, default None): Logger name (pass __name__ so heartbeat logs attribute to the calling module).
  • progress_note (str | None, default None): Optional sentence appended only to periodic "... in progress" lines (so message can stay short for "... complete" / "... failed").
  • **extra_fields: Additional structured fields passed to the logger (e.g. model="SmolLM3").
Source code in src/nemo_safe_synthesizer/observability.py
@contextlib.contextmanager
def heartbeat(
    message: str,
    interval: float = 60.0,
    *,
    logger_name: str | None = None,
    progress_note: str | None = None,
    **extra_fields,
) -> Generator[None, None, None]:
    """Context manager that logs a periodic heartbeat during a long-running operation.

    Args:
        message: Description of the operation (e.g. "Model loading", "Generation").
        interval: Seconds between heartbeat log messages.
        logger_name: Logger name (pass ``__name__`` so heartbeat logs attribute
            to the calling module).
        progress_note: Optional sentence appended only to periodic ``... in progress``
            lines (so ``message`` can stay short for ``... complete`` / ``... failed``).
        **extra_fields: Additional structured fields passed to the logger
            (e.g. ``model="SmolLM3"``).
    """
    if interval <= 0:
        raise ValueError(f"heartbeat interval must be positive, got {interval}")
    _logger = get_logger(logger_name or __name__)
    stop = threading.Event()
    start = time.monotonic()

    def _extra() -> dict:
        return {"elapsed_seconds": round(time.monotonic() - start, 1), **extra_fields}

    def _run() -> None:
        while not stop.wait(timeout=interval):
            event = f"{message} in progress"
            if progress_note:
                event = f"{event}. {progress_note}"
            _logger.info(event, extra={"ctx": _extra()})

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    exc: BaseException | None = None
    try:
        yield
    except BaseException as e:
        exc = e
        raise
    finally:
        stop.set()
        thread.join(timeout=1)
        if exc is not None:
            ctx = {**_extra(), "error_type": type(exc).__name__}
            _logger.error(f"{message} failed", extra={"ctx": ctx})
        else:
            _logger.info(f"{message} complete", extra={"ctx": _extra()})
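
The pattern above (an `Event`, a daemon thread ticking on `Event.wait`, and a `finally` that stops and joins it) can be replicated standalone. This sketch swaps the structured logger for an injectable `sink` callable so it stays self-contained:

```python
import contextlib
import threading
import time


@contextlib.contextmanager
def heartbeat_sketch(message: str, interval: float = 60.0, sink=print):
    """Illustrative replica: emit "<message> in progress" every
    `interval` seconds on a daemon thread until the body finishes."""
    if interval <= 0:
        raise ValueError("interval must be positive")
    stop = threading.Event()

    def _run():
        # Event.wait doubles as an interruptible sleep: it returns True
        # (and we exit) as soon as stop is set.
        while not stop.wait(timeout=interval):
            sink(f"{message} in progress")

    thread = threading.Thread(target=_run, daemon=True)
    thread.start()
    try:
        yield
        sink(f"{message} complete")
    except BaseException:
        sink(f"{message} failed")
        raise
    finally:
        stop.set()
        thread.join(timeout=1)
```

Using `Event.wait(timeout=interval)` instead of `time.sleep` is what lets the heartbeat stop promptly when the body finishes early, rather than lingering for up to a full interval.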