telemetry
Anonymous telemetry handler for NeMo Anonymizer.
Emits one anonymizer_event per Anonymizer.run() / Anonymizer.preview()
invocation. Telemetry is enabled by default; opt out with any of:
- NEMO_TELEMETRY_ENABLED=false environment variable
- AnonymizerConfig(emit_telemetry=False)
- --no-emit-telemetry CLI flag
Related environment variables (read at runtime, not import time):
- NEMO_TELEMETRY_ENABLED: set to false / 0 / no to disable.
- NEMO_DEPLOYMENT_TYPE: cli, sdk, nmp. Defaults to sdk.
- NEMO_TELEMETRY_ENDPOINT: override the destination URL.
- NEMO_SESSION_PREFIX: prepended to session IDs. Set to "anonymizer-" automatically by Anonymizer.__init__ for dashboard filtering.
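Because these variables are read at runtime rather than import time, a value set after the module has been imported still takes effect. The sketch below shows one way such a lookup could work. The helper names `telemetry_enabled` and `deployment_type` are hypothetical, and the case-insensitive, whitespace-tolerant comparison is an assumption rather than documented behavior.

```python
import os

# Values this page lists as disabling telemetry.
_FALSEY = {"false", "0", "no"}


def telemetry_enabled(environ=None) -> bool:
    """Hypothetical helper: True unless NEMO_TELEMETRY_ENABLED is a falsey string."""
    env = os.environ if environ is None else environ  # looked up on every call
    raw = env.get("NEMO_TELEMETRY_ENABLED", "")
    # Assumption: compare case-insensitively and ignore surrounding whitespace.
    return raw.strip().lower() not in _FALSEY


def deployment_type(environ=None) -> str:
    """Hypothetical helper: NEMO_DEPLOYMENT_TYPE, falling back to "sdk"."""
    env = os.environ if environ is None else environ
    return env.get("NEMO_DEPLOYMENT_TYPE", "sdk")
```

Passing a plain dict as `environ` keeps the helpers easy to exercise in tests without mutating the real process environment.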
Classes:
| Name | Description |
|---|---|
| AnonymizerEvent | Pydantic model for the anonymizer_event payload. |
| TelemetryHandler | Fire-and-flush telemetry handler for Anonymizer. |
Functions:
| Name | Description |
|---|---|
| avg_tokens_per_record | Mean tiktoken count across the input texts. |
| classify_model_host | Classify a ModelProvider's endpoint URL into one of the ModelHostEnum values. |
| collect_model_hosts | Sort + dedupe per-provider host classifications into the wire-format list. |
| sort_join_aliases | Canonical pool serialization: sorted ascending, comma-joined, no spaces. |
AnonymizerEvent
pydantic-model
Bases: BaseModel
Pydantic model for the anonymizer_event payload.
Field aliases match the camelCase schema in
aire/microservices/nemo-telemetry schemas/anonymous_events.json.
Config:
- default: {'populate_by_name': True}
Fields:
- nemo_source (NemoSourceEnum)
- task (TaskEnum)
- task_status (TaskStatusEnum)
- deployment_type (DeploymentTypeEnum)
- job_duration_sec (float)
- num_input_records (int)
- num_success_records (int)
- num_failure_records (int)
- avg_tokens_per_record (int)
- transformation_type (str)
- custom_data_summary_provided (bool)
- custom_privacy_goal_provided (bool)
- custom_substitute_instructions_provided (bool)
- max_repair_iterations (int)
- strict_entity_protection (bool)
- repair_iterations_triggered (int)
- entity_detector_model (str)
- entity_validator_model (str)
- entity_augmenter_model (str)
- latent_detector_model (str)
- replacement_generator_model (str)
- domain_classifier_model (str)
- disposition_analyzer_model (str)
- meaning_extractor_model (str)
- qa_generator_model (str)
- rewriter_model (str)
- evaluator_model (str)
- repairer_model (str)
- judge_model (str)
- model_hosts (list[str])
- entity_detection_failure_count (int)
- latent_detection_failure_count (int)
- replace_map_generation_failure_count (int)
- rewrite_pipeline_failure_count (int)
- rewrite_evaluate_failure_count (int)
- rewrite_repair_failure_count (int)
- rewrite_final_judge_failure_count (int)
- unknown_step_failure_count (int)
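The fields above use snake_case names while the wire schema is described as camelCase, so each field needs an alias. The sketch below illustrates the naming convention with a plain standard-library function rather than Pydantic itself. `to_camel` and `to_wire` are hypothetical helpers; the actual alias strings are defined in the schema file and are not shown on this page.

```python
def to_camel(name: str) -> str:
    """Convert a lowercase snake_case identifier to camelCase."""
    head, *rest = name.split("_")
    return head + "".join(part.capitalize() for part in rest)


def to_wire(payload: dict) -> dict:
    """Hypothetical helper: re-key a snake_case dict with camelCase keys."""
    return {to_camel(key): value for key, value in payload.items()}


# Field names taken from the list above; the values are made up for illustration.
event = {"job_duration_sec": 12.5, "num_input_records": 100, "task": "demo"}
wire = to_wire(event)
```

With `populate_by_name` set to True, a Pydantic model accepts either the field name or its alias when it is constructed, which is why the Python side can keep using the snake_case names.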
TelemetryHandler(*, source_client_version='undefined', session_id='undefined', max_queue_size=50, max_retries=MAX_RETRIES)
Fire-and-flush telemetry handler for Anonymizer.
Anonymizer runs are short, so we skip DD's background-daemon-thread mode entirely. Usage:
with TelemetryHandler(source_client_version=__version__, session_id=...) as h:
    h.enqueue(event)
# on __exit__, queued events are flushed synchronously
All errors are swallowed; telemetry must never disrupt the pipeline.
Source code in src/anonymizer/telemetry.py
def __init__(
    self,
    *,
    source_client_version: str = "undefined",
    session_id: str = "undefined",
    max_queue_size: int = 50,
    max_retries: int = MAX_RETRIES,
):
    self._max_queue_size = max_queue_size
    self._max_retries = max_retries
    self._events: list[QueuedEvent] = []
    self._dlq: list[QueuedEvent] = []
    self._source_client_version = source_client_version
    prefix = _session_prefix()
    self._session_id = f"{prefix}{session_id}" if prefix else session_id
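The fire-and-flush pattern described above can be sketched as a small context manager: events accumulate in a list, are sent synchronously on exit, and every delivery error is swallowed. `MiniHandler` is a hypothetical stand-in, not the real class. It omits retries and the dead-letter queue, and dropping events when the queue is full is an assumption.

```python
class MiniHandler:
    """Hypothetical stand-in for a fire-and-flush handler; not the real class."""

    def __init__(self, send, max_queue_size: int = 50):
        self._send = send  # callable that delivers one event
        self._max_queue_size = max_queue_size
        self._events = []

    def enqueue(self, event) -> None:
        # Assumption: silently drop events once the queue is full.
        if len(self._events) < self._max_queue_size:
            self._events.append(event)

    def flush(self) -> None:
        # Take ownership of the queue first so a failure cannot cause a resend.
        pending, self._events = self._events, []
        for event in pending:
            try:
                self._send(event)
            except Exception:
                pass  # telemetry must never disrupt the pipeline

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.flush()
        return False  # never suppress an exception raised inside the with block
```

Returning False from `__exit__` keeps the handler transparent: a pipeline failure inside the `with` block still propagates after the queue has been flushed.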
avg_tokens_per_record(texts)
Mean tiktoken count across the input texts.
Returns -1 on empty input or tokenizer failure. Telemetry is best-effort. Accepts any iterable of strings (list, pd.Series).
Source code in src/anonymizer/telemetry.py
def avg_tokens_per_record(texts) -> int:
    """Mean tiktoken count across the input texts.

    Returns -1 on empty input or tokenizer failure. Telemetry is best-effort.
    Accepts any iterable of strings (list, pd.Series).
    """
    try:
        tokenizer = _get_tokenizer()
        counts = [len(tokenizer.encode(str(t), disallowed_special=())) for t in texts]
        if not counts:
            return -1
        return int(sum(counts) / len(counts))
    except Exception:
        return -1
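Two details of this function are easy to miss: the mean is truncated by `int()` rather than rounded, and -1 is a sentinel rather than a real count. The sketch below reproduces the same control flow with a whitespace tokenizer standing in for tiktoken, so it runs without extra dependencies. `avg_tokens` and its `encode` parameter are hypothetical.

```python
def avg_tokens(texts, encode=str.split) -> int:
    """Hypothetical stand-in: same shape as avg_tokens_per_record, whitespace tokens."""
    try:
        counts = [len(encode(str(t))) for t in texts]
        if not counts:
            return -1  # empty input
        return int(sum(counts) / len(counts))  # truncates: 1.5 becomes 1
    except Exception:
        return -1  # any failure, including a non-iterable argument


even = avg_tokens(["a b", "a b c d"])  # token counts 2 and 4
truncated = avg_tokens(["a", "a b"])   # token counts 1 and 2
empty = avg_tokens([])
broken = avg_tokens(None)              # iterating None raises inside the try
```

Anything that aggregates this field downstream would presumably need to exclude the -1 values first, since they would otherwise pull an average down.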
classify_model_host(provider)
Classify a ModelProvider's endpoint URL into one of the ModelHostEnum values.
Substring-matches known host fragments against the (lower-cased) endpoint URL.
Source code in src/anonymizer/telemetry.py
def classify_model_host(provider: ModelProvider | None) -> ModelHostEnum:
    """Classify a ModelProvider's endpoint URL into one of the ModelHostEnum values.

    Substring-matches known host fragments against the (lower-cased) endpoint URL.
    """
    if provider is None:
        return ModelHostEnum.OTHER
    # ``endpoint`` is a single URL string; the ``in`` checks below are substring
    # searches against that string (not iteration over characters).
    endpoint = (getattr(provider, "endpoint", "") or "").lower()
    if "build.nvidia.com" in endpoint or "integrate.api.nvidia.com" in endpoint:
        return ModelHostEnum.NVIDIA_BUILD
    if "inference-api.nvidia.com" in endpoint:
        return ModelHostEnum.NVIDIA_INTERNAL
    if "openrouter.ai" in endpoint:
        return ModelHostEnum.OPENROUTER
    local_hosts = ("localhost", "127.0.0.1", "0.0.0.0", "[::1]")
    if any(host in endpoint for host in local_hosts):
        return ModelHostEnum.LOCAL
    return ModelHostEnum.OTHER
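The same first-match-wins substring rule can be written as an ordered table, which makes the precedence explicit. This is a sketch with stand-ins: `Host` replaces ModelHostEnum (member names copied from the source, values are placeholders), and `SimpleNamespace` replaces ModelProvider. Note that the fragment may match anywhere in the URL, including the path or query string.

```python
from enum import Enum, auto
from types import SimpleNamespace


class Host(Enum):  # stand-in for ModelHostEnum; the values are placeholders
    NVIDIA_BUILD = auto()
    NVIDIA_INTERNAL = auto()
    OPENROUTER = auto()
    LOCAL = auto()
    OTHER = auto()


# Ordered rules: the first row with a matching fragment decides the result.
RULES = (
    (("build.nvidia.com", "integrate.api.nvidia.com"), Host.NVIDIA_BUILD),
    (("inference-api.nvidia.com",), Host.NVIDIA_INTERNAL),
    (("openrouter.ai",), Host.OPENROUTER),
    (("localhost", "127.0.0.1", "0.0.0.0", "[::1]"), Host.LOCAL),
)


def classify(provider) -> Host:
    if provider is None:
        return Host.OTHER
    # A missing or None endpoint collapses to "" and falls through to OTHER.
    endpoint = (getattr(provider, "endpoint", "") or "").lower()
    for fragments, host in RULES:
        if any(fragment in endpoint for fragment in fragments):
            return host
    return Host.OTHER


nvidia = classify(SimpleNamespace(endpoint="https://integrate.api.nvidia.com/v1"))
local = classify(SimpleNamespace(endpoint="http://LOCALHOST:8000/v1"))
# Substring matching also fires on a query string, not only on the hostname.
query_hit = classify(SimpleNamespace(endpoint="https://api.example.com/?via=openrouter.ai"))
```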
collect_model_hosts(hosts)
Sort + dedupe per-provider host classifications into the wire-format list.
Returns ["other"] when no hosts were observed — never an empty list, so the
telemetry payload always carries at least one host string.
Source code in src/anonymizer/telemetry.py
def collect_model_hosts(hosts: list[ModelHostEnum]) -> list[str]:
    """Sort + dedupe per-provider host classifications into the wire-format list.

    Returns ``["other"]`` when no hosts were observed, never an empty list, so the
    telemetry payload always carries at least one host string.
    """
    unique = sorted({h.value for h in hosts if h is not None})
    return unique or [ModelHostEnum.OTHER.value]
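A short usage sketch of the dedupe-and-fallback behavior, using a stand-in enum. Only the "other" value is confirmed by this page; the "local" and "openrouter" strings, the `Host` enum, and the `collect` name are assumptions made for illustration.

```python
from enum import Enum


class Host(Enum):  # stand-in for ModelHostEnum; only "other" is confirmed
    LOCAL = "local"
    OPENROUTER = "openrouter"
    OTHER = "other"


def collect(hosts):
    # A set removes duplicates; sorted() makes the output order deterministic.
    unique = sorted({h.value for h in hosts if h is not None})
    return unique or [Host.OTHER.value]


mixed = collect([Host.OPENROUTER, Host.LOCAL, Host.OPENROUTER])
nothing = collect([])
only_none = collect([None])
```

Sorting matters for analytics: the same set of providers always serializes to the same list, regardless of the order in which they were configured.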
sort_join_aliases(aliases)
Canonical pool serialization: sorted ascending, comma-joined, no spaces.
Source code in src/anonymizer/telemetry.py
def sort_join_aliases(aliases: list[str]) -> str:
    """Canonical pool serialization: sorted ascending, comma-joined, no spaces."""
    cleaned = [a.strip() for a in aliases if a and a.strip()]
    if not cleaned:
        return NOT_APPLICABLE
    return ",".join(sorted(cleaned))
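A usage sketch of the canonical serialization, showing that blank entries are dropped and that the sort is case-sensitive. The value of NOT_APPLICABLE is not shown on this page, so the "n/a" string below is a placeholder, and `sort_join` is a hypothetical copy used only for the demonstration.

```python
NOT_APPLICABLE = "n/a"  # placeholder; the real constant's value is not shown here


def sort_join(aliases):
    # Strip whitespace and drop empty or whitespace-only entries.
    cleaned = [a.strip() for a in aliases if a and a.strip()]
    if not cleaned:
        return NOT_APPLICABLE
    return ",".join(sorted(cleaned))


joined = sort_join([" gpt ", "claude", "", "   "])
mixed_case = sort_join(["b", "A"])  # uppercase sorts before lowercase
blank = sort_join(["", "  "])
```

Because the result is order-independent, two runs that use the same pool of model aliases produce the same string and can be grouped together.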