telemetry

Anonymous telemetry handler for NeMo Anonymizer.

Emits one anonymizer_event per Anonymizer.run() or Anonymizer.preview() invocation. Telemetry is on by default; opt out with any of the following:

  • NEMO_TELEMETRY_ENABLED=false environment variable
  • AnonymizerConfig(emit_telemetry=False)
  • --no-emit-telemetry CLI flag

Related environment variables (read at runtime, not import time):

  • NEMO_TELEMETRY_ENABLED: set to false / 0 / no to disable.
  • NEMO_DEPLOYMENT_TYPE: one of cli, sdk, or nmp. Defaults to sdk.
  • NEMO_TELEMETRY_ENDPOINT: override the destination URL.
  • NEMO_SESSION_PREFIX: prepended to session IDs. Set to "anonymizer-" automatically by Anonymizer.__init__ for dashboard filtering.
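
As a minimal sketch of what "read at runtime, not import time" implies, the helper below re-reads the variable on every call, so a change made after import still takes effect. The name telemetry_enabled and the case-insensitive, whitespace-tolerant parsing are assumptions for illustration; only the false / 0 / no values come from the list above.

```python
import os

# Values documented above as disabling telemetry.
_DISABLED_VALUES = {"false", "0", "no"}


def telemetry_enabled() -> bool:
    """Hypothetical helper: re-read NEMO_TELEMETRY_ENABLED on every call."""
    raw = os.environ.get("NEMO_TELEMETRY_ENABLED", "true")
    # Case-insensitive matching is an assumption, not confirmed by the source.
    return raw.strip().lower() not in _DISABLED_VALUES


# Setting the variable after import is still honored on the next call.
os.environ["NEMO_TELEMETRY_ENABLED"] = "false"
disabled_result = telemetry_enabled()

# With the variable unset, telemetry falls back to enabled (opt-out model).
os.environ.pop("NEMO_TELEMETRY_ENABLED")
default_result = telemetry_enabled()
```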

Classes:

  • AnonymizerEvent: Pydantic model for the anonymizer_event payload.
  • TelemetryHandler: Fire-and-flush telemetry handler for Anonymizer.

Functions:

  • avg_tokens_per_record: Mean tiktoken count across the input texts.
  • classify_model_host: Classify a ModelProvider's endpoint URL into one of the ModelHostEnum values.
  • collect_model_hosts: Sort + dedupe per-provider host classifications into the wire-format list.
  • sort_join_aliases: Canonical pool serialization: sorted ascending, comma-joined, no spaces.

AnonymizerEvent pydantic-model

Bases: BaseModel

Pydantic model for the anonymizer_event payload.

Field aliases match the camelCase schema in aire/microservices/nemo-telemetry schemas/anonymous_events.json.

Config:

  • default: {'populate_by_name': True}

Fields:

  • nemo_source (NemoSourceEnum)
  • task (TaskEnum)
  • task_status (TaskStatusEnum)
  • deployment_type (DeploymentTypeEnum)
  • job_duration_sec (float)
  • num_input_records (int)
  • num_success_records (int)
  • num_failure_records (int)
  • avg_tokens_per_record (int)
  • transformation_type (str)
  • custom_data_summary_provided (bool)
  • custom_privacy_goal_provided (bool)
  • custom_substitute_instructions_provided (bool)
  • max_repair_iterations (int)
  • strict_entity_protection (bool)
  • repair_iterations_triggered (int)
  • entity_detector_model (str)
  • entity_validator_model (str)
  • entity_augmenter_model (str)
  • latent_detector_model (str)
  • replacement_generator_model (str)
  • domain_classifier_model (str)
  • disposition_analyzer_model (str)
  • meaning_extractor_model (str)
  • qa_generator_model (str)
  • rewriter_model (str)
  • evaluator_model (str)
  • repairer_model (str)
  • judge_model (str)
  • model_hosts (list[str])
  • entity_detection_failure_count (int)
  • latent_detection_failure_count (int)
  • replace_map_generation_failure_count (int)
  • rewrite_pipeline_failure_count (int)
  • rewrite_evaluate_failure_count (int)
  • rewrite_repair_failure_count (int)
  • rewrite_final_judge_failure_count (int)
  • unknown_step_failure_count (int)

TelemetryHandler(*, source_client_version='undefined', session_id='undefined', max_queue_size=50, max_retries=MAX_RETRIES)

Fire-and-flush telemetry handler for Anonymizer.

Anonymizer runs are short, so we skip DD's background-daemon-thread mode entirely. Usage:

with TelemetryHandler(source_client_version=__version__, session_id=...) as h:
    h.enqueue(event)
# on __exit__, queued events are flushed synchronously

All errors are swallowed; telemetry must never disrupt the pipeline.

Source code in src/anonymizer/telemetry.py
def __init__(
    self,
    *,
    source_client_version: str = "undefined",
    session_id: str = "undefined",
    max_queue_size: int = 50,
    max_retries: int = MAX_RETRIES,
):
    self._max_queue_size = max_queue_size
    self._max_retries = max_retries
    self._events: list[QueuedEvent] = []
    self._dlq: list[QueuedEvent] = []
    self._source_client_version = source_client_version
    prefix = _session_prefix()
    self._session_id = f"{prefix}{session_id}" if prefix else session_id
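
The fire-and-flush pattern described above can be sketched with a small stand-in class. SketchHandler, its send callable, and the drop-when-full queue policy are all hypothetical; this is not the library's implementation, only an illustration of synchronous flushing on __exit__ with every delivery error swallowed.

```python
class SketchHandler:
    """Hypothetical stand-in for TelemetryHandler; not the library's code."""

    def __init__(self, send, max_queue_size: int = 50):
        self._send = send  # callable that delivers one batch of events
        self._max_queue_size = max_queue_size
        self._events: list[dict] = []

    def enqueue(self, event: dict) -> None:
        # Dropping events once the queue is full is an assumption.
        if len(self._events) < self._max_queue_size:
            self._events.append(event)

    def flush(self) -> None:
        try:
            if self._events:
                self._send(list(self._events))
        except Exception:
            pass  # telemetry must never disrupt the pipeline
        finally:
            self._events.clear()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.flush()  # synchronous flush, no background thread
        return False  # never suppress the caller's own exceptions


sent: list[list[dict]] = []
with SketchHandler(send=sent.append) as h:
    h.enqueue({"task": "run"})


def failing_send(batch):
    raise ConnectionError("endpoint unreachable")


with SketchHandler(send=failing_send) as h:
    h.enqueue({"task": "preview"})  # the send error is swallowed on exit
survived = True
```

Returning False from __exit__ keeps the two concerns separate: delivery failures stay silent, while exceptions raised by the pipeline itself still propagate to the caller.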

avg_tokens_per_record(texts)

Mean tiktoken count across the input texts.

Returns -1 on empty input or tokenizer failure. Telemetry is best-effort. Accepts any iterable of strings (list, pd.Series).

Source code in src/anonymizer/telemetry.py
def avg_tokens_per_record(texts) -> int:
    """Mean tiktoken count across the input texts.

    Returns -1 on empty input or tokenizer failure. Telemetry is best-effort.
    Accepts any iterable of strings (list, pd.Series).
    """
    try:
        tokenizer = _get_tokenizer()
        counts = [len(tokenizer.encode(str(t), disallowed_special=())) for t in texts]
        if not counts:
            return -1
        return int(sum(counts) / len(counts))
    except Exception:
        return -1
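
To illustrate the return contract without depending on tiktoken, the sketch below swaps in whitespace splitting as a stand-in tokenizer. The function name mean_token_count and the encode parameter are hypothetical; the averaging, integer truncation, and -1 sentinel mirror the source above.

```python
def mean_token_count(texts, encode=str.split) -> int:
    """Sketch of the mean-or-sentinel pattern with a stand-in tokenizer."""
    try:
        counts = [len(encode(str(t))) for t in texts]
        if not counts:
            return -1
        return int(sum(counts) / len(counts))  # int() truncates toward zero
    except Exception:
        return -1  # best-effort: any failure becomes the sentinel


two_texts = mean_token_count(["one two three", "four five"])  # counts 3 and 2
empty = mean_token_count([])
broken = mean_token_count(None)  # iterating None raises, which is caught
```

Callers should treat -1 as "unknown" rather than as a real average, since empty input and tokenizer failure are indistinguishable in the result.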

classify_model_host(provider)

Classify a ModelProvider's endpoint URL into one of the ModelHostEnum values.

Substring-matches known host fragments against the (lower-cased) endpoint URL.

Source code in src/anonymizer/telemetry.py
def classify_model_host(provider: ModelProvider | None) -> ModelHostEnum:
    """Classify a ModelProvider's endpoint URL into one of the ModelHostEnum values.

    Substring-matches known host fragments against the (lower-cased) endpoint URL.
    """
    if provider is None:
        return ModelHostEnum.OTHER
    # ``endpoint`` is a single URL string; the ``in`` checks below are substring
    # searches against that string (not iteration over characters).
    endpoint = (getattr(provider, "endpoint", "") or "").lower()
    if "build.nvidia.com" in endpoint or "integrate.api.nvidia.com" in endpoint:
        return ModelHostEnum.NVIDIA_BUILD
    if "inference-api.nvidia.com" in endpoint:
        return ModelHostEnum.NVIDIA_INTERNAL
    if "openrouter.ai" in endpoint:
        return ModelHostEnum.OPENROUTER
    local_hosts = ("localhost", "127.0.0.1", "0.0.0.0", "[::1]")
    if any(host in endpoint for host in local_hosts):
        return ModelHostEnum.LOCAL
    return ModelHostEnum.OTHER
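
The same rule order can be restated as a table, which makes the priority and a side effect of substring matching easier to see. This is a sketch: classify, RULES, and the SimpleNamespace providers are stand-ins, and the returned strings are the ModelHostEnum member names rather than the wire values, which are not shown in this section.

```python
from types import SimpleNamespace

# (fragments, label) pairs in the same priority order as the source above.
RULES = [
    (("build.nvidia.com", "integrate.api.nvidia.com"), "NVIDIA_BUILD"),
    (("inference-api.nvidia.com",), "NVIDIA_INTERNAL"),
    (("openrouter.ai",), "OPENROUTER"),
    (("localhost", "127.0.0.1", "0.0.0.0", "[::1]"), "LOCAL"),
]


def classify(provider) -> str:
    """Hypothetical table-driven equivalent of the substring checks."""
    endpoint = (getattr(provider, "endpoint", "") or "").lower()
    for fragments, label in RULES:
        if any(fragment in endpoint for fragment in fragments):
            return label
    return "OTHER"


build = classify(SimpleNamespace(endpoint="https://integrate.api.nvidia.com/v1"))
local = classify(SimpleNamespace(endpoint="http://LOCALHOST:8000/v1"))
missing = classify(None)
# The match runs over the whole URL, so a path segment can also trigger it.
path_hit = classify(SimpleNamespace(endpoint="https://example.com/proxy/localhost"))
```

Because the check is a plain substring search over the full URL, a fragment appearing in the path or query classifies the endpoint just as a matching hostname would.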

collect_model_hosts(hosts)

Sort + dedupe per-provider host classifications into the wire-format list.

Returns ["other"] when no hosts were observed — never an empty list, so the telemetry payload always carries at least one host string.

Source code in src/anonymizer/telemetry.py
def collect_model_hosts(hosts: list[ModelHostEnum]) -> list[str]:
    """Sort + dedupe per-provider host classifications into the wire-format list.

    Returns ``["other"]`` when no hosts were observed — never an empty list, so the
    telemetry payload always carries at least one host string.
    """
    unique = sorted({h.value for h in hosts if h is not None})
    return unique or [ModelHostEnum.OTHER.value]
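
A short usage sketch of the sort, dedupe, and fallback behavior follows. HostSketch is a stand-in for ModelHostEnum: only the "other" value is confirmed by the text above, and the remaining values are assumptions for illustration.

```python
from enum import Enum


class HostSketch(str, Enum):
    """Stand-in for ModelHostEnum; only "other" is confirmed by the docs."""

    LOCAL = "local"  # assumed value
    OPENROUTER = "openrouter"  # assumed value
    OTHER = "other"


def collect(hosts):
    # Same logic as the source: dedupe via a set, sort, fall back to "other".
    unique = sorted({h.value for h in hosts if h is not None})
    return unique or [HostSketch.OTHER.value]


mixed = collect([HostSketch.OPENROUTER, HostSketch.LOCAL, HostSketch.OPENROUTER, None])
none_seen = collect([])
```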

sort_join_aliases(aliases)

Canonical pool serialization: sorted ascending, comma-joined, no spaces.

Source code in src/anonymizer/telemetry.py
def sort_join_aliases(aliases: list[str]) -> str:
    """Canonical pool serialization: sorted ascending, comma-joined, no spaces."""
    cleaned = [a.strip() for a in aliases if a and a.strip()]
    if not cleaned:
        return NOT_APPLICABLE
    return ",".join(sorted(cleaned))
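
The serialization can be exercised with a few inputs, as sketched below. The value of NOT_APPLICABLE is not shown in this section, so "n/a" is a placeholder assumption, and sort_join is a local copy of the logic rather than an import from the package.

```python
NOT_APPLICABLE = "n/a"  # placeholder; the real constant's value is not shown


def sort_join(aliases):
    # Strip whitespace, drop empty entries, then sort and comma-join.
    cleaned = [a.strip() for a in aliases if a and a.strip()]
    if not cleaned:
        return NOT_APPLICABLE
    return ",".join(sorted(cleaned))


joined = sort_join([" nemotron ", "", "gpt", "  "])
blank = sort_join(["", "   "])
```

Python's default string sort is case-sensitive, so uppercase aliases order before lowercase ones; duplicates are kept, since the function does not dedupe.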