detect

Classes:

Name Description
DefaultLLMConfig

Default settings for the LLM used in column classification.

ColumnClassifier

Abstract column-type classifier; implementations may use LLM, VertexAI, or other backends.

ColumnClassifierNoop

No-op classifier that assigns UNKNOWN_ENTITY to every column.

IAPIClassifierConfig

Configuration for an inference-API-based column classifier.

ColumnClassifierLLM

Classify column types using an LLM (OpenAI-compatible inference API).

ClassifyConfig

Configuration for column classification and NER (entities, thresholds, GLiNER, regex).

EntityExtractor

Abstract extractor of entity/value pairs from free text.

EntityExtractorNoop

No-op extractor that returns no entities.

EntityReport

Per-entity stats for one column: count of detections and set of unique values.

EntityExtractorRegexp

Extract entities using regex-based NER pipeline.

EntityExtractorGliner

Extract entities from text using a GLiNER model with chunking and optional batch caching.

EntityExtractorMulti

Composite extractor that runs multiple extractors and concatenates their results.

Functions:

Name Description
classify_columns

Classify DataFrame columns to entity types via LLM and return column-to-entity map.

sample_columns

Sample up to num_samples unique values per non-empty column for classification prompts.

redact_from_entities

Replace each detected span in text with the result of redact_fn(prediction).

traverse_redact

Yield iterables of text segments and redacted spans for assembly via chain().

find_best

Return the prediction with the largest span (used when merging overlapping spans).

merge_subsume

Merge overlapping NER spans into a single prediction per span using find_best.

Attributes:

Name Type Description
NerReport

Per-column NER report: column name → entity name → EntityReport (counts and values).

NerReport = dict[str, dict[str, EntityReport]] module-attribute

Per-column NER report: column name → entity name → EntityReport (counts and values).
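The nesting of `NerReport` can be sketched directly from the type alias above. This is a minimal illustration using a stand-in `EntityReport` dataclass with the two documented fields (`count`, `values`):

```python
from dataclasses import dataclass, field


@dataclass
class EntityReport:  # minimal stand-in mirroring the documented fields
    count: int
    values: set = field(default_factory=set)


# NerReport: column name -> entity name -> EntityReport
report: dict[str, dict[str, EntityReport]] = {}

# Accumulate three detections across one column.
for column, entity, value in [
    ("notes", "name", "Alice"),
    ("notes", "name", "Bob"),
    ("notes", "phone", "555-0100"),
]:
    per_column = report.setdefault(column, {})
    per_column.setdefault(entity, EntityReport(0, set()))
    per_column[entity].count += 1
    per_column[entity].values.add(value)
```

This is the same accumulation pattern used by `extract_and_replace_entities` below.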

DefaultLLMConfig

Default settings for the LLM used in column classification.

All attributes are class-level. Used by classify_columns when calling the inference API for column-type classification.

Attributes:

Name Type Description
CONFIG_ID

Model identifier for the LLM. From env NIM_MODEL_ID, or qwen/qwen2.5-coder-32b-instruct if unset.

SYSTEM_PROMPT

System message describing the column-type annotation task sent to the LLM.

MAX_OUTPUT_TOKENS

Maximum number of tokens allowed in the LLM response (default 2048).

TEMPERATURE

Sampling temperature for LLM generation (default 0.2). Lower values give more deterministic output.
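The `CONFIG_ID` fallback described above is the usual environment-variable pattern; a sketch using only the values documented here (the attribute names are as listed, the resolution logic is an assumption):

```python
import os

# Resolve the model ID: NIM_MODEL_ID wins if set, else the documented default.
CONFIG_ID = os.environ.get("NIM_MODEL_ID", "qwen/qwen2.5-coder-32b-instruct")
MAX_OUTPUT_TOKENS = 2048  # cap on tokens in the LLM response
TEMPERATURE = 0.2         # low temperature for near-deterministic classification
```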

ColumnClassifier

Bases: ABC

Abstract column-type classifier; implementations may use LLM, VertexAI, or other backends.

Methods:

Name Description
detect_types

Classify each column into one of the given entity types.

detect_types(df, entities) abstractmethod

Classify each column into one of the given entity types.

Implementations may sample column values and use an LLM, lookup table, or other backend to assign exactly one entity type per column. Columns that cannot be classified or are not in entities should be mapped to UNKNOWN_ENTITY.

Parameters:

Name Type Description Default
df DataFrame

DataFrame whose columns are to be classified.

required
entities Optional[set[str]]

Set of valid entity type names to assign; may be None for implementations that use a fixed or default set.

required
Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
@abstractmethod
def detect_types(self, df: pd.DataFrame, entities: Optional[set[str]]) -> dict[str, Optional[str]]:
    """Classify each column into one of the given entity types.

    Implementations may sample column values and use an LLM, lookup table, or
    other backend to assign exactly one entity type per column. Columns that
    cannot be classified or are not in ``entities`` should be mapped to
    ``UNKNOWN_ENTITY``.

    Args:
        df: DataFrame whose columns are to be classified.
        entities: Set of valid entity type names to assign; may be ``None``
            for implementations that use a fixed or default set.
    """
    ...
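The docstring mentions that a lookup table is one valid backend. A toy subclass along those lines shows the contract (exactly one entity per column, `UNKNOWN_ENTITY` otherwise); the `UNKNOWN_ENTITY` value and the `TABLE` contents are illustrative assumptions, not part of the module:

```python
from abc import ABC, abstractmethod
from typing import Optional

import pandas as pd

UNKNOWN_ENTITY = "unknown"  # assumption: the module's sentinel for unclassified columns


class ColumnClassifier(ABC):
    @abstractmethod
    def detect_types(self, df: pd.DataFrame, entities: Optional[set[str]]) -> dict[str, Optional[str]]: ...


class ColumnClassifierLookup(ColumnClassifier):
    """Hypothetical backend: classify by exact column-name match."""

    TABLE = {"email": "email_address", "phone": "phone_number"}

    def detect_types(self, df, entities):
        result = {}
        for col in df.columns:
            ent = self.TABLE.get(col.lower())
            # Unclassifiable columns, or entities outside the allowed set,
            # map to UNKNOWN_ENTITY per the contract above.
            if ent is None or (entities is not None and ent not in entities):
                ent = UNKNOWN_ENTITY
            result[col] = ent
        return result


df = pd.DataFrame({"email": ["a@b.co"], "age": [30]})
out = ColumnClassifierLookup().detect_types(df, {"email_address"})
```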

ColumnClassifierNoop

Bases: ColumnClassifier

No-op classifier that assigns UNKNOWN_ENTITY to every column.

IAPIClassifierConfig(endpoint, model_key, job_id, num_samples) dataclass

Configuration for an inference-API-based column classifier.

Attributes:

Name Type Description
endpoint str

Inference endpoint URL.

model_key str

Model identifier.

job_id str

Job identifier.

num_samples int

Number of value samples per column for classification.

endpoint instance-attribute

Inference endpoint URL.

model_key instance-attribute

Model identifier.

job_id instance-attribute

Job identifier.

num_samples instance-attribute

Number of value samples per column for classification.

ColumnClassifierLLM()

Bases: ColumnClassifier

Classify column types using an LLM (OpenAI-compatible inference API).

Construct via the factory method; `_llm` and `_num_samples` must be set before `detect_types` is called, since `__init__` takes no configuration.

Methods:

Name Description
detect_types

Sample column data and call the inference API to classify columns into entity types.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def __init__(self):
    self._llm = None
    self._num_samples = None

detect_types(df, entities)

Sample column data and call the inference API to classify columns into entity types.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def detect_types(self, df: pd.DataFrame, entities: set[str]) -> dict[str, Optional[str]]:
    """Sample column data and call the inference API to classify columns into entity types."""
    if self._llm is None:
        raise Exception("InferenceAPI classifier not initialized. Use get_classifier() method.")

    return classify_columns(
        df=df,
        entities=entities,
        num_samples=self._num_samples,
        client=self._llm,
        on_validation_error=self._on_validation_error,
        logger=logger,
    )

ClassifyConfig(valid_entities, ner_threshold, ner_regexps_enabled, ner_entities, gliner_enabled, gliner_batch_mode_enabled, gliner_batch_mode_chunk_length, gliner_batch_mode_batch_size, gliner_model) dataclass

Configuration for column classification and NER (entities, thresholds, GLiNER, regex).

Attributes:

Name Type Description
valid_entities set[str]

Set of valid entity type names for classification.

ner_threshold float

Score threshold for NER predictions.

ner_regexps_enabled bool

Whether regex-based NER is enabled.

ner_entities set[str] | None

Entity types for NER (or None to use default).

gliner_enabled bool

Whether GLiNER model is used.

gliner_batch_mode_enabled bool

Whether GLiNER batch mode is enabled.

gliner_batch_mode_chunk_length int

Chunk length for GLiNER.

gliner_batch_mode_batch_size int

Batch size for GLiNER.

gliner_model str

GLiNER model name or path.

valid_entities instance-attribute

Set of valid entity type names for classification.

ner_threshold instance-attribute

Score threshold for NER predictions.

ner_regexps_enabled instance-attribute

Whether regex-based NER is enabled.

ner_entities instance-attribute

Entity types for NER (or None to use default).

gliner_enabled instance-attribute

Whether GLiNER model is used.

gliner_batch_mode_enabled instance-attribute

Whether GLiNER batch mode is enabled.

gliner_batch_mode_chunk_length instance-attribute

Chunk length for GLiNER.

gliner_batch_mode_batch_size instance-attribute

Batch size for GLiNER.

gliner_model instance-attribute

GLiNER model name or path.

EntityExtractor()

Bases: ABC

Abstract extractor of entity/value pairs from free text.

Attributes:

Name Type Description
column_report NerReport

Per-column NER report (entity counts and values).

current_column str

Name of the column currently being processed.

Methods:

Name Description
extract_entity_values

Return a list of dicts with entity and value keys for each detection.

extract_ner_predictions

Return NER predictions with spans and labels for dedup/merge across extractors.

extract_and_replace_entities

Run NER, merge/dedupe predictions, update column_report, and replace spans with redact_fn.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def __init__(self):
    self.column_report = {}
    self.current_column = "unknown"

extract_entity_values(text, entities) abstractmethod

Return a list of dicts with entity and value keys for each detection.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
@abstractmethod
def extract_entity_values(self, text: str, entities: Optional[set[str]]) -> list[dict[str, str]]:
    """Return a list of dicts with ``entity`` and ``value`` keys for each detection."""
    ...

extract_ner_predictions(text, entities) abstractmethod

Return NER predictions with spans and labels for dedup/merge across extractors.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
@abstractmethod
def extract_ner_predictions(self, text: str, entities: Optional[set[str]]) -> list[NERPrediction]:
    """Return NER predictions with spans and labels for dedup/merge across extractors."""
    ...

extract_and_replace_entities(redact_fn, text, entities=None)

Run NER, merge/dedupe predictions, update column_report, and replace spans with redact_fn.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def extract_and_replace_entities(self, redact_fn: RedactFn, text: str, entities: Optional[set[str]] = None) -> str:
    """Run NER, merge/dedupe predictions, update ``column_report``, and replace spans with ``redact_fn``."""
    # Ensure text is a string - Jinja templates may pass non-string types (e.g., float/NaN)
    text = str(text)

    detected = merge_subsume(self.extract_ner_predictions(text, entities))
    if not detected:
        return text

    report = self.column_report.setdefault(self.current_column, {})
    for entity in detected:
        report.setdefault(entity.label, EntityReport(0, set()))
        report[entity.label].count += 1
        report[entity.label].values.add(entity.text)

    return redact_from_entities(text, detected, redact_fn)

EntityExtractorNoop()

Bases: EntityExtractor

No-op extractor that returns no entities.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def __init__(self):
    self.column_report = {}
    self.current_column = "unknown"

EntityReport(count, values) dataclass

Per-entity stats for one column: count of detections and set of unique values.

Attributes:

Name Type Description
count int

Number of detections for this entity in the column.

values set

Set of unique detected values for this entity.

count instance-attribute

Number of detections for this entity in the column.

values instance-attribute

Set of unique detected values for this entity.

EntityExtractorRegexp()

Bases: EntityExtractor

Extract entities using regex-based NER pipeline.

Methods:

Name Description
pipeline_from_entities

Build a pipeline factory for the given entity set (or _entity_types if empty).

get_entity_extractor

Return a regex extractor with entity types from clsfy_cfg (or DEFAULT_ENTITIES).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def __init__(self):
    self.column_report = {}
    self.current_column = "unknown"

pipeline_from_entities(entities)

Build a pipeline factory for the given entity set (or _entity_types if empty).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def pipeline_from_entities(self, entities: set[str]) -> Callable[[], Pipeline]:
    """Build a pipeline factory for the given entity set (or ``_entity_types`` if empty)."""
    if not entities:
        entities = self._entity_types
    predictor_filter = LabelSetPredictorFilter(entities)
    factory = NERFactory(regex_only=True)
    ner = factory.create(predictor_filter=predictor_filter)
    return ner.pipeline_factory

get_entity_extractor(clsfy_cfg) classmethod

Return a regex extractor with entity types from clsfy_cfg (or DEFAULT_ENTITIES).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
@classmethod
def get_entity_extractor(
    cls,
    clsfy_cfg: ClassifyConfig,
) -> EntityExtractor:
    """Return a regex extractor with entity types from ``clsfy_cfg`` (or ``DEFAULT_ENTITIES``)."""
    entity_types = DEFAULT_ENTITIES
    if clsfy_cfg.ner_entities:
        entity_types = clsfy_cfg.ner_entities
    self = cls()
    self._entity_types = entity_types
    return self

EntityExtractorGliner()

Bases: EntityExtractor

Extract entities from text using a GLiNER model with chunking and optional batch caching.

Use get_entity_extractor to construct; config comes from ClassifyConfig.

Methods:

Name Description
get_entity_extractor

Load GLiNER model and return extractor configured from clsfy_cfg.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def __init__(self):
    self.column_report = {}
    self.current_column = "unknown"

get_entity_extractor(clsfy_cfg) classmethod

Load GLiNER model and return extractor configured from clsfy_cfg.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
@classmethod
def get_entity_extractor(
    cls,
    clsfy_cfg: ClassifyConfig,
) -> EntityExtractorGliner:
    """Load GLiNER model and return extractor configured from ``clsfy_cfg``."""
    extractor = cls()
    extractor._model = None

    map_location = "cuda:0" if torch.cuda.is_available() else "cpu"
    logger.debug(
        f"Loading NER model from filesystem to {map_location}",
    )

    extractor._model = GLiNER.from_pretrained(
        clsfy_cfg.gliner_model,
        map_location=map_location,
        local_files_only=os.environ.get("LOCAL_FILES_ONLY") in ["true", "True"],
    )
    entity_types = DEFAULT_ENTITIES
    if clsfy_cfg.ner_entities:
        entity_types = clsfy_cfg.ner_entities
    extractor._entity_types = entity_types
    extractor._ner_threshold = 0.3
    if clsfy_cfg.ner_threshold is not None:
        extractor._ner_threshold = clsfy_cfg.ner_threshold
    extractor._batch_mode_enabled = clsfy_cfg.gliner_batch_mode_enabled
    extractor._chunk_length = clsfy_cfg.gliner_batch_mode_chunk_length
    extractor._chunk_overlap = 128
    if extractor._chunk_length <= extractor._chunk_overlap:
        extractor._chunk_overlap = 0
    extractor._entity_cache = {}
    extractor._batch_size = clsfy_cfg.gliner_batch_mode_batch_size
    return extractor
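The batch-mode settings above govern how long texts are split before being passed to GLiNER. The real splitter is internal to the extractor; the sketch below is only an illustrative character-based chunker, but it reproduces the documented guard from `get_entity_extractor`: when the chunk length does not exceed the overlap (128 by default), overlap is disabled entirely.

```python
def chunk_text(text: str, chunk_length: int, overlap: int = 128) -> list[str]:
    """Illustrative character-based chunker (assumption: the real one is internal).

    Mirrors the guard above: if the chunk is not longer than the overlap,
    overlap is dropped so chunks can still make forward progress.
    """
    if chunk_length <= overlap:
        overlap = 0
    step = chunk_length - overlap
    # Each chunk repeats the last `overlap` characters of its predecessor,
    # so entities straddling a chunk boundary are still seen whole once.
    return [text[i:i + chunk_length] for i in range(0, max(len(text) - overlap, 1), step)]
```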

EntityExtractorMulti()

Bases: EntityExtractor

Composite extractor that runs multiple extractors and concatenates their results.

Methods:

Name Description
extract_entity_values

Return combined entity/value dicts from all sub-extractors.

extract_ner_predictions

Return merged NER predictions from all sub-extractors.

get_entity_extractor

Return an empty composite; add extractors with add_entity_extractor.

add_entity_extractor

Append an extractor to the composite.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def __init__(self):
    self.column_report = {}
    self.current_column = "unknown"

extract_entity_values(text, entities=None)

Return combined entity/value dicts from all sub-extractors.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def extract_entity_values(self, text: str, entities: Optional[set[str]] = None) -> list[dict[str, str]]:
    """Return combined entity/value dicts from all sub-extractors."""
    retval = []
    for extractor in self.extractors:
        retval += extractor.extract_entity_values(text, entities)
    return retval

extract_ner_predictions(text, entities=None)

Return merged NER predictions from all sub-extractors.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def extract_ner_predictions(self, text: str, entities: Optional[set[str]] = None) -> list[NERPrediction]:
    """Return merged NER predictions from all sub-extractors."""
    predictions = []
    for extractor in self.extractors:
        predictions += extractor.extract_ner_predictions(text, entities)
    return predictions

get_entity_extractor(clsfy_cfg) classmethod

Return an empty composite; add extractors with add_entity_extractor.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
@classmethod
def get_entity_extractor(cls, clsfy_cfg: ClassifyConfig) -> EntityExtractorMulti:
    """Return an empty composite; add extractors with ``add_entity_extractor``."""
    self = cls()
    self.extractors = []
    return self

add_entity_extractor(extractor)

Append an extractor to the composite.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def add_entity_extractor(self, extractor: EntityExtractor) -> None:
    """Append an extractor to the composite."""
    self.extractors.append(extractor)
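Putting the composite pieces together: the sketch below reproduces the concatenation behavior documented above, with hypothetical stub sub-extractors standing in for the regex and GLiNER backends.

```python
class EntityExtractorMulti:
    """Sketch of the composite described above (real class adds NER merging)."""

    def __init__(self):
        self.extractors = []

    def add_entity_extractor(self, extractor):
        self.extractors.append(extractor)

    def extract_entity_values(self, text, entities=None):
        combined = []
        for extractor in self.extractors:
            combined += extractor.extract_entity_values(text, entities)
        return combined


class StubExtractor:
    """Hypothetical sub-extractor returning fixed detections."""

    def __init__(self, hits):
        self.hits = hits

    def extract_entity_values(self, text, entities=None):
        return list(self.hits)


multi = EntityExtractorMulti()
multi.add_entity_extractor(StubExtractor([{"entity": "name", "value": "Alice"}]))
multi.add_entity_extractor(StubExtractor([{"entity": "phone", "value": "555-0100"}]))
hits = multi.extract_entity_values("any text")
```

Results are simply concatenated in registration order; deduplication of overlapping spans happens later, in `merge_subsume`.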

classify_columns(df, entities, num_samples, client, on_validation_error, logger)

Classify DataFrame columns to entity types via LLM and return column-to-entity map.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to classify.

required
entities set[str]

Set of valid entity type names.

required
num_samples Optional[int]

Number of value samples per column for the prompt.

required
client Optional[OpenAI]

OpenAI client for chat completions.

required
on_validation_error Callable[[], None]

Callback invoked when LLM output is invalid JSON.

required
logger Logger

Logger for timing and context.

required

Returns:

Type Description
dict[str, Optional[str]]

Map of column name to entity type (or UNKNOWN_ENTITY).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def classify_columns(
    df: pd.DataFrame,
    entities: set[str],
    num_samples: Optional[int],
    client: Optional[OpenAI],
    on_validation_error: Callable[[], None],
    logger: logging.Logger,
) -> dict[str, Optional[str]]:
    """Classify DataFrame columns to entity types via LLM and return column-to-entity map.

    Args:
        df: DataFrame to classify.
        entities: Set of valid entity type names.
        num_samples: Number of value samples per column for the prompt.
        client: OpenAI client for chat completions.
        on_validation_error: Callback invoked when LLM output is invalid JSON.
        logger: Logger for timing and context.

    Returns:
        Map of column name to entity type (or ``UNKNOWN_ENTITY``).
    """
    formatted_prompt = _format_prompt(df, entities, num_samples)
    if not formatted_prompt:
        return {}

    llm_start = timer()
    response = client.chat.completions.create(
        model=DefaultLLMConfig.CONFIG_ID,
        messages=[
            {"role": "system", "content": DefaultLLMConfig.SYSTEM_PROMPT},
            {"role": "user", "content": formatted_prompt},
        ],
        temperature=DefaultLLMConfig.TEMPERATURE,
        max_tokens=DefaultLLMConfig.MAX_OUTPUT_TOKENS,
    )
    entities_str = response.choices[0].message.content
    llm_elapsed = timer() - llm_start
    logger.info(
        f"LLM column classification took {llm_elapsed} seconds.",
        extra={
            "ctx": {
                "llm_elapsed": llm_elapsed,
            },
        },
    )

    col_entities = _try_extract_entities(entities_str, on_validation_error)
    return {col: ent if ent in entities else UNKNOWN_ENTITY for col, ent in col_entities.items()}

sample_columns(df, num_samples, random_state=None)

Sample up to num_samples unique values per non-empty column for classification prompts.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def sample_columns(df: pd.DataFrame, num_samples: int, random_state: Optional[int] = None) -> dict[str, pd.Series]:
    """Sample up to ``num_samples`` unique values per non-empty column for classification prompts."""
    nonempty_columns = df.dropna(axis="columns", how="all").columns
    col_samples = {}
    for col in nonempty_columns:
        filtered = df[col][df[col].apply(lambda x: len(str(x)) < MAX_COL_STR_LEN)].dropna()
        if filtered.empty:
            continue
        col_samples[col] = (
            filtered.sample(frac=1, random_state=random_state).value_counts().index[:num_samples].astype(str)
        )
    return col_samples
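A small worked example of the listing above. The function body is copied verbatim, with `MAX_COL_STR_LEN` pinned to an assumed value since the real constant is defined elsewhere in the module. Note how the all-null column is dropped and the surviving values are deduplicated by frequency:

```python
from typing import Optional

import pandas as pd

MAX_COL_STR_LEN = 100  # assumption: the module's cap on sampled value length


def sample_columns(df: pd.DataFrame, num_samples: int, random_state: Optional[int] = None) -> dict[str, pd.Series]:
    nonempty_columns = df.dropna(axis="columns", how="all").columns
    col_samples = {}
    for col in nonempty_columns:
        filtered = df[col][df[col].apply(lambda x: len(str(x)) < MAX_COL_STR_LEN)].dropna()
        if filtered.empty:
            continue
        # Shuffle, then take the most frequent unique values as strings.
        col_samples[col] = (
            filtered.sample(frac=1, random_state=random_state).value_counts().index[:num_samples].astype(str)
        )
    return col_samples


df = pd.DataFrame({
    "name": ["Alice", "Bob", "Alice", None],
    "blank": [None, None, None, None],
})
samples = sample_columns(df, num_samples=2, random_state=0)
```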

redact_from_entities(text, detected, redact_fn)

Replace each detected span in text with the result of redact_fn(prediction).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def redact_from_entities(text: str, detected: list[NERPrediction], redact_fn: RedactFn) -> str:
    """Replace each detected span in ``text`` with the result of ``redact_fn(prediction)``."""
    return "".join(chain(*traverse_redact(text, detected, redact_fn)))

traverse_redact(text, entities, redact_fn)

Yield iterables of text segments and redacted spans for assembly via chain().

Entities must be sorted by span; yields alternating slices of text and redact_fn(entity) so that chain(*traverse_redact(...)) gives the full string.

Parameters:

Name Type Description Default
text str

Source text.

required
entities list[NERPrediction]

NER predictions with start/end indices (sorted by span).

required
redact_fn RedactFn

Function mapping each prediction to its replacement string.

required

Yields:

Type Description
Iterable[str]

Iterables of strings (text slices and redaction results).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def traverse_redact(text: str, entities: list[NERPrediction], redact_fn: RedactFn) -> Iterator[Iterable[str]]:
    """Yield iterables of text segments and redacted spans for assembly via ``chain()``.

    Entities must be sorted by span; yields alternating slices of ``text`` and
    ``redact_fn(entity)`` so that ``chain(*traverse_redact(...))`` gives the full string.

    Args:
        text: Source text.
        entities: NER predictions with ``start``/``end`` indices (sorted by span).
        redact_fn: Function mapping each prediction to its replacement string.

    Yields:
        Iterables of strings (text slices and redaction results).
    """
    prev = 0
    for entity in sorted(entities, key=lambda e: e.start):
        # Yes this is an iterator which yields iterators. It allows a
        # single pass, and single copy, across text, which may be very large.
        yield islice(text, prev, entity.start)
        yield redact_fn(entity)
        prev = entity.end
    yield islice(text, prev, None)
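The alternation described above can be seen end to end with a stand-in span type (only `start`/`end` and a label are needed for this sketch; the real `NERPrediction` carries more fields):

```python
from dataclasses import dataclass
from itertools import chain, islice


@dataclass
class Span:  # minimal stand-in for NERPrediction
    start: int
    end: int
    label: str


def traverse_redact(text, entities, redact_fn):
    prev = 0
    for entity in sorted(entities, key=lambda e: e.start):
        yield islice(text, prev, entity.start)   # untouched text before the span
        yield redact_fn(entity)                  # replacement for the span
        prev = entity.end
    yield islice(text, prev, None)               # trailing text after the last span


text = "Call Alice at 555-0100"
spans = [Span(5, 10, "name"), Span(14, 22, "phone")]
redacted = "".join(chain(*traverse_redact(text, spans, lambda e: f"<{e.label}>")))
# redacted == "Call <name> at <phone>"
```

Yielding `islice` views rather than string slices is what makes the single-pass, single-copy claim in the source comment hold for very large texts.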

find_best(entities)

Return the prediction with the largest span (used when merging overlapping spans).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def find_best(entities: list[NERPrediction]) -> NERPrediction:
    """Return the prediction with the largest span (used when merging overlapping spans)."""
    span_max = 0
    best = entities[0]
    for entity in entities:
        span = entity.end - entity.start
        if span > span_max:
            best = entity
            span_max = span
    return best

merge_subsume(entities)

Merge overlapping NER spans into a single prediction per span using find_best.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/detect.py
def merge_subsume(entities: list[NERPrediction]) -> list[NERPrediction]:
    """Merge overlapping NER spans into a single prediction per span using ``find_best``."""
    result = []
    entities = sorted(entities, key=lambda e: (e.start, e.end))
    while entities:
        entity = entities.pop(0)
        if entity.end <= entity.start:
            continue
        if not entities:
            result.append(entity)
            break

        peek = entities[0]
        if entity.end <= peek.start:
            # No overlap
            result.append(entity)
            continue

        # Note - sorting prevents:
        # entity.start > peek.start
        # entity.end > peek.end when starts are equal

        # subsume greedily
        start = entity.start
        end = max(entity.end, peek.end)
        candidates = [entity]

        while entities and peek.start < end:
            candidates.append(entities.pop(0))
            end = max(end, peek.end)
            if entities:
                peek = entities[0]
        best = find_best(candidates)
        result.append(NERPrediction(best.text, start, end, best.label, best.source, best.score))
    return result
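To see the merge in action: both listings above are copied verbatim below, with a stand-in `NERPrediction` dataclass whose field order matches the constructor call in `merge_subsume` (the `source` and `score` defaults are illustrative). A narrow `first_name` span is subsumed by the wider `name` span that overlaps it:

```python
from dataclasses import dataclass


@dataclass
class NERPrediction:  # minimal stand-in; field order matches the constructor call above
    text: str
    start: int
    end: int
    label: str
    source: str = "stub"
    score: float = 1.0


def find_best(entities):
    span_max = 0
    best = entities[0]
    for entity in entities:
        span = entity.end - entity.start
        if span > span_max:
            best = entity
            span_max = span
    return best


def merge_subsume(entities):
    result = []
    entities = sorted(entities, key=lambda e: (e.start, e.end))
    while entities:
        entity = entities.pop(0)
        if entity.end <= entity.start:
            continue
        if not entities:
            result.append(entity)
            break
        peek = entities[0]
        if entity.end <= peek.start:
            result.append(entity)  # no overlap
            continue
        start = entity.start
        end = max(entity.end, peek.end)
        candidates = [entity]
        while entities and peek.start < end:
            candidates.append(entities.pop(0))
            end = max(end, peek.end)
            if entities:
                peek = entities[0]
        best = find_best(candidates)
        result.append(NERPrediction(best.text, start, end, best.label, best.source, best.score))
    return result


# "John" (first_name) overlaps the wider "John Smith" (name) span.
merged = merge_subsume([
    NERPrediction("John", 0, 4, "first_name"),
    NERPrediction("John Smith", 0, 10, "name"),
])
# merged == [NERPrediction("John Smith", 0, 10, "name", ...)]
```

The merged span covers the union of the overlapping region, while the label, text, and score come from the widest candidate.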