Source code for smlmlp.modules.Locs_LP.Locs

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Date          : 2026-02-20
# Author        : Lancelot PINCET
# GitHub        : https://github.com/LancelotPincet
# Library       : smlmLP
# Module        : Locs

"""
Store localization dataframes and metadata for one experiment.

This class defines objects corresponding to localization sets for a single
SMLM experiment. It holds various dataframes (detections, blinks, etc.) and
provides methods for loading, saving, filtering, and manipulating the data.
"""

from contextlib import nullcontext

import numpy as np
import pandas as pd

from corelp import folder, prop, selfkwargs, Path
from smlmlp import Config, DetsDataFrame, LocsReceiver, append_df, columns, open_df, save_df


class Locs(LocsReceiver):
    """
    Store localization dataframes and metadata for one experiment.

    Parameters
    ----------
    source : Locs, pandas.DataFrame, path-like, list[pandas.DataFrame], or None, default=None
        Localization source to open. Can be an existing Locs object, a
        DataFrame, a path to a file or folder, a list of DataFrames, or None.
    config : Config, path-like, dict, or None, default=None
        Configuration source passed to Config.
    **kwargs
        Additional attributes assigned on the instance.

    Attributes
    ----------
    df_dict : dict
        Mapping of dataframe names to dataframe instances.
    config : Config
        Configuration attached to this localization set.
    detections : DetsDataFrame
        Shortcut to the detections dataframe.
    ndetections : int
        Number of detections in the dataset.
    times : dict
        Alias for timing storage used by analysis decorators.

    Examples
    --------
    >>> from smlmlp import Locs
    >>> instance = Locs()

    >>> # Or from a DataFrame
    >>> import pandas as pd
    >>> import numpy as np
    >>> df = pd.DataFrame({
    ...     "frame": [1, 1, 2],
    ...     "x detection [nm]": [10.0, 20.0, 30.0],
    ...     "y detection [nm]": [15.0, 25.0, 35.0],
    ... })
    >>> locs = Locs(df)
    >>> locs.ndetections
    3
    """

    # Mapping of dataframe name -> dataframe, (re)built per instance in __init__.
    df_dict = None

    def __new__(cls, source=None, config=None, **kwargs):
        """Create a new instance, or hand back ``source`` if it already is one."""
        if isinstance(source, cls):
            return source
        return object.__new__(cls)

    def __init__(self, source=None, config=None, **kwargs):
        """
        Initialize the Locs object.

        Parameters
        ----------
        source : Locs, pandas.DataFrame, path-like, list, tuple, or None
            Data to open, forwarded to :meth:`open` unless it is a Locs.
        config : Config, path-like, dict, or None
            Configuration source; stored as ``config_source`` when not None.
        **kwargs
            Forwarded to ``selfkwargs`` to be set on the instance.

        Raises
        ------
        ValueError
            If ``config`` is given while opening ``source`` already defined a
            config source (raised by the ``config_source`` setter).
        """
        if source is self:
            # __new__ returned ``source`` itself; Python still calls __init__
            # on it. Rebuilding df_dict here would wipe the data already
            # loaded in that object, so only the keyword attributes are applied.
            selfkwargs(self, kwargs)
            return

        self.df_dict = dict(detections=DetsDataFrame(self))
        selfkwargs(self, kwargs)

        if isinstance(source, Locs):
            return
        if source is not None:
            self.open(source)
        if config is not None:
            self.config_source = config

        # The Config object is always built, even without an explicit source
        # (config_kwargs then supplies the fallback arguments).
        timeit = (
            self.printer.timeit("opening config file")
            if self.printer is not None
            else nullcontext()
        )
        with timeit:
            self.config = Config(
                config=self.config_source, **self.config_kwargs
            )

    # Config source

    @prop()
    def config_source(self):
        """Return config source."""
        return None

    @config_source.setter
    def config_source(self, value):
        """Set config source; it can only be defined once per instance."""
        if getattr(self, "_config_source", None) is not None:
            raise ValueError("Config source is defined twice")
        self._config_source = value

    @property
    def config_kwargs(self):
        """Return extra Config kwargs (``ncameras=1`` when no source is set)."""
        return dict(ncameras=1) if self.config_source is None else dict()

    # Detections properties

    @property
    def detections(self):
        """Return detections dataframe."""
        return self.df_dict["detections"]

    @property
    def ndetections(self):
        """Return number of detections."""
        return len(self.detections)

    # Analysis timing

    @prop(cache=True)
    def time(self):
        """Return timing storage dictionary."""
        return {}

    @property
    def times(self):
        """Alias timing storage used by analysis decorators."""
        return self.time

    # Optional printer; when set, its ``timeit`` context is used in __init__
    # and it is forwarded to the dataframe open/save helpers.
    printer = None

    # Opening

    def open(self, source):
        """
        Open localization data from a supported source.

        Parameters
        ----------
        source : pandas.DataFrame, str, Path, list, tuple, or None
            Source to open data from. Supported types:

            - pandas.DataFrame: passed to ``open_df``
            - str/Path to CSV: read with ``pandas.read_csv`` then opened
            - str/Path to JSON: stored as config source
            - str/Path to directory: opens ``<stem>_[detections].csv`` first,
              then every other ``<stem>_[<name>].csv`` and finally
              ``<stem>_[metadata].json`` if present
            - list/tuple of DataFrames: opened one by one, detections first
            - None: does nothing

        Raises
        ------
        SyntaxError
            If source type or file suffix is not recognized.
        ValueError
            If a folder has no detections file, if a list contains something
            else than DataFrames, or if multiple or no detections dataframes
            are provided in a list.
        """
        if source is None:
            return

        if isinstance(source, pd.DataFrame):
            open_df(self, source, self.printer)
            return

        if isinstance(source, (str, Path("").__class__)):
            path = Path(source)
            if path.is_dir():
                self._open_folder(path)
            elif path.suffix == ".csv":
                self.open(pd.read_csv(path))
            elif path.suffix == ".json":
                self.config_source = path
            else:
                raise SyntaxError(f"Locs cannot open file {path}")
            return

        if isinstance(source, (list, tuple)):
            self._open_list(source)
            return

        raise SyntaxError("source type was not recognized for Locs")

    def _open_folder(self, path):
        """Open every ``<stem>_[<name>].csv`` and the metadata file of a folder."""
        stem = path.stem
        detection_file = path / f"{stem}_[detections].csv"
        if not detection_file.exists():
            raise ValueError(
                'Cannot open folder without "detections" file'
            )
        self.open(detection_file)

        # Match file names explicitly instead of globbing: in a glob pattern
        # "[*]" is a character class matching a literal "*", so a pattern like
        # "<stem>_[*].csv" never matches "<stem>_[blinks].csv".
        prefix, suffix = f"{stem}_[", "].csv"
        for file in sorted(path.iterdir(), key=lambda entry: entry.name):
            name = file.name
            if not (name.startswith(prefix) and name.endswith(suffix)):
                continue
            if file == detection_file:
                continue
            self.open(file)

        config_file = path / f"{stem}_[metadata].json"
        if config_file.exists():
            self.open(config_file)

    def _open_list(self, source):
        """Open a list/tuple of DataFrames, detections dataframe first."""
        source = list(source)
        dets_idx = None
        for idx, df in enumerate(source):
            if not isinstance(df, pd.DataFrame):
                raise ValueError(
                    "Can only open lists/tuples of DataFrames"
                )
            # The index name (quotes stripped) identifies which dataframe this
            # is; an unnamed index is treated as the "detection" index.
            index = df.index.name
            if index is not None:
                index = index.replace('"', "").replace("'", "")
            else:
                index = "detection"
            df_name = columns.headers[index].df_name
            if df_name == "detections":
                if dets_idx is not None:
                    raise ValueError(
                        "Multiple detections dataframe to open is not possible"
                    )
                dets_idx = idx
        if dets_idx is None:
            raise ValueError(
                "No detections dataframe to open is not possible"
            )
        dets = source.pop(dets_idx)
        for value in [dets] + source:
            self.open(value)

    # Saving

    def save(self, path, file=None):
        """
        Save localization dataframes and metadata to a folder.

        Each non-empty dataframe is written to ``<stem>_[<df_name>].csv`` and
        the configuration to ``<stem>_[metadata].json``, where ``<stem>`` is
        the stem of the saving folder.

        Parameters
        ----------
        path : path-like
            Directory path to save to.
        file : str or None, optional
            Optional filename within the directory.
        """
        target_path = Path(path) if file is None else Path(path) / file
        # NOTE(review): ``folder`` comes from corelp; presumably it prepares
        # and returns the destination folder -- confirm against corelp.
        saving_folder = folder(target_path.with_suffix(""), warning=False)
        stem = saving_folder.stem
        mainpath = saving_folder / f"{stem}.csv"
        for df_name, df in self.df_dict.items():
            if df is None or len(df.columns) == 0:
                continue
            df_path = mainpath.with_stem(f"{stem}_[{df_name}]")
            save_df(df, df_path, df.head2save, self.printer)
        self.config.save(saving_folder / f"{stem}_[metadata].json")

    def append(self, path, file=None):
        """
        Append localization dataframes to existing CSV files.

        If the metadata file or any destination CSV is missing, fall back to
        :meth:`save`. When there is no non-empty dataframe, only the
        configuration is saved.

        Parameters
        ----------
        path : path-like
            Directory path to append to.
        file : str or None, optional
            Optional filename within the directory.
        """
        target_path = Path(path) if file is None else Path(path) / file
        saving_folder = folder(
            target_path.with_suffix(""), warning=False, new=False
        )
        stem = saving_folder.stem
        mainpath = saving_folder / f"{stem}.csv"

        df_entries = []
        for df_name, df in self.df_dict.items():
            if df is None or len(df.columns) == 0:
                continue
            df_path = mainpath.with_stem(f"{stem}_[{df_name}]")
            df_entries.append((df, df_path))

        metadata_path = saving_folder / f"{stem}_[metadata].json"
        if not df_entries:
            self.config.save(metadata_path)
            return

        missing_target = (not metadata_path.exists()) or any(
            not df_path.exists() for _, df_path in df_entries
        )
        if missing_target:
            self.save(target_path)
            return

        for df, df_path in df_entries:
            append_df(df, df_path, df.head2save, self.printer)
        self.config.save(metadata_path)

    # Filter

    def filter(self, *filter_names, mask=None, df_name="detections"):
        """
        Return a new Locs object containing rows matching all filters.

        The combined mask is applied to the dataframe ``df_name``. Detections
        are kept when they reference a kept row of that dataframe, and every
        other dataframe keeps the rows whose index is still referenced by the
        kept detections (ids equal to 0 or NaN are ignored). Dataframes that
        have no matching column in detections are dropped from the result.

        Parameters
        ----------
        *filter_names : str
            Names of attributes of the dataframe to use as boolean filters.
        mask : array-like or None, optional
            Additional boolean mask to apply.
        df_name : str, optional
            Name of the dataframe to filter. Default is ``'detections'``.

        Returns
        -------
        Locs
            New Locs object with filtered data.

        Raises
        ------
        ValueError
            If a filter name does not resolve to an attribute of the
            dataframe, if the mask length does not match the dataframe
            length, or if detections cannot be linked to ``df_name``.
        """
        base_df = self.df_dict[df_name]

        filters = []
        for name in filter_names:
            values = getattr(base_df, name, None)
            if values is None:
                # Fail early: a None entry would otherwise reach NumPy and
                # surface as an unrelated shape/len error.
                raise ValueError(
                    f"Unknown filter {name!r} for dataframe {df_name!r}"
                )
            filters.append(values)
        if mask is not None:
            filters.append(mask)

        if filters:
            keep = np.logical_and.reduce(filters)
        else:
            keep = np.ones(len(base_df), bool)
        keep = np.asarray(keep, dtype=bool)
        if len(keep) != len(base_df):
            raise ValueError(
                f"Filter mask length {len(keep)} does not match {df_name} length {len(base_df)}"
            )

        # Translate the mask on ``base_df`` into a mask on detections.
        detections = self.detections
        if base_df is detections:
            detections_keep = keep
        elif base_df.index_header in detections.columns:
            kept_ids = base_df.index[keep]
            detections_keep = (
                detections[base_df.index_header].isin(kept_ids).to_numpy()
            )
        else:
            raise ValueError(
                f"Cannot filter detections from {df_name}: missing "
                f"{base_df.index_header!r} in detections"
            )
        filtered_detections = detections.loc[detections_keep].copy()

        # Propagate the selection to the remaining dataframes.
        df_list = [filtered_detections]
        for name, df in self.df_dict.items():
            if name == "detections" or df is None:
                continue
            if df is base_df:
                df_keep = keep
            elif df.index_header in filtered_detections.columns:
                kept_ids = (
                    filtered_detections[df.index_header].dropna().unique()
                )
                kept_ids = kept_ids[kept_ids != 0]
                df_keep = df.index.isin(kept_ids)
            else:
                continue
            df_list.append(df.loc[df_keep].copy())

        for df in df_list:
            df.drop(columns=["filter"], inplace=True, errors="ignore")
        return Locs(df_list, config=self.config, printer=self.printer)

    # Split

    def split(self, nlocs=2):
        """
        Split localizations by frame modulo ``nlocs``.

        Split ``i`` keeps the blinks for which ``self.blinks.fr % nlocs == i``
        and is built with :meth:`filter` on the ``"blinks"`` dataframe.

        Parameters
        ----------
        nlocs : int, optional
            Number of splits. Default is ``2``.

        Returns
        -------
        tuple of Locs
            Tuple of Locs objects, one for each split.
        """
        # NOTE(review): ``self.blinks`` is not defined in this class;
        # presumably provided by LocsReceiver -- confirm.
        return tuple(
            self.filter(mask=self.blinks.fr % nlocs == i, df_name="blinks")
            for i in range(nlocs)
        )

    # Combine

    def combine(self, *locs_list, col_name="ch"):
        """
        Combine this Locs object with others and label their source.

        Only the detections dataframes are concatenated. Rows coming from the
        i-th object (this one being the first) are labelled ``i + 1``.

        Parameters
        ----------
        *locs_list : Locs
            Additional Locs objects to combine.
        col_name : str, optional
            Name of the column to store channel labels. Default is ``'ch'``.

        Returns
        -------
        Locs
            New Locs object with combined detections and channel labels.
        """
        locs_list = [self] + list(locs_list)
        detections = pd.concat(
            [locs.detections for locs in locs_list], ignore_index=True
        )
        newlocs = Locs(detections, config=self.config, printer=self.printer)

        # Smallest unsigned dtype able to hold the highest label: uint8 for up
        # to 255 combined objects (as before), wider beyond to avoid overflow.
        label_dtype = np.min_scalar_type(len(locs_list))
        newcol = np.hstack(
            [
                np.full(locs.ndetections, fill_value=label, dtype=label_dtype)
                for label, locs in enumerate(locs_list, start=1)
            ]
        )
        setattr(newlocs.detections, col_name, newcol)
        return newlocs

    # Crop

    def crop(self, xmin, ymin, xmax, ymax):
        """
        Crop detections to an inclusive xy rectangle.

        Parameters
        ----------
        xmin : float
            Minimum x coordinate in nm.
        ymin : float
            Minimum y coordinate in nm.
        xmax : float
            Maximum x coordinate in nm.
        ymax : float
            Maximum y coordinate in nm.

        Returns
        -------
        Locs
            New Locs object with cropped detections.
        """
        x, y = self.detections.x, self.detections.y
        inside_x = (x >= xmin) & (x <= xmax)
        inside_y = (y >= ymin) & (y <= ymax)
        return self.filter(mask=inside_x & inside_y)
if __name__ == "__main__":
    # Running the module directly hands this file to corelp's test helper.
    from corelp import test as run_module_tests

    run_module_tests(__file__)