observability
Observability for Safe Synthesizer.
Provides structured logging with category support for batch/CLI operations.
Log categories:
- RUNTIME: internal operational details (memory, timings, debug info)
- USER: user-relevant progress and results
- SYSTEM: system-level events (startup, shutdown, config)
- BACKEND: logs from dependencies
Configure via environment variables:
- NSS_LOG_FORMAT: "json" or "plain" (default: auto-detect from tty)
- NSS_LOG_LEVEL: "INFO", "WARNING", "ERROR", "CRITICAL", "DEBUG_DEPENDENCIES", or "DEBUG" (default: "INFO")
- NSS_LOG_FILE: path to file for JSON logs (optional)
- OTEL_SERVICE_NAME: OpenTelemetry service name (default: "nemo-safe-synthesizer")
Logging is NOT auto-initialized on import. Entry points (CLI, scripts) must
call initialize_observability() first. When used as a library,
get_logger() returns basic stdlib loggers that integrate with the parent
application's logging configuration.
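To make the environment-variable defaults above concrete, here is a minimal stdlib-only sketch of how such settings could be resolved. The helper name `resolve_log_settings` and the returned dict layout are hypothetical, and the tty mapping (terminal gives "plain", otherwise "json") is an assumption; the module itself reads these values through NSSObservabilitySettings.

```python
import os
import sys
from typing import Mapping, Optional

VALID_LEVELS = {"INFO", "WARNING", "ERROR", "CRITICAL", "DEBUG_DEPENDENCIES", "DEBUG"}
VALID_FORMATS = {"json", "plain"}


def resolve_log_settings(
    env: Optional[Mapping[str, str]] = None,
    is_tty: Optional[bool] = None,
) -> dict:
    """Hypothetical helper: read the documented variables with their documented defaults."""
    env = os.environ if env is None else env
    if is_tty is None:
        is_tty = sys.stdout.isatty()

    # Assumption: an interactive terminal gets "plain", anything else gets "json".
    log_format = env.get("NSS_LOG_FORMAT") or ("plain" if is_tty else "json")
    if log_format not in VALID_FORMATS:
        raise ValueError(f"unsupported NSS_LOG_FORMAT: {log_format!r}")

    level = env.get("NSS_LOG_LEVEL", "INFO")
    if level not in VALID_LEVELS:
        raise ValueError(f"unsupported NSS_LOG_LEVEL: {level!r}")

    return {
        "format": log_format,
        "level": level,
        "file": env.get("NSS_LOG_FILE"),  # optional, None when unset
        "service_name": env.get("OTEL_SERVICE_NAME", "nemo-safe-synthesizer"),
    }
```

Passing `env` and `is_tty` explicitly keeps the sketch easy to exercise without touching the real process environment.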
Classes:
| Name | Description |
|---|---|
| `NSSObservabilitySettings` | Logging configuration read from environment variables or CLI flags. |
| `LogCategory` | Categories for log messages. |
| `CategoryFilter` | Filter logs by category. |
| `CategoryLogger` | Logger wrapper that adds category support. |
| `TracedContext` | Traced context usable as both a decorator and a context manager. |
| `NvmlPeakSampler` | Daemon-thread sampler tracking peak device VRAM via NVML. |
Functions:
| Name | Description |
|---|---|
| `initialize_observability` | Initialize observability for Safe Synthesizer. |
| `configure_logging_from_workdir` | Configure observability settings from a Workdir before initialization. |
| `get_logger` | Return a category logger for structured logging. |
| `traced` | Create a traced context for logging operation entry/exit. |
| `traced_user` | Log a user-relevant operation (progress, results). |
| `traced_runtime` | Log a runtime/internal operation. |
| `traced_system` | Log a system-level operation. |
| `traced_backend` | Log a backend operation. |
| `heartbeat` | Context manager that logs a periodic heartbeat during a long-running operation. |
| `read_loadavg` | Return `/proc/loadavg` as a (1m, 5m, 15m) triple; None when unavailable. |
NSSObservabilitySettings
Bases: BaseSettings
Logging configuration read from environment variables or CLI flags.
Methods:
| Name | Description |
|---|---|
| `set_log_format_default` | Set nss_log_format default based on whether stdout is a tty or notebook. |
| `set_log_color_default` | Set nss_log_color default based on whether stdout is a tty at instantiation time. |
set_log_format_default(value)
classmethod
Set nss_log_format default based on whether stdout is a tty or notebook.
Source code in src/nemo_safe_synthesizer/observability.py
set_log_color_default(value)
classmethod
Set nss_log_color default based on whether stdout is a tty at instantiation time.
LogCategory
Bases: str, Enum
Categories for log messages.
DiscardSensitiveMessages
Bases: Filter
Discards messages marked as sensitive via the sensitive flag.
CategoryFilter(include_categories=None)
Filter logs by category.
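The two filters described here can be approximated with stdlib `logging.Filter` subclasses. The following is a sketch under assumptions: the class names `SketchSensitiveFilter` and `SketchCategoryFilter` are hypothetical, the record attributes `sensitive` and `category` are assumed to arrive through `extra=`, and treating `include_categories=None` as "let everything through" is a guess at the default behavior rather than a statement about the real classes.

```python
import logging
from typing import Iterable, Optional


class SketchSensitiveFilter(logging.Filter):
    """Drop any record flagged with extra={"sensitive": True}."""

    def filter(self, record: logging.LogRecord) -> bool:
        return not getattr(record, "sensitive", False)


class SketchCategoryFilter(logging.Filter):
    """Keep only records whose category is in the include set."""

    def __init__(self, include_categories: Optional[Iterable[str]] = None):
        super().__init__()
        # Assumption: None means "no restriction".
        self.include = None if include_categories is None else set(include_categories)

    def filter(self, record: logging.LogRecord) -> bool:
        if self.include is None:
            return True
        # Assumption: uncategorized records count as "runtime".
        return getattr(record, "category", "runtime") in self.include
```

Attaching such a filter to a handler (`handler.addFilter(...)`) rather than to a logger lets different sinks show different categories, for example user-facing progress on the console and everything in a file.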
CategoryLogger(base_logger)
Bases: Logger
Logger wrapper that adds category support.
Usage::
    logger = get_logger(__name__)

    # Runtime logs (internal details)
    logger.runtime.debug("Memory allocated", extra={"bytes": 1024})
    logger.runtime.info("Cache hit rate", extra={"rate": 0.95})

    # User-relevant logs (progress, results)
    logger.user.info("Training started", extra={"epochs": 10})
    logger.user.info("Generation complete", extra={"records": 1000})

    # Backend logs
    logger.backend.info("Configuration loaded")

    # Default (runtime)
    logger.info("Some message")
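One way to picture the `logger.user` / `logger.runtime` accessors is a thin wrapper that stamps each record with a category. This is an illustrative sketch built on stdlib `logging.LoggerAdapter`, not the module's implementation; `SketchCategoryLogger` and the `category` record attribute are hypothetical names.

```python
import logging


class _CategoryAdapter(logging.LoggerAdapter):
    """Stamp every record with a fixed category, keeping caller-supplied extras."""

    def process(self, msg, kwargs):
        extra = dict(kwargs.get("extra") or {})
        extra["category"] = self.extra["category"]
        kwargs["extra"] = extra
        return msg, kwargs


class SketchCategoryLogger:
    """Expose one adapter per category; plain calls default to runtime."""

    def __init__(self, base_logger: logging.Logger):
        self.runtime = _CategoryAdapter(base_logger, {"category": "runtime"})
        self.user = _CategoryAdapter(base_logger, {"category": "user"})
        self.system = _CategoryAdapter(base_logger, {"category": "system"})
        self.backend = _CategoryAdapter(base_logger, {"category": "backend"})

    def __getattr__(self, name):
        # Only reached for attributes not set in __init__, such as .info or
        # .debug, which are delegated to the runtime adapter.
        return getattr(self.runtime, name)
```

Because the category travels as a record attribute, a filter or formatter downstream can act on it without the call sites changing.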
TracedContext(name, category=LogCategory.RUNTIME, log_entry=True, log_exit=True, record_duration=True, logger=None, level='DEBUG')
Traced context usable as both a decorator and a context manager.
As a decorator::
    @traced("operation_name", category=LogCategory.USER)
    def my_function(): ...
As a context manager::
    with traced("operation_name", category=LogCategory.USER):
        ...
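The dual decorator/context-manager behavior can be sketched with the stdlib `contextlib.ContextDecorator` mixin. This is an assumption about one possible approach, not the actual TracedContext code; `SketchTraced`, the message wording, and the `duration_s` field are hypothetical, and unlike `traced` this sketch requires an explicit name.

```python
import logging
import time
from contextlib import ContextDecorator


class SketchTraced(ContextDecorator):
    """Log entry and exit of a block or function, optionally with its duration."""

    def __init__(self, name, logger=None, level=logging.DEBUG, record_duration=True):
        self.name = name
        self.logger = logger or logging.getLogger(__name__)
        self.level = level
        self.record_duration = record_duration

    def __enter__(self):
        self._start = time.perf_counter()
        self.logger.log(self.level, "enter %s", self.name)
        return self

    def __exit__(self, exc_type, exc, tb):
        elapsed = time.perf_counter() - self._start
        extra = {"duration_s": elapsed} if self.record_duration else {}
        status = "failed" if exc_type else "exit"
        self.logger.log(self.level, "%s %s", status, self.name, extra=extra)
        return False  # never swallow the exception
```

`ContextDecorator` reuses the same instance for every decorated call, so this simple version is not safe for recursive or concurrent use of one decorated function; a production version would keep the start time in per-call state.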
NvmlPeakSampler(device_index=None, interval_seconds=0.25)
Daemon-thread sampler tracking peak device VRAM via NVML.
Use as a context manager wrapping the work whose peak VRAM you want::
    with NvmlPeakSampler() as vram:
        ...  # build engine / run training / generate
    peak_gb = vram.peak_gb  # float | None
peak_gb returns None when NVML isn't available (driver missing, pynvml
import failed, device index invalid). The sampler reads at the driver
layer, so it sees allocations made by worker subprocesses regardless of
which process holds the torch handle. It reports device-wide VRAM: on a
dedicated host that equals the workload's allocation; on a shared GPU it
also includes other processes' allocations.
device_index defaults to the first CUDA_VISIBLE_DEVICES entry (see
_default_nvml_device_index), so the sampler follows the workload's GPU on
multi-GPU hosts instead of always reading physical GPU 0. Pass an
explicit index to override.
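The sampling pattern described above (a daemon thread polling a reading and keeping the maximum) can be sketched without NVML by injecting the reader. Everything here is hypothetical naming: `SketchPeakSampler` and `read_used_bytes` stand in for the real class and its NVML query, and the sampling on enter and exit is an assumption made so short workloads still record a value.

```python
import threading
from typing import Callable, Optional


class SketchPeakSampler:
    """Track the peak of a periodically sampled byte count on a daemon thread."""

    def __init__(self, read_used_bytes: Callable[[], Optional[int]], interval_seconds: float = 0.25):
        self._read = read_used_bytes  # stand-in for a device-wide NVML memory query
        self._interval = interval_seconds
        self._stop = threading.Event()
        self._peak: Optional[int] = None
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _sample(self) -> None:
        try:
            used = self._read()
        except Exception:
            return  # treat a failed read as "no data" rather than crashing the workload
        if used is not None and (self._peak is None or used > self._peak):
            self._peak = used

    def _run(self) -> None:
        # Event.wait returns False on timeout and True once stop is requested.
        while not self._stop.wait(self._interval):
            self._sample()

    def __enter__(self):
        self._sample()
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self._stop.set()
        self._thread.join()
        self._sample()  # final reading after the thread has stopped
        return False

    @property
    def peak_gb(self) -> Optional[float]:
        return None if self._peak is None else self._peak / 2**30
```

A reader that always returns None models the "NVML unavailable" case: the workload still runs and `peak_gb` stays None.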
Attributes:
| Name | Type | Description |
|---|---|---|
| `peak_gb` | `float \| None` | Peak device-wide VRAM (GiB) observed during sampling; None if NVML unavailable. |
peak_gb
property
Peak device-wide VRAM (GiB) observed during sampling; None if NVML unavailable.
initialize_observability()
Initialize observability for Safe Synthesizer.
Central entry point for all observability setup -- currently initializes logging only. Must be called explicitly by entry points (CLI, scripts); not called automatically on import. Idempotent.
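Idempotent initialization of this kind is commonly implemented with a guarded module-level flag. The sketch below illustrates that general pattern only; `initialize_once`, its `level` parameter, its boolean return value, and the `sketch_app` logger name are hypothetical and do not describe the real `initialize_observability()` signature.

```python
import logging
import threading

_init_lock = threading.Lock()
_initialized = False


def initialize_once(level: str = "INFO") -> bool:
    """Return True if this call performed setup, False if setup was already done."""
    global _initialized
    with _init_lock:
        if _initialized:
            return False  # repeated calls are harmless no-ops
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s %(name)s %(message)s"))
        app_logger = logging.getLogger("sketch_app")
        app_logger.addHandler(handler)
        app_logger.setLevel(level)
        _initialized = True
        return True
```

The guard matters because attaching handlers twice would duplicate every log line; the lock covers the case where two entry-point threads race to initialize.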
configure_logging_from_workdir(workdir, log_level='INFO', log_format=None, log_color=True)
Configure observability settings from a Workdir before initialization.
This should be called BEFORE initialize_observability() to set the log file path and other settings based on the workdir structure. The workdir's log_file path will be used for file logging.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `workdir` | `'Workdir'` | The Workdir that defines artifact paths | required |
| `log_level` | `Literal['INFO', 'WARNING', 'ERROR', 'CRITICAL', 'DEBUG_DEPENDENCIES', 'DEBUG']` | Log level (default: INFO) | `'INFO'` |
| `log_format` | `Literal['json', 'plain'] \| None` | Log format - 'json' or 'plain' (default: auto-detect from tty) | `None` |
| `log_color` | `bool` | Whether to colorize console output (default: True) | `True` |

Returns:
| Type | Description |
|---|---|
| `Path` | The configured log file path |
Example::
    workdir = Workdir(base_path=Path("artifacts"), config_name="default", dataset_name="mydata")
    log_file = configure_logging_from_workdir(workdir, log_level="DEBUG")
    initialize_observability()
    logger = get_logger(__name__)
    logger.info("Logs will be written to", extra={"log_file": str(log_file)})
get_logger(name=None)
Return a category logger for structured logging.
Always pass __name__ as the argument. After
initialize_observability() is called, returns a structlog-based
logger with full formatting. Before initialization (e.g. when imported
as a library), returns a basic stdlib logger that integrates with the
parent application's logging configuration.
traced(name=None, category=LogCategory.RUNTIME, log_entry=True, log_exit=True, record_duration=True, level='DEBUG')
Create a traced context for logging operation entry/exit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str \| None` | Operation name (defaults to function qualname when used as a decorator). | `None` |
| `category` | `LogCategory` | Log category for entry/exit messages. | `RUNTIME` |
| `log_entry` | `bool` | Whether to log function entry. | `True` |
| `log_exit` | `bool` | Whether to log function exit. | `True` |
| `record_duration` | `bool` | Whether to record duration in the exit log. | `True` |
| `level` | `Literal['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']` | Log level for entry/exit messages. | `'DEBUG'` |
Example::
    # Usage as a decorator
    @traced("training.epoch", category=LogCategory.USER)
    def train_epoch(self, epoch_num: int): ...

    @traced(category=LogCategory.RUNTIME)  # Internal operation
    def _compute_gradients(self): ...

    # Usage as a context manager
    with traced("data_loading", category=LogCategory.USER):
        data = load_data()
        process(data)
traced_user(name=None, **kwargs)
Log a user-relevant operation (progress, results).
traced_runtime(name=None, **kwargs)
Log a runtime/internal operation.
traced_system(name=None, **kwargs)
Log a system-level operation.
traced_backend(name=None, **kwargs)
Log a backend operation.
heartbeat(message, interval=60.0, *, logger_name=None, progress_note=None, **extra_fields)
Context manager that logs a periodic heartbeat during a long-running operation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `message` | `str` | Description of the operation (e.g. "Model loading", "Generation"). | required |
| `interval` | `float` | Seconds between heartbeat log messages. | `60.0` |
| `logger_name` | `str \| None` | Logger name (pass …) | `None` |
| `progress_note` | `str \| None` | Optional sentence appended only to periodic … | `None` |
| `**extra_fields` | | Additional structured fields passed to the logger (e.g. …) | `{}` |
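A periodic heartbeat around a long-running block can be sketched with a background thread and a `threading.Event`. This is a minimal illustration under assumptions: `sketch_heartbeat`, the "still running" message wording, and the `elapsed_s` field are hypothetical, the sketch takes a logger object instead of `logger_name`, and it omits `progress_note`.

```python
import logging
import threading
import time
from contextlib import contextmanager


@contextmanager
def sketch_heartbeat(message, interval=60.0, logger=None, **extra_fields):
    """Log '<message> still running' every `interval` seconds until the block exits."""
    log = logger or logging.getLogger(__name__)
    stop = threading.Event()
    start = time.monotonic()

    def beat():
        # wait() returns False on timeout (emit a beat) and True once stop is set.
        while not stop.wait(interval):
            elapsed = round(time.monotonic() - start, 1)
            log.info("%s still running", message, extra={"elapsed_s": elapsed, **extra_fields})

    thread = threading.Thread(target=beat, daemon=True)
    thread.start()
    try:
        yield
    finally:
        stop.set()  # wakes the thread immediately, no need to wait out the interval
        thread.join()
```

Keys in `extra_fields` must not collide with built-in `LogRecord` attributes such as `message` or `name`, because stdlib logging rejects those in `extra`.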
read_loadavg()
Return /proc/loadavg as a (1m, 5m, 15m) triple; None when unavailable.
Linux-only. Cheap (one syscall). Safe to call from any process: the read is host-scoped, not process-scoped. Designed to bracket a workload: the caller reads once before and once after, and the pair indicates whether host load drifted during the run.
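Reading the load averages as described amounts to parsing the first three fields of `/proc/loadavg`. The following is a small sketch of that idea, not the module's source; `parse_loadavg` and `sketch_read_loadavg` are hypothetical names, and the `path` parameter is added here only to make the function testable.

```python
from pathlib import Path
from typing import Optional, Tuple

LoadAvg = Tuple[float, float, float]


def parse_loadavg(text: str) -> Optional[LoadAvg]:
    """Parse the first three fields of /proc/loadavg content into floats."""
    fields = text.split()
    try:
        # Raises ValueError on too few fields or on non-numeric content.
        one, five, fifteen = (float(x) for x in fields[:3])
    except ValueError:
        return None
    return one, five, fifteen


def sketch_read_loadavg(path: str = "/proc/loadavg") -> Optional[LoadAvg]:
    """Return the (1m, 5m, 15m) load averages, or None when the file is unavailable."""
    try:
        return parse_loadavg(Path(path).read_text())
    except OSError:
        return None  # non-Linux host or restricted /proc
```

To bracket a workload, call it once before and once after and compare the two triples; a None on either side simply means the comparison is unavailable on that host.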