#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author : Lancelot PINCET
# GitHub : https://github.com/LancelotPincet
import numba as nb
import numpy as np
from arrlp import nb_threads
from scipy.optimize import curve_fit
from smlmlp import analysis
[docs]
@analysis(df_name="points")
def metric_nena(
col,
blk,
fr,
*,
bins=101,
cuda=False,
parallel=False,
):
"""
Estimate one-dimensional NeNA precision from consecutive points in each blink.
Parameters
----------
col : array-like
Scalar localization values used for NeNA differences.
blk : array-like
Blink index for each localization.
fr : array-like
Frame identifiers used to order localizations inside each blink.
bins : int or array-like, optional
Histogram bin specification passed to :func:`numpy.histogram`.
cuda, parallel : bool or int, optional
Execution options accepted by all analysis functions. ``cuda`` is kept
for API consistency and is not used in this CPU implementation.
Returns
-------
precision : float
One-dimensional NeNA precision, computed as ``sigma / sqrt(2)`` from the
fitted Gaussian width.
info : dict
Diagnostic information with differences, histogram, fit parameters, and
blink counts.
"""
values = np.asarray(col, dtype=np.float32)
blinks = np.asarray(blk, dtype=np.uint64)
frames = np.asarray(fr, dtype=np.int64)
n_input = len(values)
if not (len(blinks) == len(frames) == n_input):
raise ValueError("col, blk, and fr must have the same length.")
valid = np.isfinite(values) & np.isfinite(frames)
valid &= blinks > 0
values = values[valid]
blinks = blinks[valid]
frames = frames[valid]
n = len(values)
if n == 0:
raise ValueError("No valid localizations found.")
order = np.lexsort((frames, blinks))
values_s = values[order].astype(np.float64)
blinks_s = blinks[order]
unique_blinks, starts, counts = np.unique(
blinks_s,
return_index=True,
return_counts=True,
)
differences_s = np.full(n, np.nan, dtype=np.float64)
with nb_threads(parallel):
_nena_differences(values_s, starts, counts, differences_s)
differences = np.full(n, np.nan, dtype=np.float64)
differences[order] = differences_s
differences = differences[np.isfinite(differences)]
if len(differences) < 3:
raise ValueError("At least three non-degenerate differences are required.")
sigma0 = float(np.std(differences, ddof=1))
if not np.isfinite(sigma0) or sigma0 <= 0:
raise ValueError("At least three non-degenerate differences are required.")
hist, edges = np.histogram(differences, bins=bins)
centers = 0.5 * (edges[:-1] + edges[1:])
if len(centers) < 3:
raise ValueError("At least three histogram bins are required.")
amplitude0 = float(hist.max())
mean0 = float(np.mean(differences))
p0 = (amplitude0, mean0, sigma0)
lower = (0.0, -np.inf, np.finfo(float).eps)
upper = (np.inf, np.inf, np.inf)
try:
popt, pcov = curve_fit(
_gaussian_1d,
centers,
hist.astype(np.float64),
p0=p0,
bounds=(lower, upper),
maxfev=10000,
)
except (RuntimeError, ValueError) as exc:
raise ValueError("Gaussian fit failed.") from exc
fit_amplitude = float(popt[0])
fit_mean = float(popt[1])
fit_sigma = float(popt[2])
precision = fit_sigma / np.sqrt(2.0)
info = {
"differences": differences,
"hist": hist,
"bin_edges": edges,
"bin_centers": centers,
"fit_params": popt,
"fit_covariance": pcov,
"fit_amplitude": fit_amplitude,
"fit_mean": fit_mean,
"fit_sigma": fit_sigma,
"precision": float(precision),
"n_input_localizations": n_input,
"n_localizations": n,
"n_pairs": len(differences),
"n_blinks": len(unique_blinks),
"locs_per_blink": counts,
"blink_ids": unique_blinks,
}
return float(precision), info
def _gaussian_1d(x, amplitude, mean, sigma):
"""Evaluate a one-dimensional Gaussian profile."""
return amplitude * np.exp(-0.5 * ((x - mean) / sigma) ** 2)
@nb.njit(cache=True, fastmath=True, parallel=True)
def _nena_differences(values, starts, counts, out):
"""Compute (col_i-col_{i+1}) within each blink; last sample stays NaN."""
for blink_index in nb.prange(len(starts)):
start = starts[blink_index]
count = counts[blink_index]
for point_index in range(count - 1):
idx = start + point_index
out[idx] = values[idx] - values[idx + 1]