Source code for smlmlp.modules.analysis_LP._functions.metric.metric_nena

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author        : Lancelot PINCET
# GitHub        : https://github.com/LancelotPincet



import numba as nb
import numpy as np
from arrlp import nb_threads
from scipy.optimize import curve_fit
from smlmlp import analysis



[docs] @analysis(df_name="points") def metric_nena( col, blk, fr, *, bins=101, cuda=False, parallel=False, ): """ Estimate one-dimensional NeNA precision from consecutive points in each blink. Parameters ---------- col : array-like Scalar localization values used for NeNA differences. blk : array-like Blink index for each localization. fr : array-like Frame identifiers used to order localizations inside each blink. bins : int or array-like, optional Histogram bin specification passed to :func:`numpy.histogram`. cuda, parallel : bool or int, optional Execution options accepted by all analysis functions. ``cuda`` is kept for API consistency and is not used in this CPU implementation. Returns ------- precision : float One-dimensional NeNA precision, computed as ``sigma / sqrt(2)`` from the fitted Gaussian width. info : dict Diagnostic information with differences, histogram, fit parameters, and blink counts. """ values = np.asarray(col, dtype=np.float32) blinks = np.asarray(blk, dtype=np.uint64) frames = np.asarray(fr, dtype=np.int64) n_input = len(values) if not (len(blinks) == len(frames) == n_input): raise ValueError("col, blk, and fr must have the same length.") valid = np.isfinite(values) & np.isfinite(frames) valid &= blinks > 0 values = values[valid] blinks = blinks[valid] frames = frames[valid] n = len(values) if n == 0: raise ValueError("No valid localizations found.") order = np.lexsort((frames, blinks)) values_s = values[order].astype(np.float64) blinks_s = blinks[order] unique_blinks, starts, counts = np.unique( blinks_s, return_index=True, return_counts=True, ) differences_s = np.full(n, np.nan, dtype=np.float64) with nb_threads(parallel): _nena_differences(values_s, starts, counts, differences_s) differences = np.full(n, np.nan, dtype=np.float64) differences[order] = differences_s differences = differences[np.isfinite(differences)] if len(differences) < 3: raise ValueError("At least three non-degenerate differences are required.") sigma0 = float(np.std(differences, ddof=1)) if not np.isfinite(sigma0) or sigma0 <= 0: raise ValueError("At least three non-degenerate differences are required.") hist, edges = np.histogram(differences, bins=bins) centers = 0.5 * (edges[:-1] + edges[1:]) if len(centers) < 3: raise ValueError("At least three histogram bins are required.") amplitude0 = float(hist.max()) mean0 = float(np.mean(differences)) p0 = (amplitude0, mean0, sigma0) lower = (0.0, -np.inf, np.finfo(float).eps) upper = (np.inf, np.inf, np.inf) try: popt, pcov = curve_fit( _gaussian_1d, centers, hist.astype(np.float64), p0=p0, bounds=(lower, upper), maxfev=10000, ) except (RuntimeError, ValueError) as exc: raise ValueError("Gaussian fit failed.") from exc fit_amplitude = float(popt[0]) fit_mean = float(popt[1]) fit_sigma = float(popt[2]) precision = fit_sigma / np.sqrt(2.0) info = { "differences": differences, "hist": hist, "bin_edges": edges, "bin_centers": centers, "fit_params": popt, "fit_covariance": pcov, "fit_amplitude": fit_amplitude, "fit_mean": fit_mean, "fit_sigma": fit_sigma, "precision": float(precision), "n_input_localizations": n_input, "n_localizations": n, "n_pairs": len(differences), "n_blinks": len(unique_blinks), "locs_per_blink": counts, "blink_ids": unique_blinks, } return float(precision), info
def _gaussian_1d(x, amplitude, mean, sigma): """Evaluate a one-dimensional Gaussian profile.""" return amplitude * np.exp(-0.5 * ((x - mean) / sigma) ** 2) @nb.njit(cache=True, fastmath=True, parallel=True) def _nena_differences(values, starts, counts, out): """Compute (col_i-col_{i+1}) within each blink; last sample stays NaN.""" for blink_index in nb.prange(len(starts)): start = starts[blink_index] count = counts[blink_index] for point_index in range(count - 1): idx = start + point_index out[idx] = values[idx] - values[idx + 1]