
Classes:

| Name | Description |
| --- | --- |
| TransformFnAccounting | Tracks which transform functions or filters are applied to each column for reporting. |
| ProgressStatus | Mutable progress counters and labels for transformation steps (step, rule, row, column). |
| ProgressLog | Throttled progress logging for transformation; logs to `logger.user` at most every `log_duration` seconds. |
| Step | Single transformation step: applies column/row add/drop/rename/update rules to a DataFrame. |
| Editor | Applies a sequence of transformation steps to a DataFrame (columns/rows add, drop, rename, update). |

Functions:

| Name | Description |
| --- | --- |
| instantiate_vars | Recursively render template strings in `var_value` and eval to Python types. |

TransformFnAccounting(included_fns)

Tracks which transform functions or filters are applied to each column for reporting.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| included_fns | list[str] | Function/filter names to track; others are ignored (or recorded as "jinja"). | required |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| included_fns | set[str] | Set of names that are included in accounting. |
| column_fns | dict[str, set[str]] | Map of column name to set of function/filter names applied to that column. |

Methods:

| Name | Description |
| --- | --- |
| update | Record that the given functions/filters were applied to the given columns. |

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def __init__(self, included_fns: list[str]):
    self.included_fns = set(included_fns)
    self.column_fns = defaultdict(set)

update(column_names, fns)

Record that the given functions/filters were applied to the given columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| column_names | str \| Iterable[str] | Column name(s) to record; a single string or iterable of strings. | required |
| fns | str \| set[str] | Name(s) of functions or filters applied; intersected with `included_fns`. | required |
Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def update(self, column_names: str | Iterable[str], fns: str | set[str]) -> None:
    """Record that the given functions/filters were applied to the given columns.

    Args:
        column_names: Column name(s) to record; a single string or iterable of strings.
        fns: Name(s) of functions or filters applied; intersected with ``included_fns``.
    """
    if isinstance(fns, str):
        fns = set([fns])
    fns &= self.included_fns
    if not fns:
        fns = {"jinja"}
    if isinstance(column_names, str):
        column_names = [column_names]
    for column_name in column_names:
        self.column_fns[column_name] |= fns
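The quoted source is self-contained enough to run on its own. The sketch below reproduces it stand-alone (the class body is taken from the source above) to show the accounting behavior: tracked names are recorded per column, and an update whose functions are all untracked falls back to recording "jinja".

```python
from __future__ import annotations

from collections import defaultdict
from typing import Iterable


class TransformFnAccounting:
    """Stand-alone copy of the class above, for demonstration."""

    def __init__(self, included_fns: list[str]):
        self.included_fns = set(included_fns)
        self.column_fns = defaultdict(set)

    def update(self, column_names: str | Iterable[str], fns: str | set[str]) -> None:
        if isinstance(fns, str):
            fns = {fns}
        fns &= self.included_fns  # keep only tracked names
        if not fns:
            fns = {"jinja"}  # untracked templates are recorded as plain jinja
        if isinstance(column_names, str):
            column_names = [column_names]
        for column_name in column_names:
            self.column_fns[column_name] |= fns


acct = TransformFnAccounting(["fake", "redact"])
acct.update("email", "fake")               # tracked function, recorded as-is
acct.update(["name", "email"], {"upper"})  # "upper" is untracked -> falls back to "jinja"
# acct.column_fns: email -> {"fake", "jinja"}, name -> {"jinja"}
```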

ProgressStatus(step_n=0, step_n_total=0, update_rule_n=0, update_rule_n_total=0, update_rule_description='', row_n=0, row_n_total=0, column_n=0, column_n_total=0, column_name='') dataclass

Mutable progress counters and labels for transformation steps (step, rule, row, column).

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| step_n | int | Current step index (0-based). |
| step_n_total | int | Total number of steps. |
| update_rule_n | int | Current update rule index (0-based). |
| update_rule_n_total | int | Total number of update rules in the current step. |
| update_rule_description | str | Description of the current update rule (for logging). |
| row_n | int | Number of rows processed so far. |
| row_n_total | int | Total number of rows to process for the current column. |
| column_n | int | Current column index (0-based). |
| column_n_total | int | Total number of columns in the current update rule. |
| column_name | str | Name of the column currently being processed. |

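The fields and defaults above fully determine the dataclass, so it can be sketched stand-alone. Note the 0-based convention: the logger adds 1 when rendering labels such as "step 2 of 3".

```python
from dataclasses import dataclass


@dataclass
class ProgressStatus:
    """Sketch assembled from the documented fields; all counters default
    to zero and all labels to the empty string, matching the signature above."""

    step_n: int = 0
    step_n_total: int = 0
    update_rule_n: int = 0
    update_rule_n_total: int = 0
    update_rule_description: str = ""
    row_n: int = 0
    row_n_total: int = 0
    column_n: int = 0
    column_n_total: int = 0
    column_name: str = ""


status = ProgressStatus(step_n=1, step_n_total=3, row_n=250, row_n_total=1000)
print(f"step {status.step_n + 1} of {status.step_n_total}")  # 0-based index -> "step 2 of 3"
```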

ProgressLog(log_duration)

Throttled progress logging for transformation; logs to logger.user at most every log_duration seconds.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| log_duration | float | Minimum seconds between log emissions. | required |

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| status | ProgressStatus | Current progress counters and labels. |
| start_time | float | Monotonic time when logging started. |
| last_log | float | Monotonic time of the last log. |
| log_duration | float | Minimum interval between logs in seconds. |

Methods:

| Name | Description |
| --- | --- |
| log_throttled | Emit a progress log if at least `log_duration` seconds have passed, or if `force` is True. |

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def __init__(self, log_duration: float):
    self.status = ProgressStatus()
    self.start_time = monotonic()
    self.last_log = monotonic()
    self.log_duration = log_duration

log_throttled(force=False)

Emit a progress log if at least log_duration seconds have passed, or if force is True.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def log_throttled(self, force: bool = False) -> None:
    """Emit a progress log if at least ``log_duration`` seconds have passed, or if ``force`` is ``True``."""
    if force or monotonic() - self.last_log > self.log_duration:
        duration = monotonic() - self.start_time
        rows_per_second = 0 if duration == 0 else (self.status.row_n) / duration
        speed_emoji = "🐇" if rows_per_second >= 10 else "🐢"
        column_string = (
            f""""{self.status.column_name}", #{self.status.column_n + 1} of {self.status.column_n_total}"""
            if self.status.column_name
            else ""
        )
        duration_string = f"{duration:.1f} seconds" if duration < 120 else f"{duration / 60:.1f} minutes"
        update_rule_description = ""
        if self.status.update_rule_description:
            update_rule_description = f'"{self.status.update_rule_description}"'
        row_n_conditional_s = "s" if self.status.row_n != 1 else ""
        progress_data = {
            "transform_time": duration_string,
            "step": f"{self.status.step_n + 1} of {self.status.step_n_total}",
            "rule": f"{self.status.update_rule_n + 1} of {self.status.update_rule_n_total} {update_rule_description}",
            "column": column_string,
            "progress": f"{self.status.row_n} row{row_n_conditional_s} out of {self.status.row_n_total} transformed",
            "speed": f"{speed_emoji} {rows_per_second:.1f} rows per second.",
        }
        logger.user.info(
            "",
            extra={
                "ctx": {
                    "render_table": True,
                    "tabular_data": progress_data,
                    "title": "Transformation Progress",
                }
            },
        )

        self.last_log = monotonic()
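Stripped of the table rendering, `log_throttled` is an instance of a generic rate-limiting pattern: gate on `monotonic()` elapsed time, reset the timestamp on each emission, and let `force=True` bypass the gate. The `Throttle` class below is a hypothetical minimal sketch of just that gate, not part of the library.

```python
from time import monotonic


class Throttle:
    """Minimal sketch of the gating logic in log_throttled (hypothetical name)."""

    def __init__(self, log_duration: float):
        self.log_duration = log_duration
        self.last_log = monotonic()

    def ready(self, force: bool = False) -> bool:
        # Emit at most once per log_duration seconds; force bypasses the gate.
        if force or monotonic() - self.last_log > self.log_duration:
            self.last_log = monotonic()
            return True
        return False


throttle = Throttle(log_duration=60.0)
emitted = [throttle.ready(force=True), throttle.ready()]
# forced call emits; the immediate follow-up is suppressed -> [True, False]
```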

Step

Single transformation step: applies column/row add/drop/rename/update rules to a DataFrame.

Used via Step.execute; holds _env (Jinja + faker) and _vars for the step.

Methods:

| Name | Description |
| --- | --- |
| do_make_template | Build a Jinja template from the string (may raise TemplateError). |
| make_template | Build a Jinja template; raise with error_id='param' on failure. |
| template_to_fnames | Return the set of filter/function names referenced in the template (e.g. fake, re). |
| update_ner_cache | Pre-fill the entity extractor cache for the given text series (e.g. before row updates). |
| execute | Run one transformation step: apply column add/drop/rename and row drop/update from step_config. |

do_make_template(template_str)

Build a Jinja template from the string (may raise TemplateError).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def do_make_template(self, template_str: str) -> Template:
    """Build a Jinja template from the string (may raise ``TemplateError``)."""
    return self._env.make_template(template_str)

make_template(template_str)

Build a Jinja template; raise with error_id='param' on failure.

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def make_template(self, template_str: str) -> Template:
    """Build a Jinja template; raise with ``error_id='param'`` on failure."""
    try:
        return self.do_make_template(template_str)
    except TemplateError as e:
        raise Exception(
            f"Error building jinja template '{template_str}': {e}",
            error_id="param",
        )

template_to_fnames(template_str)

Return the set of filter/function names referenced in the template (e.g. fake, re).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def template_to_fnames(self, template_str: str) -> set[str]:
    """Return the set of filter/function names referenced in the template (e.g. ``fake``, ``re``)."""
    retval = set()
    try:
        retval = self._env.template_to_fnames(template_str)
    except TemplateError:
        # Let other template functions raise the error
        pass
    return retval

update_ner_cache(texts, entities=None)

Pre-fill the entity extractor cache for the given text series (e.g. before row updates).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def update_ner_cache(self, texts: pd.Series, entities: set[str] | None = None) -> None:
    """Pre-fill the entity extractor cache for the given text series (e.g. before row updates)."""
    self._env.entity_extractor.batch_update_cache([str(s) for s in texts], entities)

execute(df, entities, column_types, step_config, env, progress, fnreport) classmethod

Run one transformation step: apply column add/drop/rename and row drop/update from step_config.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | DataFrame to transform (modified in place). | required |
| entities | dict[str, str] | Column name to entity type. | required |
| column_types | dict[str, str] | Column name to column type. | required |
| step_config | dict[str, dict] | Step config with optional vars, columns (add/drop/rename), rows (drop/update). | required |
| env | Environment | Environment (Jinja, faker, entity extractor). | required |
| progress | ProgressLog | Progress logger for throttled output. | required |
| fnreport | TransformFnAccounting \| None | Optional accounting for which functions were applied per column. | required |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | The same DataFrame after applying the step (index reset). |

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
@classmethod
def execute(
    cls,
    df: pd.DataFrame,
    entities: dict[str, str],
    column_types: dict[str, str],
    step_config: dict[str, dict],
    env: Environment,
    progress: ProgressLog,
    fnreport: TransformFnAccounting | None,
) -> pd.DataFrame:
    """Run one transformation step: apply column add/drop/rename and row drop/update from ``step_config``.

    Args:
        df: DataFrame to transform (modified in place).
        entities: Column name to entity type.
        column_types: Column name to column type.
        step_config: Step config with optional ``vars``, ``columns`` (add/drop/rename), ``rows`` (drop/update).
        env: Environment (Jinja, faker, entity extractor).
        progress: Progress logger for throttled output.
        fnreport: Optional accounting for which functions were applied per column.

    Returns:
        The same DataFrame after applying the step (index reset).
    """
    step = cls()
    step._env = env
    step._vars = {}
    vars_config = step_config.get("vars") or {}
    for var_name, var_value in vars_config.items():
        step._vars[var_name] = instantiate_vars(var_name, var_value, step, df)
    columns_config = step_config.get("columns") or {}
    for action_name, action_config in columns_config.items():
        if action_name == "add" and action_config is not None:
            step._add_columns(df, action_config)
        elif action_name == "drop" and action_config is not None:
            step._drop_columns(df, action_config, entities, column_types, fnreport)
        elif action_name == "rename" and action_config is not None:
            step._rename_columns(df, action_config)
    rows_config = step_config.get("rows") or {}
    for action_name, action_config in rows_config.items():
        if action_name == "drop" and action_config is not None:
            step._drop_rows(df, action_config)
        elif action_name == "update" and action_config is not None:
            step._update_rows(df, action_config, entities, column_types, progress, fnreport)
    df = df.reset_index(drop=True)
    return df
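The `execute` body only fixes the top-level shape of `step_config`: optional `vars`, `columns` (with `add`/`drop`/`rename`), and `rows` (with `drop`/`update`). The sketch below illustrates that shape; the inner rule fields (`name`, `condition`, `value`, `from`, `to`) are assumptions for illustration, not the library's exact rule schema.

```python
# Hypothetical step_config showing the documented top-level keys.
# Inner rule fields are illustrative assumptions; consult the actual
# schema for the exact rule format.
step_config = {
    "vars": {"threshold": "{{ data['score'].mean() }}"},
    "columns": {
        "add": [{"name": "note"}],
        "drop": [{"name": "ssn"}],
        "rename": [{"from": "fname", "to": "first_name"}],
    },
    "rows": {
        "drop": [{"condition": "score < vars.threshold"}],
        "update": [{"name": "email", "value": "{{ fake.email() }}"}],
    },
}
```

Each top-level section is optional and may be `None`; `execute` skips any action whose config is missing or `None`.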

Editor(config, entity_extractor)

Applies a sequence of transformation steps to a DataFrame (columns/rows add, drop, rename, update).

Config is a dict with steps; each step has optional vars, columns, and rows. Uses Environment for Jinja templates and entity extraction.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| config | dict[str, dict] | Editor config (e.g. from YAML) with globals and steps. | required |
| entity_extractor | EntityExtractor \| None | Optional extractor for NER in templates; Environment holds it. | required |

Methods:

| Name | Description |
| --- | --- |
| load_yaml | Build an Editor from a YAML string (e.g. yaml.safe_load(yaml_str)). |
| process_df | Apply all transformation steps to a deep copy of df and return the result. |
Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def __init__(self, config: dict[str, dict], entity_extractor: EntityExtractor | None) -> None:
    self.config = config
    self._config_globals(entity_extractor)

load_yaml(yaml_str) classmethod

Build an Editor from a YAML string (e.g. yaml.safe_load(yaml_str)).

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
@classmethod
def load_yaml(cls, yaml_str: str) -> Editor:
    """Build an ``Editor`` from a YAML string (e.g. ``yaml.safe_load(yaml_str)``)."""
    return cls(yaml.safe_load(yaml_str))

process_df(df, entities, column_types, fnreport=None)

Apply all transformation steps to a deep copy of df and return the result.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | Source DataFrame (not modified). | required |
| entities | dict[str, str] | Column name to entity type. | required |
| column_types | dict[str, str] | Column name to column type. | required |
| fnreport | TransformFnAccounting \| None | Optional accounting for which functions were applied per column. | None |

Returns:

| Type | Description |
| --- | --- |
| DataFrame | Transformed DataFrame. |

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def process_df(
    self,
    df: pd.DataFrame,
    entities: dict[str, str],
    column_types: dict[str, str],
    fnreport: TransformFnAccounting | None = None,
) -> pd.DataFrame:
    """Apply all transformation steps to a deep copy of ``df`` and return the result.

    Args:
        df: Source DataFrame (not modified).
        entities: Column name to entity type.
        column_types: Column name to column type.
        fnreport: Optional accounting for which functions were applied per column.

    Returns:
        Transformed DataFrame.
    """
    df_copy = df.copy(deep=True)
    return self._process_df(df_copy, entities, column_types, fnreport)

instantiate_vars(var_name, var_value, step, df)

Recursively render template strings in var_value and eval to Python types.

Strings are rendered with step and df; then ast.literal_eval is attempted. Dicts and lists are processed recursively. Template errors for var_name raise with error_id='param'. Order of vars in config can affect what is available during render.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| var_name | str | Name of the variable (used in error messages). | required |
| var_value | dict \| list \| str | Current value (string, list, or dict) to render and optionally eval. | required |
| step | Step | Step with `_env` and `_vars` for template rendering. | required |
| df | DataFrame | DataFrame available as `data` in templates. | required |

Returns:

| Type | Description |
| --- | --- |
| Any | Rendered value, with strings possibly converted to bool/int/float/list/dict via `literal_eval`. |

Source code in src/nemo_safe_synthesizer/pii_replacer/data_editor/edit.py
def instantiate_vars(var_name: str, var_value: dict | list | str, step: Step, df: pd.DataFrame) -> Any:
    """Recursively render template strings in ``var_value`` and eval to Python types.

    Strings are rendered with ``step`` and ``df``; then ``ast.literal_eval`` is attempted.
    Dicts and lists are processed recursively. Template errors for ``var_name`` raise with
    ``error_id='param'``. Order of vars in config can affect what is available during render.

    Args:
        var_name: Name of the variable (used in error messages).
        var_value: Current value (string, list, or dict) to render and optionally eval.
        step: Step with ``_env`` and ``_vars`` for template rendering.
        df: DataFrame available as ``data`` in templates.

    Returns:
        Rendered value, with strings possibly converted to bool/int/float/list/dict via ``literal_eval``.
    """
    if isinstance(var_value, str):
        try:
            var_value = step.do_make_template(var_value).render(data=df, vars=step._vars)
        except TemplateSyntaxError:
            # If it cannot be rendered as template, take the literal string.
            pass
        except TemplateError as e:
            # If it's valid jinja syntax but some other error occurred, assume user error.
            raise Exception(
                f"Error building jinja template for var '{var_name}': '{var_value}': {e}",
                error_id="param",
            )

        try:
            var_value = ast.literal_eval(var_value)
        except (ValueError, TypeError, SyntaxError):
            # Assume just a regular string. Can also raise MemoryError and RecursionError,
            # but that would likely mean the user did not intend it to be used as a string
            # and should throw an error.
            pass
    elif isinstance(var_value, list):
        var_value = [instantiate_vars(var_name, elm, step, df) for elm in var_value]
    elif isinstance(var_value, dict):
        var_value = {k: instantiate_vars(var_name, v, step, df) for k, v in var_value.items()}

    return var_value
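The string branch above does two things: render the template, then attempt `ast.literal_eval` on the result. The sketch below isolates just the second step (rendering omitted) to show how rendered strings are coerced to Python values, while anything that is not a valid Python literal stays a plain string.

```python
import ast


def coerce(rendered: str):
    """The literal_eval fallback from instantiate_vars, in isolation:
    strings that parse as Python literals become typed values; anything
    else stays a plain string."""
    try:
        return ast.literal_eval(rendered)
    except (ValueError, TypeError, SyntaxError):
        return rendered


print(coerce("3"))            # 3 (int)
print(coerce("[1, 2, 3]"))    # [1, 2, 3] (list)
print(coerce("True"))         # True (bool)
print(coerce("hello world"))  # 'hello world' (unchanged string)
```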