telemetry

Telemetry handler for NeMo products.

Environment variables:

  • NEMO_TELEMETRY_ENABLED: Whether telemetry is enabled.
  • NEMO_DEPLOYMENT_TYPE: The deployment type the event came from.
  • NEMO_TELEMETRY_ENDPOINT: The endpoint to send the telemetry events to.
  • NEMO_SESSION_PREFIX: Optional prefix to add to session IDs.

Classes:

  • NSSTrainingAndGenerationEvent
  • TelemetryHandler: Handles telemetry event batching, flushing, and retry logic for NeMo products.

Functions:

  • sanitize_model_for_telemetry: Return a telemetry-safe pretrained model label.
  • bucket_records: Return a bucketed string label for a count of input records.
  • bucket_columns: Return a bucketed string label for a count of input columns.

NSSTrainingAndGenerationEvent pydantic-model

Bases: BaseModel

Config:

  • default: {'populate_by_name': True}

Fields:

nemo_source = NemoSourceEnum.SAFE_SYNTHESIZER pydantic-field

The NeMo product that created the event.

task pydantic-field

The type of task that was performed (e.g. train, generate, evaluate, run).

task_status pydantic-field

The final status of the task.

deployment_type pydantic-field

How Safe Synthesizer was invoked (cli, sdk, nmp).

job_duration_sec = -1.0 pydantic-field

Wall-clock duration of the job in seconds. -1.0 if not available.

num_records_generated = -1 pydantic-field

Number of valid synthetic records produced. -1 if not available.

num_tokens_generated = -1 pydantic-field

Number of tokens generated by the model. -1 if not available.

replace_pii_enabled = False pydantic-field

Whether PII replacement was enabled for this run.

differential_privacy_enabled = False pydantic-field

Whether differential privacy training was enabled for this run.

time_series_enabled = False pydantic-field

Whether time-series mode was enabled for this run.

group_by_enabled = False pydantic-field

Whether group-by was set on the input data for this run.

input_records_bucket = 'undefined' pydantic-field

Bucketed count of input training records (e.g. '201-1000'). Use bucket_records().

input_columns_bucket = 'undefined' pydantic-field

Bucketed count of input columns (e.g. '6-10'). Use bucket_columns().

synthetic_quality_score = -1.0 pydantic-field

Top-level Synthetic Quality Score from the evaluation report. -1.0 if not available.

data_privacy_score = -1.0 pydantic-field

Top-level Data Privacy Score from the evaluation report. -1.0 if not available.

model = 'undefined' pydantic-field

The pretrained model used for training/generation.

gpu = 'undefined' pydantic-field

GPU device name (e.g. 'NVIDIA A100 80GB PCIe'). 'undefined' if not on GPU.

TelemetryHandler(flush_interval_seconds=120.0, max_queue_size=50, max_retries=MAX_RETRIES, source_client_version='undefined', session_id='undefined')

Handles telemetry event batching, flushing, and retry logic for NeMo products.

Supports two usage patterns:

  • Background mode: call start() (or use with handler:) to spawn a daemon thread with its own event loop that drives periodic flushing. stop() schedules a final flush, then stops the loop and joins the thread.
  • Fire-and-flush mode: skip start(), enqueue() events, then call stop() to flush once via asyncio.run. No background thread is created.
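
The two patterns above can be sketched with a small stand-in class. This is a minimal illustration, not the real TelemetryHandler: MiniHandler, its string events, and the sent list are hypothetical, and "sending" just moves events into a list. It only shows how stop() can serve both modes.

```python
import asyncio
import threading


class MiniHandler:
    """Hypothetical stand-in for illustration; not the real TelemetryHandler."""

    def __init__(self) -> None:
        self._events = []          # pending events
        self.sent = []             # events that were "sent" (assumption: no network)
        self._lock = threading.Lock()
        self._loop = None
        self._thread = None

    def enqueue(self, event: str) -> None:
        with self._lock:
            self._events.append(event)

    async def _flush_events(self) -> None:
        # Swap the queue out under the lock, then "send" outside it.
        with self._lock:
            batch, self._events = self._events, []
        self.sent.extend(batch)

    def start(self) -> None:
        ready = threading.Event()

        def _run() -> None:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self._loop = loop
            ready.set()
            try:
                loop.run_forever()
            finally:
                loop.close()

        self._thread = threading.Thread(target=_run, daemon=True)
        self._thread.start()
        ready.wait()  # do not return until the loop object exists

    def stop(self) -> None:
        if self._loop is not None and self._thread is not None:
            # Background mode: flush on the handler's own loop, then stop it.
            future = asyncio.run_coroutine_threadsafe(self._flush_events(), self._loop)
            future.result(timeout=30)
            self._loop.call_soon_threadsafe(self._loop.stop)
            self._thread.join(timeout=5)
            self._loop = None
            self._thread = None
            return
        # Fire-and-flush mode: no thread exists, so flush once on a fresh loop.
        asyncio.run(self._flush_events())


# Background mode: start(), enqueue(), stop().
background = MiniHandler()
background.start()
background.enqueue("train")
background.stop()

# Fire-and-flush mode: skip start() entirely.
one_shot = MiniHandler()
one_shot.enqueue("generate")
one_shot.stop()
```

In both cases the caller's last step is stop(); the only difference is whether a thread and event loop were created beforehand.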

Parameters:

  • flush_interval_seconds (float, default 120.0): The interval, in seconds, at which queued events are flushed.
  • max_queue_size (int, default 50): The maximum number of events to queue before flushing.
  • max_retries (int, default MAX_RETRIES): The maximum number of times to retry sending an event.
  • source_client_version (str, default 'undefined'): The version of the source client. This should be the version of the NeMo product that is sending the events, typically the version of the PyPI package a user would install.
  • session_id (str, default 'undefined'): An optional session ID to associate with the events. It should be unique to the session (for example, a UUID) and is used to group events together.

Methods:

  • astart: Start the background timer task on the current event loop.
  • astop: Cancel the timer task and flush any remaining events.
  • aflush: Flush all queued events immediately and await completion.
  • start: Spawn a daemon thread with a persistent event loop for periodic flushing.
  • stop: Flush pending events. If a background thread is running, shut it down and join.
  • flush: Flush all queued events immediately and wait for completion.

Source code in src/nemo_safe_synthesizer/telemetry.py
def __init__(
    self,
    flush_interval_seconds: float = 120.0,
    max_queue_size: int = 50,
    max_retries: int = MAX_RETRIES,
    source_client_version: str = "undefined",
    session_id: str = "undefined",
):
    self._flush_interval = flush_interval_seconds
    self._max_queue_size = max_queue_size
    self._max_retries = max_retries
    self._events: list[QueuedEvent] = []
    self._dlq: list[QueuedEvent] = []  # Dead letter queue for retry
    self._queue_lock = threading.Lock()
    self._loop: asyncio.AbstractEventLoop | None = None
    self._thread: threading.Thread | None = None
    self._flush_signal: asyncio.Event | None = None
    self._timer_task: asyncio.Task | None = None
    self._running = False
    self._source_client_version = source_client_version
    prefix = _session_prefix()
    self._session_id = f"{prefix}{session_id}" if prefix else session_id
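
The last two lines of the constructor concatenate the prefix and the session ID with no separator, and leave the ID unchanged when the prefix is empty. A minimal sketch of that behavior follows. The body of the private _session_prefix() helper is not shown on this page, so session_prefix() below is an assumption: it reads NEMO_SESSION_PREFIX verbatim and returns an empty string when the variable is unset.

```python
import os


def session_prefix() -> str:
    # Assumption: the prefix comes straight from NEMO_SESSION_PREFIX.
    return os.environ.get("NEMO_SESSION_PREFIX", "")


def build_session_id(session_id: str = "undefined") -> str:
    # Same expression as in the constructor: prepend only a non-empty prefix.
    prefix = session_prefix()
    return f"{prefix}{session_id}" if prefix else session_id


os.environ["NEMO_SESSION_PREFIX"] = "ci-"
with_prefix = build_session_id("1234")      # "ci-1234"
default_with_prefix = build_session_id()    # "ci-undefined"

del os.environ["NEMO_SESSION_PREFIX"]
without_prefix = build_session_id("1234")   # "1234"
```

Note that the default 'undefined' session ID is prefixed as well, so include any separator you want (such as a trailing hyphen) in the prefix itself.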

astart() async

Start the background timer task on the current event loop.

Source code in src/nemo_safe_synthesizer/telemetry.py
async def astart(self) -> None:
    """Start the background timer task on the current event loop."""
    if self._running:
        return
    self._loop = asyncio.get_running_loop()
    self._flush_signal = asyncio.Event()
    self._running = True
    self._timer_task = asyncio.create_task(self._timer_loop())

astop() async

Cancel the timer task and flush any remaining events.

Source code in src/nemo_safe_synthesizer/telemetry.py
async def astop(self) -> None:
    """Cancel the timer task and flush any remaining events."""
    if not self._running:
        await self._flush_events()
        return
    self._running = False
    if self._flush_signal is not None:
        self._flush_signal.set()
    if self._timer_task is not None:
        self._timer_task.cancel()
        try:
            await self._timer_task
        except asyncio.CancelledError:
            pass  # expected: we just cancelled the task during shutdown
        self._timer_task = None
    await self._flush_events()
    self._loop = None
    self._flush_signal = None

aflush() async

Flush all queued events immediately and await completion.

Source code in src/nemo_safe_synthesizer/telemetry.py
async def aflush(self) -> None:
    """Flush all queued events immediately and await completion."""
    await self._flush_events()

start()

Spawn a daemon thread with a persistent event loop for periodic flushing.

Source code in src/nemo_safe_synthesizer/telemetry.py
def start(self) -> None:
    """Spawn a daemon thread with a persistent event loop for periodic flushing."""
    if self._running:
        return
    ready = threading.Event()
    startup_error: list[BaseException] = []

    def _run() -> None:
        try:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            self._loop = loop
            self._flush_signal = asyncio.Event()
            self._timer_task = loop.create_task(self._timer_loop())
            self._running = True
        except BaseException as exc:  # noqa: BLE001
            startup_error.append(exc)
            ready.set()
            return
        ready.set()
        try:
            loop.run_forever()
        finally:
            loop.close()

    self._thread = threading.Thread(target=_run, name="nemo-telemetry", daemon=True)
    self._thread.start()
    ready.wait()
    if startup_error:
        self._thread = None
        raise startup_error[0]

stop()

Flush pending events. If a background thread is running, shut it down and join.

Source code in src/nemo_safe_synthesizer/telemetry.py
def stop(self) -> None:
    """Flush pending events. If a background thread is running, shut it down and join."""
    if self._running and self._loop is not None and self._thread is not None:
        loop = self._loop
        future = asyncio.run_coroutine_threadsafe(self._astop_inner(), loop)
        try:
            future.result(timeout=30)
        except Exception:  # noqa: BLE001
            pass  # best-effort: telemetry must not disrupt callers
        loop.call_soon_threadsafe(loop.stop)
        self._thread.join(timeout=5)
        self._thread = None
        self._loop = None
        self._flush_signal = None
        self._timer_task = None
        self._running = False
        return
    # Fire-and-flush: no background thread; flush once on a fresh loop.
    if self._events or self._dlq:
        try:
            asyncio.run(self._flush_events())
        except Exception:  # noqa: BLE001
            pass  # best-effort: telemetry must not disrupt callers

flush()

Flush all queued events immediately and wait for completion.

Source code in src/nemo_safe_synthesizer/telemetry.py
def flush(self) -> None:
    """Flush all queued events immediately and wait for completion."""
    if self._running and self._loop is not None and self._thread is not None:
        future: Future[None] = asyncio.run_coroutine_threadsafe(self._flush_events(), self._loop)
        try:
            future.result(timeout=30)
        except Exception:  # noqa: BLE001
            pass  # best-effort
        return
    if self._events or self._dlq:
        try:
            asyncio.run(self._flush_events())
        except Exception:  # noqa: BLE001
            pass  # best-effort

sanitize_model_for_telemetry(model)

Return a telemetry-safe pretrained model label.

Hugging Face repo IDs are safe to report, but local model paths may embed user or machine details. Prefer the coarse local label when the value looks path-like or does not satisfy Hugging Face repo ID syntax.

Source code in src/nemo_safe_synthesizer/telemetry.py
def sanitize_model_for_telemetry(model: str | None) -> str:
    """Return a telemetry-safe pretrained model label.

    Hugging Face repo IDs are safe to report, but local model paths may embed
    user or machine details. Prefer the coarse local label when the value looks
    path-like or does not satisfy Hugging Face repo ID syntax.
    """
    if model is None:
        return "undefined"

    model = model.strip()
    if not model:
        return "undefined"

    if model.startswith(("/", "./", "../", "~")):
        return LOCAL_MODEL_LABEL
    if "\\" in model or PureWindowsPath(model).drive:
        return LOCAL_MODEL_LABEL
    if model.count("/") > 1:
        return LOCAL_MODEL_LABEL
    if Path(model).expanduser().exists():
        return LOCAL_MODEL_LABEL

    try:
        validate_repo_id(model)
    except HFValidationError:
        return LOCAL_MODEL_LABEL

    return model
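
To make the purely syntactic checks concrete, here is a reduced sketch that reproduces only the first three path tests from the function above. The name looks_path_like is hypothetical, and the sketch deliberately omits the filesystem check and the Hugging Face validate_repo_id call, so a False result here does not mean the real function would report the value unchanged.

```python
from pathlib import PureWindowsPath


def looks_path_like(model: str) -> bool:
    """Reduced sketch: only the prefix, Windows, and slash-count checks."""
    model = model.strip()
    # POSIX-style absolute, relative, or home-relative paths.
    if model.startswith(("/", "./", "../", "~")):
        return True
    # Windows separators or a drive letter such as "C:".
    if "\\" in model or PureWindowsPath(model).drive:
        return True
    # A repo ID has at most one slash ("org/name").
    return model.count("/") > 1


examples = {
    "/home/user/model": looks_path_like("/home/user/model"),
    "~/.cache/model": looks_path_like("~/.cache/model"),
    "C:\\models\\x": looks_path_like("C:\\models\\x"),
    "C:/models/x": looks_path_like("C:/models/x"),
    "org/team/model": looks_path_like("org/team/model"),
    "meta-llama/Llama-3.1-8B": looks_path_like("meta-llama/Llama-3.1-8B"),
}
```

Only the last entry is False; every other value would be replaced by the coarse local label before any further checks run.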

bucket_records(n)

Return a bucketed string label for a count of input records.

Used to avoid transmitting exact record counts in telemetry.

Source code in src/nemo_safe_synthesizer/telemetry.py
def bucket_records(n: int) -> str:
    """Return a bucketed string label for a count of input records.

    Used to avoid transmitting exact record counts in telemetry.
    """
    if n <= 200:
        return "1-200"
    if n <= 1_000:
        return "201-1000"
    if n <= 10_000:
        return "1001-10000"
    if n <= 100_000:
        return "10001-100000"
    return "100001+"

bucket_columns(n)

Return a bucketed string label for a count of input columns.

Used to avoid transmitting exact column counts in telemetry.

Source code in src/nemo_safe_synthesizer/telemetry.py
def bucket_columns(n: int) -> str:
    """Return a bucketed string label for a count of input columns.

    Used to avoid transmitting exact column counts in telemetry.
    """
    if n <= 5:
        return "1-5"
    if n <= 10:
        return "6-10"
    if n <= 20:
        return "11-20"
    if n <= 50:
        return "21-50"
    return "51+"
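
Both bucketing functions follow the same shape: a sorted list of inclusive upper bounds and one more label than bounds. As an illustration only (this is not how the module implements them), the same mapping can be expressed with bisect_left. The bucket helper and the constant names below are hypothetical; the bounds and labels are copied from the two functions above.

```python
from bisect import bisect_left

RECORD_BOUNDS = [200, 1_000, 10_000, 100_000]
RECORD_LABELS = ["1-200", "201-1000", "1001-10000", "10001-100000", "100001+"]

COLUMN_BOUNDS = [5, 10, 20, 50]
COLUMN_LABELS = ["1-5", "6-10", "11-20", "21-50", "51+"]


def bucket(n: int, bounds: list, labels: list) -> str:
    # bisect_left returns the index of the first bound >= n, which matches
    # the chain of "if n <= bound" checks; values above every bound fall
    # through to the last label.
    return labels[bisect_left(bounds, n)]


records_at_edge = bucket(200, RECORD_BOUNDS, RECORD_LABELS)      # "1-200"
records_past_edge = bucket(201, RECORD_BOUNDS, RECORD_LABELS)    # "201-1000"
records_large = bucket(250_000, RECORD_BOUNDS, RECORD_LABELS)    # "100001+"
columns_small = bucket(6, COLUMN_BOUNDS, COLUMN_LABELS)          # "6-10"
columns_large = bucket(51, COLUMN_BOUNDS, COLUMN_LABELS)         # "51+"
records_zero = bucket(0, RECORD_BOUNDS, RECORD_LABELS)           # "1-200"
```

As in the original functions, zero and negative counts land in the first bucket, because the first check is only an upper bound.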