fragment

Metadata fragment assembly for NER-annotated records.

Provides Metadata and MetadataFragment for aggregating per-field NER predictions, along with helpers to merge fragments, build entity maps, and produce API-compatible response dicts.

Classes:

- MetadataError: Raised when metadata fragments cannot be merged (e.g., mismatched IDs).
- Metadata: Merged record metadata aggregated from one or more MetadataFragment objects.
- MetadataFragment: A single annotation pass over a record (e.g., one NER model's output).

Functions:

- merge_fragments: Merge one or more MetadataFragment objects into a single Metadata.
- fragment_for_record: Create a new MetadataFragment timestamped to the current time.
- predictions_to_dict: Aggregate NER predictions into per-field results and an entity map.
- fragment_from_ner_predictions: Build a MetadataFragment and entity map from NER predictions.
- build_ner_metadata: Construct a metadata dict from raw prediction dicts.
- create_ner_api_response: Build an API-compatible list of {data, model_metadata} dicts.

MetadataError

Bases: Exception

Raised when metadata fragments cannot be merged (e.g., mismatched IDs).

Metadata(record_id, fields, entities, received_at) dataclass

Merged record metadata aggregated from one or more MetadataFragment objects.

The fields dict has the structure::

field_name -> fragment_name -> metadata_type -> [metadata_items]
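The nesting can be illustrated with a standalone sketch using plain defaultdicts rather than the library's classes; the field name and label values here are invented:

```python
from collections import defaultdict

# field_name -> fragment_name -> metadata_type -> [metadata_items]
fields = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

# One NER label recorded for a hypothetical "conn_str" field by an "ner" fragment.
fields["conn_str"]["ner"]["labels"].append(
    {"label": "ip_address", "start": 0, "end": 9, "score": 0.97}
)

print(fields["conn_str"]["ner"]["labels"][0]["label"])  # -> ip_address
```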

Methods:

- as_dict: Serialize to a plain dictionary.

Attributes:

- fields (dict): Nested dict of per-field, per-fragment metadata.
- entities (dict): Entity map produced by predictions_to_dict.
- received_at (str): ISO-8601 timestamp of the earliest fragment.


as_dict()

Serialize to a plain dictionary.

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def as_dict(self):
    """Serialize to a plain dictionary."""
    return self.__dict__

MetadataFragment(record_id, fragment_ts, fragment_epoch, fragment_name) dataclass

A single annotation pass over a record (e.g., one NER model's output).

Fragments are later merged via merge_fragments into a single Metadata object per record.

Parameters:

- record_id (str, required): Unique identifier for the source record.
- fragment_ts (str, required): ISO-8601 timestamp string.
- fragment_epoch (float, required): Unix epoch of the fragment creation.
- fragment_name (str, required): Identifier for this annotation pass (e.g., "ner").

Methods:

- add_field_data: Append metadata entries for a field.
- as_dict: Serialize to a plain dictionary.

Attributes:

- fragment_datetime (datetime, property): Fragment creation time as a datetime object.


add_field_data(field_name, metadata_type, field_data)

Append metadata entries for a field.

Parameters:

- field_name (str, required): Name of the field to annotate.
- metadata_type (str, required): Category of metadata (e.g., "labels").
- field_data (dict | list, required): A dict (single entry) or list of entries to add.

Raises:

- TypeError: If field_data is neither a dict nor a list.

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def add_field_data(self, field_name: str, metadata_type: str, field_data: dict | list):
    """Append metadata entries for a field.

    Args:
        field_name: Name of the field to annotate.
        metadata_type: Category of metadata (e.g., ``"labels"``).
        field_data: A dict (single entry) or list of entries to add.

    Raises:
        TypeError: If ``field_data`` is neither a dict nor a list.
    """
    if isinstance(field_data, list):
        self.fields[field_name][metadata_type].extend(field_data)
    elif isinstance(field_data, dict):
        self.fields[field_name][metadata_type].append(field_data)
    else:
        raise TypeError(f"field_data must be a dict or list, got {type(field_data)}")
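The dict-vs-list handling can be exercised with a minimal stand-in class (not the library's MetadataFragment, which also requires a record id and timestamps):

```python
from collections import defaultdict

class FragmentSketch:
    """Minimal stand-in mirroring the add_field_data normalization above."""

    def __init__(self):
        self.fields = defaultdict(lambda: defaultdict(list))

    def add_field_data(self, field_name, metadata_type, field_data):
        if isinstance(field_data, list):
            self.fields[field_name][metadata_type].extend(field_data)
        elif isinstance(field_data, dict):
            self.fields[field_name][metadata_type].append(field_data)
        else:
            raise TypeError(f"field_data must be a dict or list, got {type(field_data)}")

f = FragmentSketch()
f.add_field_data("email", "labels", {"label": "email_address"})          # dict: appended
f.add_field_data("email", "labels", [{"label": "name"}, {"label": "id"}])  # list: extended
print(len(f.fields["email"]["labels"]))  # -> 3
```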

as_dict()

Serialize to a plain dictionary.

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def as_dict(self):
    """Serialize to a plain dictionary."""
    return self.__dict__

merge_fragments(*fragments, ts=None)

Merge one or more MetadataFragment objects into a single Metadata.

Parameters:

- *fragments: Fragments to merge. All must share the same record_id.
- ts (str | None, default None): Override timestamp for received_at. Defaults to the earliest fragment timestamp.

Returns:

- Metadata: A single Metadata object with all fragment data merged.

Raises:

- MetadataError: If the fragments have different record_id values.

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def merge_fragments(*fragments, ts: str | None = None) -> Metadata:
    """Merge one or more ``MetadataFragment`` objects into a single ``Metadata``.

    Args:
        *fragments: Fragments to merge. All must share the same ``record_id``.
        ts: Override timestamp for ``received_at``. Defaults to the earliest
            fragment timestamp.

    Returns:
        A single ``Metadata`` object with all fragment data merged.

    Raises:
        MetadataError: If the fragments have different ``record_id`` values.
    """
    if len({fragment.record_id for fragment in fragments}) != 1:
        raise MetadataError("cannot merge fragments from different records")
    record_id = fragments[0].record_id

    # todo(dn): there might be a better way to build up this object
    merged_fragment = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
    ts = ts or min(f.fragment_datetime for f in fragments).isoformat() + "Z"
    fragment: MetadataFragment
    for fragment in fragments:
        for field_name, field_data in fragment.fields.items():
            for meta_type, meta_data in field_data.items():
                merged_fragment[field_name][fragment.fragment_name][meta_type].extend(meta_data)
    return Metadata(record_id=record_id, fields=merged_fragment, received_at=ts, entities={})
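A toy walkthrough of the merge loop, using plain dicts in place of MetadataFragment objects (the field and fragment names here are invented):

```python
from collections import defaultdict

# Two hypothetical annotation passes over the same record.
fragments = [
    {"name": "ner", "fields": {"conn_str": {"labels": [{"label": "ip_address"}]}}},
    {"name": "regex", "fields": {"conn_str": {"labels": [{"label": "port"}]}}},
]

# Same nesting as merge_fragments: field_name -> fragment_name -> metadata_type -> [items]
merged = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))
for frag in fragments:
    for field_name, field_data in frag["fields"].items():
        for meta_type, meta_data in field_data.items():
            merged[field_name][frag["name"]][meta_type].extend(meta_data)

print(sorted(merged["conn_str"]))  # -> ['ner', 'regex']
```

Each pass keeps its own sub-dict under its fragment name, so overlapping annotations from different models never clobber each other.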

fragment_for_record(record_id, fragment_name)

Create a new MetadataFragment timestamped to the current time.

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def fragment_for_record(record_id: str, fragment_name: str) -> MetadataFragment:
    """Create a new ``MetadataFragment`` timestamped to the current time."""
    epoch = time.time()
    ts = datetime.fromtimestamp(epoch).isoformat() + "Z"
    return MetadataFragment(
        record_id=record_id,
        fragment_epoch=epoch,
        fragment_ts=ts,
        fragment_name=fragment_name,
    )

predictions_to_dict(predictions, *, high_score=Score.HIGH, med_score=Score.MED)

Aggregate NER predictions into per-field results and an entity map.

Groups predictions by field and builds a score-bucketed entity map::

{
    "score_high": ["ip_address", ...],
    "score_med": [],
    "score_low": [],
    "fields_by_entity": {"ip_address": ["conn_str"]},
}

Parameters:

- predictions (list[NERPrediction], required): List of NER prediction objects.
- high_score (float, default Score.HIGH): Minimum score threshold for the score_high bucket.
- med_score (float, default Score.MED): Minimum score threshold for the score_med bucket.

Returns:

- tuple[dict, dict]: A tuple of (predictions_by_field, entity_map).

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def predictions_to_dict(
    predictions: list[NERPrediction],
    *,
    high_score: float = Score.HIGH,
    med_score: float = Score.MED,
) -> tuple[dict, dict]:
    """Aggregate NER predictions into per-field results and an entity map.

    Groups predictions by field and builds a score-bucketed entity map::

        {
            "score_high": ["ip_address", ...],
            "score_med": [],
            "score_low": [],
            "fields_by_entity": {"ip_address": ["conn_str"]},
        }

    Args:
        predictions: List of NER prediction objects.
        high_score: Minimum score threshold for the ``score_high`` bucket.
        med_score: Minimum score threshold for the ``score_med`` bucket.

    Returns:
        A tuple of (predictions_by_field, entity_map).
    """
    entity_map = {
        SCORE_HIGH: set(),
        SCORE_MED: set(),
        SCORE_LOW: set(),
        E2F: defaultdict(set),
    }
    predictions_by_key = defaultdict(list)
    for prediction in predictions:
        predictions_by_key[prediction.field].append(
            {
                "start": prediction.start,
                "end": prediction.end,
                "label": prediction.label,
                "score": prediction.score,
                "source": prediction.source,
                "text": prediction.text,
            }
        )
        # NOTE:(jm) this covers the Spacy case where
        # no score is emitted. Predictions here could be
        # hit or miss so we throw it into medium
        if prediction.score is None:
            entity_map[SCORE_MED].add(prediction.label)
        elif prediction.score >= high_score:
            entity_map[SCORE_HIGH].add(prediction.label)
        elif prediction.score >= med_score:
            entity_map[SCORE_MED].add(prediction.label)
        else:
            entity_map[SCORE_LOW].add(prediction.label)
        entity_map[E2F][prediction.label].add(prediction.field)
    for _, preds in predictions_by_key.items():
        preds.sort(key=lambda p: p["start"])
    for level in (SCORE_HIGH, SCORE_MED, SCORE_LOW):
        entity_map[level] = list(entity_map[level])
    for entity, _set in entity_map[E2F].items():
        entity_map[E2F][entity] = list(_set)
    return predictions_by_key, entity_map
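The bucketing rule can be sketched standalone; the 0.8 and 0.5 thresholds below are assumptions for illustration, not the actual Score.HIGH / Score.MED values:

```python
HIGH, MED = 0.8, 0.5  # assumed thresholds, not the library's Score enum values

def bucket(score):
    """Assign a prediction score to an entity-map bucket."""
    if score is None:       # e.g., spaCy emits no score -> treated as medium
        return "score_med"
    if score >= HIGH:
        return "score_high"
    if score >= MED:
        return "score_med"
    return "score_low"

print([bucket(s) for s in (0.95, 0.6, 0.1, None)])
# -> ['score_high', 'score_med', 'score_low', 'score_med']
```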

fragment_from_ner_predictions(fragment_name, predictions, record_id)

Build a MetadataFragment and entity map from NER predictions.

Parameters:

- fragment_name (str, required): Identifier for this annotation pass (e.g., "ner").
- predictions (list[NERPrediction], required): List of NER predictions to aggregate.
- record_id (str, required): Unique identifier for the source record.

Returns:

- tuple[MetadataFragment, dict]: A tuple of (fragment, entity_map).

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def fragment_from_ner_predictions(
    fragment_name: str,
    predictions: list[NERPrediction],
    record_id: str,
) -> tuple[MetadataFragment, dict]:
    """Build a ``MetadataFragment`` and entity map from NER predictions.

    Args:
        fragment_name: Identifier for this annotation pass (e.g., ``"ner"``).
        predictions: List of NER predictions to aggregate.
        record_id: Unique identifier for the source record.

    Returns:
        A tuple of (fragment, entity_map).
    """
    epoch = time.time()
    fragment = MetadataFragment(
        record_id=record_id,
        fragment_ts=datetime.fromtimestamp(epoch).isoformat() + "Z",
        fragment_epoch=epoch,
        fragment_name=fragment_name,
    )
    preds_by_field, ent_map = predictions_to_dict(predictions)
    for _field, preds in preds_by_field.items():
        fragment.add_field_data(_field, "labels", preds)

    return fragment, ent_map

build_ner_metadata(preds)

Construct a metadata dict (a serialized Metadata) from raw prediction dicts.

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def build_ner_metadata(preds: list[dict]) -> dict:
    """Construct a metadata dict (a serialized ``Metadata``) from raw prediction dicts."""
    preds = [NERPrediction.from_dict(p) for p in preds]
    fragment, ent_map = fragment_from_ner_predictions(
        "ner",
        preds,
        uuid.uuid4().hex,
    )
    meta = merge_fragments(fragment)
    meta.entities = ent_map
    return meta.as_dict()

create_ner_api_response(records, predictions, pure_dict=False)

Build an API-compatible list of {data, model_metadata} dicts.

Parameters:

- records (list[dict], required): Raw record dictionaries.
- predictions (list[dict], required): Per-record NER prediction lists (parallel with records).
- pure_dict (bool, default False): If True, round-trip through JSON to eliminate non-dict types.

Returns:

- list[dict]: List of dicts, each containing data and model_metadata keys.

Source code in src/nemo_safe_synthesizer/data_processing/records/fragment.py
def create_ner_api_response(records: list[dict], predictions: list[dict], pure_dict: bool = False) -> list[dict]:
    """Build an API-compatible list of ``{data, model_metadata}`` dicts.

    Args:
        records: Raw record dictionaries.
        predictions: Per-record NER prediction lists (parallel with ``records``).
        pure_dict: If True, round-trip through JSON to eliminate non-dict types.

    Returns:
        List of dicts, each containing ``data`` and ``model_metadata`` keys.
    """
    out = [
        {"data": record, "model_metadata": build_ner_metadata(prediction)}
        for record, prediction in zip(records, predictions)
    ]
    if pure_dict:
        return json.loads(json.dumps(out))
    return out
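Why the pure_dict round-trip helps: the merged fields are nested defaultdicts, and a JSON round-trip collapses them to plain dicts and lists. A minimal sketch with made-up data:

```python
import json
from collections import defaultdict

# A nested defaultdict stands in for the merged metadata fields.
nested = defaultdict(list)
nested["labels"].append({"score": 0.9})

# json serializes defaultdicts (they subclass dict); reloading yields plain dicts.
out = json.loads(json.dumps({"data": {"x": 1}, "model_metadata": nested}))
print(type(out["model_metadata"]))  # -> <class 'dict'>
```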