Source code for smlmlp.modules.block_LP._functions.integrate.integrate_crop

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author        : Lancelot PINCET
# GitHub        : https://github.com/LancelotPincet

import numpy as np
from arrlp import coordinates, get_xp
from funclp import Gaussian2D, IsoGaussian, Spline3D

from smlmlp import block
from smlmlp import crop_remove_bkgd

SIGMA = 0.21 * 670 / 1.5
FIT_MODELS = ("isogauss", "gauss", "spline")


[docs] @block() def integrate_crop( crops, X0=None, Y0=None, /, ch=None, *, x_fit=None, y_fit=None, z_fit=None, os_fit=None, sigma_fit=None, sigma_x_fit=None, sigma_y_fit=None, sigma_angle=None, channels_weighted_integrations=True, channels_fit_models="isogauss", channels_pixels_nm=1.0, channels_gains=1.0, channels_QE=1.0, channels_psf_sigmas_nm=SIGMA, channels_psf_xsigmas_nm=SIGMA, channels_psf_ysigmas_nm=SIGMA, channels_psf_thetas_deg=0.0, channels_psf_3d_xtangents=None, channels_psf_3d_ytangents=None, channels_psf_3d_ztangents=None, channels_psf_3d_spline_coeffs=None, cuda=False, parallel=False, ): """ Integrate photon counts from per-channel crop stacks. Parameters ---------- crops : sequence of array-like Crop stacks, one stack per channel, shaped ``(N, Y, X)``. The order inside each channel must match the detections where ``ch == channel``. X0, Y0 : array-like or sequence of array-like, optional Crop x/y origins in pixels. Detection-aligned arrays are split with ``ch``. Per-channel arrays are also accepted. These origins are required for weighted channels because fitted coordinates are stored globally. ch : array-like or None, optional One-based channel index for each detection. When provided, integrated values are remapped to this detection order. x_fit, y_fit : array-like, optional Fitted x/y coordinates in nanometers. Required for weighted channels. z_fit : array-like, optional Fitted z coordinates for spline channels. Missing values default to zero. os_fit : array-like, optional Fitted offset in photons per pixel. Required for weighted channels and converted back to raw crop units before subtraction. sigma_fit : array-like, optional Fitted isotropic sigma in nanometers for ``"isogauss"`` channels. sigma_x_fit, sigma_y_fit : array-like, optional Fitted x/y sigmas in nanometers for ``"gauss"`` channels. sigma_angle : array-like, optional Per-detection Gaussian angle. If omitted, ``channels_psf_thetas_deg`` is used for each channel. channels_weighted_integrations : bool or sequence of bool, optional Per-channel switch selecting weighted PSF projection when true and border-background summation when false. channels_fit_models : str or sequence of str, optional Per-channel fit model used to reconstruct weighted PSFs. Supported values are ``"isogauss"``, ``"gauss"``, and ``"spline"``. channels_pixels_nm : float, tuple, or sequence, optional Pixel sizes in ``(y, x)`` nanometers. A scalar or one tuple is broadcast to all channels. channels_gains : float or sequence, optional Gain values used to convert raw crop counts to photons. channels_QE : float or sequence, optional Quantum efficiencies used to convert raw crop counts to photons. channels_psf_sigmas_nm, channels_psf_xsigmas_nm, channels_psf_ysigmas_nm : float or sequence, optional Channel PSF defaults used when fitted sigma columns are not supplied. channels_psf_thetas_deg : float or sequence, optional Channel Gaussian angle defaults. channels_psf_3d_xtangents, channels_psf_3d_ytangents, channels_psf_3d_ztangents : sequence, optional Spline tangents used by weighted ``"spline"`` channels. channels_psf_3d_spline_coeffs : sequence, optional Spline coefficients used by weighted ``"spline"`` channels. cuda : bool, optional Whether to run array operations on CUDA-compatible arrays. parallel : bool, optional Whether to parallelize the background-removal step for unweighted channels. Returns ------- tuple A tuple ``(intensity, info)`` where ``intensity`` is a one-dimensional detection-aligned photon count array. ``info`` contains normalized channel settings, intermediate weighted denominators, and the background vectors applied to each crop. Raises ------ ValueError If channel lengths are inconsistent, channel indices are invalid, or a weighted channel is missing required fitted coordinates or offsets. SyntaxError If spline metadata is required but missing. Notes ----- Unweighted channels subtract the border-median background with :func:`crop_remove_bkgd`, sum all corrected pixels, and convert raw counts as ``raw * gain / QE``. Weighted channels reconstruct the fitted PSF shape with the matching :mod:`funclp` function and zero offset. The fitted offset is subtracted from the raw crop after conversion back to raw units. The normalized PSF ``p = psf_model / psf_model.sum()`` is then used as a matched filter: ``N = sum(p * crop_no_bkgd) / sum(p**2)``. The resulting raw count is converted with the same ``gain / QE`` factor. Examples -------- >>> import numpy as np >>> crops = [np.ones((2, 3, 3), dtype=np.float32)] >>> crops[0][:, 1, 1] = [11.0, 21.0] >>> intensity, info = integrate_crop(crops, channels_weighted_integrations=False) >>> intensity.shape (2,) >>> ch = np.array([2, 1], dtype=np.uint8) >>> crops = [np.ones((1, 3, 3), dtype=np.float32), np.ones((1, 3, 3), dtype=np.float32)] >>> crops[1][0, 1, 1] = 6.0 >>> intensity, info = integrate_crop( ... crops, ... np.array([0, 0]), ... np.array([0, 0]), ... ch=ch, ... x_fit=np.array([np.nan, 1.0], dtype=np.float32), ... y_fit=np.array([np.nan, 1.0], dtype=np.float32), ... os_fit=np.array([0.0, 1.0], dtype=np.float32), ... channels_weighted_integrations=[True, False], ... ) >>> intensity.ndim 1 """ # Normalize and validate inputs n_channels = len(crops) cuda = bool(cuda) xp = get_xp(cuda) positions = _channel_positions(crops, ch) channels_weighted_integrations = [ bool(value) for value in _normalize_channels_parameter( channels_weighted_integrations, n_channels, "channels_weighted_integrations") ] channels_fit_models = _normalize_channels_fit_models(channels_fit_models, n_channels) channels_pixels_nm = _normalize_channels_pixels_nm(channels_pixels_nm, n_channels) channels_gains = _normalize_channels_parameter(channels_gains, n_channels, "channels_gains") channels_QE = _normalize_channels_parameter(channels_QE, n_channels, "channels_QE") channels_psf_sigmas_nm = _normalize_channels_parameter(channels_psf_sigmas_nm, n_channels, "channels_psf_sigmas_nm") channels_psf_xsigmas_nm = _normalize_channels_parameter(channels_psf_xsigmas_nm, n_channels, "channels_psf_xsigmas_nm") channels_psf_ysigmas_nm = _normalize_channels_parameter(channels_psf_ysigmas_nm, n_channels, "channels_psf_ysigmas_nm") channels_psf_thetas_deg = _normalize_channels_parameter(channels_psf_thetas_deg, n_channels, "channels_psf_thetas_deg") needs_spline = any(weighted and model == "spline" for weighted, model in zip(channels_weighted_integrations, channels_fit_models)) channels_psf_3d_xtangents = _normalize_spline_parameter( channels_psf_3d_xtangents, n_channels, "channels_psf_3d_xtangents", required=needs_spline) channels_psf_3d_ytangents = _normalize_spline_parameter( channels_psf_3d_ytangents, n_channels, "channels_psf_3d_ytangents", required=needs_spline) channels_psf_3d_ztangents = _normalize_spline_parameter( channels_psf_3d_ztangents, n_channels, "channels_psf_3d_ztangents", required=needs_spline) channels_psf_3d_spline_coeffs = _normalize_spline_parameter( channels_psf_3d_spline_coeffs, n_channels, "channels_psf_3d_spline_coeffs", required=needs_spline) any_weighted = any(channels_weighted_integrations) any_unweighted = any(not w for w in channels_weighted_integrations) # Split fitted parameters per channel if any_weighted: X0_channels, Y0_channels = _split_origins(crops, X0, Y0, positions, cuda=cuda) x_fit_channels = _split_detection_values(x_fit, crops, positions, cuda=cuda, name="x_fit", required=True) y_fit_channels = _split_detection_values(y_fit, crops, positions, cuda=cuda, name="y_fit", required=True) z_fit_channels = _split_detection_values(z_fit, crops, positions, cuda=cuda) os_fit_channels = _split_detection_values(os_fit, crops, positions, cuda=cuda, name="os_fit", required=True) sigma_fit_channels = _split_detection_values(sigma_fit, crops, positions, cuda=cuda) sigma_x_fit_channels = _split_detection_values(sigma_x_fit, crops, positions, cuda=cuda) sigma_y_fit_channels = _split_detection_values(sigma_y_fit, crops, positions, cuda=cuda) sigma_angle_channels = _split_detection_values(sigma_angle, crops, positions, cuda=cuda) else: X0_channels = Y0_channels = [None for _ in range(n_channels)] x_fit_channels = y_fit_channels = z_fit_channels = [None for _ in range(n_channels)] os_fit_channels = sigma_fit_channels = [None for _ in range(n_channels)] sigma_x_fit_channels = sigma_y_fit_channels = [None for _ in range(n_channels)] sigma_angle_channels = [None for _ in range(n_channels)] # Remove background for unweighted channels bkgd_crops, bkgd_info = (None, {}) if any_unweighted: bkgd_crops, bkgd_info = crop_remove_bkgd(crops, cuda=cuda, parallel=parallel) # Integrate each channel intensities, psf_sums, weighted_denominators, background_vectors = [], [], [], [] for channel_index, values in enumerate(zip( crops, channels_weighted_integrations, channels_fit_models, channels_pixels_nm, channels_gains, channels_QE, channels_psf_sigmas_nm, channels_psf_xsigmas_nm, channels_psf_ysigmas_nm, channels_psf_thetas_deg, channels_psf_3d_xtangents, channels_psf_3d_ytangents, channels_psf_3d_ztangents, channels_psf_3d_spline_coeffs, X0_channels, Y0_channels, x_fit_channels, y_fit_channels, z_fit_channels, os_fit_channels, sigma_fit_channels, sigma_x_fit_channels, sigma_y_fit_channels, sigma_angle_channels, )): (crop, weighted, model, pixel, gain, qe, psf_sigma, psf_xsigma, psf_ysigma, psf_theta, tx, ty, tz, coeffs, x0, y0, x_fit_ch, y_fit_ch, z_fit_ch, os_fit_ch, sigma_fit_ch, sigma_x_fit_ch, sigma_y_fit_ch, sigma_angle_ch) = values crop = xp.asarray(crop) if len(crop) == 0: intensity = xp.empty(0, dtype=xp.float32) psf_sums.append(np.empty(0, dtype=np.float32)) weighted_denominators.append(np.empty(0, dtype=np.float32)) background_vectors.append(np.empty(0, dtype=np.float32)) elif weighted: intensity, psf_sum, denominator = _integrate_weighted_channel( crop, x0, y0, x_fit_ch, y_fit_ch, z_fit_ch, os_fit_ch, sigma_fit_ch, sigma_x_fit_ch, sigma_y_fit_ch, sigma_angle_ch, model=model, pixel=pixel, gain=gain, qe=qe, psf_sigma=psf_sigma, psf_xsigma=psf_xsigma, psf_ysigma=psf_ysigma, psf_theta=psf_theta, tx=tx, ty=ty, tz=tz, coeffs=coeffs, cuda=cuda, ) psf_sums.append(_asnumpy(psf_sum)) weighted_denominators.append(_asnumpy(denominator)) background_vectors.append( _asnumpy(xp.asarray(os_fit_ch, dtype=xp.float32) * qe / gain).astype(np.float32, copy=False) ) else: crop_no_bkgd = xp.asarray(bkgd_crops[channel_index]) intensity = xp.sum(crop_no_bkgd, axis=(1, 2)) * gain / qe psf_sums.append(np.full(len(crop), np.nan, dtype=np.float32)) weighted_denominators.append(np.full(len(crop), np.nan, dtype=np.float32)) background_vectors.append( _asnumpy(bkgd_info["border_medians"][channel_index]).astype(np.float32, copy=False) ) intensities.append(_asnumpy(intensity).astype(np.float32, copy=False)) info = { "channels_weighted_integrations": channels_weighted_integrations, "channels_fit_models": channels_fit_models, "channels_pixels_nm": channels_pixels_nm, "psf_sums": psf_sums, "weighted_denominators": weighted_denominators, "background_vector": stack_channel_values(background_vectors, positions), } info.update(bkgd_info) return stack_channel_values(intensities, positions), info
def _integrate_weighted_channel( crop, x0, y0, x_fit, y_fit, z_fit, os_fit, sigma_fit, sigma_x_fit, sigma_y_fit, sigma_angle, /, *, model, pixel, gain, qe, psf_sigma, psf_xsigma, psf_ysigma, psf_theta, tx, ty, tz, coeffs, cuda, ): """Integrate one channel by projecting crops on fitted PSF shapes.""" xp = get_xp(cuda) _, height, width = crop.shape # Cast to float32 arrays x0, y0 = xp.asarray(x0, dtype=xp.float32), xp.asarray(y0, dtype=xp.float32) x_fit, y_fit = xp.asarray(x_fit, dtype=xp.float32), xp.asarray(y_fit, dtype=xp.float32) os_fit = xp.asarray(os_fit, dtype=xp.float32) # Build PSF grid relative to crop origins yy, xx = coordinates(shape=(height, width), center=False, pixel=pixel, cuda=cuda) mux = x_fit - x0 * pixel[1] muy = y_fit - y0 * pixel[0] amp, offset = xp.ones_like(mux, dtype=xp.float32), xp.zeros_like(mux, dtype=xp.float32) # Reconstruct PSF model function = _make_model( model=model, mux=mux, muy=muy, amp=amp, offset=offset, z_fit=z_fit, sigma_fit=sigma_fit, sigma_x_fit=sigma_x_fit, sigma_y_fit=sigma_y_fit, sigma_angle=sigma_angle, pixel=pixel, psf_sigma=psf_sigma, psf_xsigma=psf_xsigma, psf_ysigma=psf_ysigma, psf_theta=psf_theta, tx=tx, ty=ty, tz=tz, coeffs=coeffs, cuda=cuda, ) psf_model = function(xx, yy, xp.zeros_like(xx)) if model == "spline" else function(xx, yy) # Matched-filter integration with fitted PSF os_fit_raw = os_fit * qe / gain crop_no_bkgd = crop - os_fit_raw[:, None, None] psf_sum = xp.sum(psf_model, axis=(1, 2)) psf = psf_model / psf_sum[:, None, None] denominator = xp.sum(psf * psf, axis=(1, 2)) raw_intensity = xp.sum(psf * crop_no_bkgd, axis=(1, 2)) / denominator valid = xp.isfinite(psf_sum) & xp.isfinite(denominator) & (psf_sum != 0) & (denominator != 0) raw_intensity = xp.where(valid, raw_intensity, xp.nan) return raw_intensity * gain / qe, psf_sum, denominator def _make_model( *, model, mux, muy, amp, offset, z_fit, sigma_fit, sigma_x_fit, sigma_y_fit, sigma_angle, pixel, psf_sigma, psf_xsigma, psf_ysigma, psf_theta, tx, ty, tz, coeffs, cuda, ): """Instantiate the fitted PSF model for one weighted channel.""" xp = get_xp(cuda) if model == "isogauss": sigma = _parameter_or_default(sigma_fit, mux, psf_sigma, cuda=cuda) return IsoGaussian(mux=mux, muy=muy, amp=amp, offset=offset, sig=sigma, pixx=pixel[1], pixy=pixel[0], cuda=cuda) if model == "gauss": sigx = _parameter_or_default(sigma_x_fit, mux, psf_xsigma, cuda=cuda) sigy = _parameter_or_default(sigma_y_fit, mux, psf_ysigma, cuda=cuda) if sigma_x_fit is None and sigma_fit is not None: sigx = xp.asarray(sigma_fit, dtype=xp.float32) if sigma_y_fit is None and sigma_fit is not None: sigy = xp.asarray(sigma_fit, dtype=xp.float32) theta = _parameter_or_default(sigma_angle, mux, psf_theta, cuda=cuda) return Gaussian2D(mux=mux, muy=muy, amp=amp, offset=offset, sigx=sigx, sigy=sigy, theta=theta, pixx=pixel[1], pixy=pixel[0], cuda=cuda) if model == "spline": muz = xp.zeros_like(mux) if z_fit is None else xp.asarray(z_fit, dtype=xp.float32) return Spline3D(mux=mux, muy=muy, muz=muz, amp=amp, offset=offset, tx=tx, ty=ty, tz=tz, coeffs=coeffs, cuda=cuda) raise ValueError(f"Unknown model: {model}") def _channel_positions(crops, ch): """Return per-channel detection positions from one-based channel labels.""" n_channels = len(crops) crop_lengths = [len(crop) for crop in crops] total = sum(crop_lengths) if ch is None: return None ch_np = _asnumpy(ch) if len(ch_np) != total: raise ValueError("ch must match the total number of crops") if total and (ch_np.min() < 1 or ch_np.max() > n_channels): raise ValueError("Channel indices must be one-based and within crops.") positions = [] for channel_index, crop_length in enumerate(crop_lengths, start=1): pos = np.flatnonzero(ch_np == channel_index) if len(pos) != crop_length: raise ValueError("ch channel counts must match crop stack lengths") positions.append(pos) return positions def _split_origins(crops, X0, Y0, positions, *, cuda=False): """Split crop origins into one x/y pair per channel.""" if X0 is None or Y0 is None: raise ValueError("X0 and Y0 are required for weighted crop integration") n_channels = len(crops) crop_lengths = [len(crop) for crop in crops] if isinstance(X0, (list, tuple)) or isinstance(Y0, (list, tuple)): if not isinstance(X0, (list, tuple)) or not isinstance(Y0, (list, tuple)): raise ValueError("X0 and Y0 must use the same format") if len(X0) != n_channels or len(Y0) != n_channels: raise ValueError("X0 and Y0 must have same length as crops") _validate_channel_lengths(crop_lengths, X0, Y0) xp = get_xp(cuda) return [xp.asarray(x0) for x0 in X0], [xp.asarray(y0) for y0 in Y0] X0_channels = _split_detection_values(X0, crops, positions, cuda=cuda, name="X0") Y0_channels = _split_detection_values(Y0, crops, positions, cuda=cuda, name="Y0") return X0_channels, Y0_channels def _split_detection_values(values, crops, positions, *, cuda=False, name="value", required=False): """Split a detection-aligned array or per-channel sequence by channel.""" n_channels = len(crops) crop_lengths = [len(crop) for crop in crops] if values is None: if required: raise ValueError(f"{name} is required for weighted crop integration") return [None for _ in range(n_channels)] xp = get_xp(cuda) if isinstance(values, (list, tuple)) and len(values) == n_channels: for crop_length, value in zip(crop_lengths, values): if len(value) != crop_length: raise ValueError(f"{name} channel counts must match crop stack lengths") return [xp.asarray(value) for value in values] total = sum(crop_lengths) values = xp.asarray(values) if len(values) != total: raise ValueError(f"{name} must match the total number of crops") if positions is not None: return [values[xp.asarray(pos)] for pos in positions] channel_values = [] start = 0 for crop_length in crop_lengths: stop = start + crop_length channel_values.append(values[start:stop]) start = stop return channel_values def _normalize_channels_fit_models(values, n_channels): """Normalize and validate per-channel fit model names.""" if isinstance(values, str): values = [values for _ in range(n_channels)] elif len(values) != n_channels: raise ValueError("channels_fit_models must have same length as crops") models = [str(value).lower() for value in values] invalid = [model for model in models if model not in FIT_MODELS] if invalid: raise ValueError( f"channels_fit_models contains unsupported values {invalid}; " f"expected one of {FIT_MODELS}" ) return models def _normalize_channels_pixels_nm(channels_pixels_nm, n_channels): """Normalize pixel sizes to one ``(py, px)`` tuple per channel.""" try: if len(channels_pixels_nm) != n_channels: if len(channels_pixels_nm) == 2: channels_pixels_nm = [channels_pixels_nm for _ in range(n_channels)] else: raise ValueError("channels_pixels_nm must have same length as crops") except TypeError: channels_pixels_nm = [(channels_pixels_nm, channels_pixels_nm) for _ in range(n_channels)] return channels_pixels_nm def _normalize_channels_parameter(values, n_channels, name): """Normalize scalar/per-channel values to a per-channel sequence.""" if isinstance(values, str): return [values for _ in range(n_channels)] try: if len(values) != n_channels: raise ValueError(f"{name} must have same length as crops") except TypeError: values = [values for _ in range(n_channels)] return values def _normalize_spline_parameter(values, n_channels, name, required): """Normalize spline metadata to one object per channel.""" if values is None: if required: raise SyntaxError(f"{name} must be specified for weighted spline channels") return [None for _ in range(n_channels)] if isinstance(values, np.ndarray): if n_channels != 1: raise ValueError(f"{name} must have same length as crops") return [values] if len(values) != n_channels: raise ValueError(f"{name} must have same length as crops") return values def _parameter_or_default(values, reference, default, *, cuda=False): """Return fitted parameter values or a per-event default array.""" xp = get_xp(cuda) if values is None: return xp.full_like(reference, fill_value=default, dtype=xp.float32) return xp.asarray(values, dtype=xp.float32) def _validate_channel_lengths(crop_lengths, X0, Y0): """Validate per-channel origin array lengths.""" for crop_length, x0, y0 in zip(crop_lengths, X0, Y0): if len(x0) != crop_length or len(y0) != crop_length: raise ValueError("X0/Y0 channel counts must match crop stack lengths") def _asnumpy(array): """Return a NumPy view/copy for CPU-side remapping.""" if hasattr(array, "get"): return array.get() return np.asarray(array) def stack_channel_values(values, positions=None, *, fill_value=np.nan): """Stack channel values or remap them to detection order.""" if positions is None: return np.hstack(values) total = sum(len(pos) for pos in positions) dtype = np.result_type(*[np.asarray(value).dtype for value in values], np.float32) output = np.full(total, fill_value, dtype=dtype) for value, pos in zip(values, positions): output[pos] = value return output if __name__ == "__main__": from corelp import test test(__file__)