Source code for smlmlp.modules.analysis_LP._functions.associate.associate_consecutive_frames

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author        : Lancelot PINCET
# GitHub        : https://github.com/LancelotPincet



import os

from joblib import Parallel, delayed
import numpy as np
from scipy.optimize import linear_sum_assignment
from scipy.spatial import cKDTree
from scipy.sparse import coo_matrix
from scipy.sparse.csgraph import connected_components
from smlmlp import analysis



[docs] @analysis(df_name="points") def associate_consecutive_frames( xx, yy, fr, zz=None, *, association_radius_nm=30.0, z_association_radius_nm=100.0, cuda=False, parallel=False, ): """ Associate localizations across consecutive frames. Parameters ---------- xx, yy : array-like Localization coordinates. fr : array-like Frame identifiers. zz : array-like or None, optional Optional z coordinates used for independent z validation. association_radius_nm : float or array-like, optional Maximum xy link radius. z_association_radius_nm : float or array-like, optional Maximum z link radius when ``zz`` is provided. cuda, parallel : bool or int, optional Execution options accepted by all analysis functions. Returns ------- tracks : ndarray One-based track identifiers per localization. info : dict Link and track diagnostics. """ if parallel is False or parallel == 1: n_jobs = 1 elif parallel is True or parallel == -1: n_jobs = max(1, (os.cpu_count() or 2) - 1) elif isinstance(parallel, int) and parallel > 1: n_jobs = parallel else: raise ValueError("Invalid value for 'parallel'") x = np.asarray(xx, dtype=np.float32) y = np.asarray(yy, dtype=np.float32) fr = np.asarray(fr, dtype=np.uint32) n = len(fr) if n == 0: info = { "n_localizations": 0, "n_links": 0, "n_tracks": 0, "mean_track_length": np.nan, "link_ratio": np.nan, "links_per_frame": np.empty(0, dtype=np.int64), "locs_per_frame": np.empty(0, dtype=np.int64), } return np.empty(0, dtype=np.uint64), info if np.ndim(association_radius_nm) == 0: association_radius_nm = np.full(n, association_radius_nm, dtype=np.float32) else: association_radius_nm = np.asarray(association_radius_nm, dtype=np.float32) use_z = zz is not None if use_z: z = np.asarray(zz, dtype=np.float32) if np.ndim(z_association_radius_nm) == 0: z_association_radius_nm = np.full(n, z_association_radius_nm, dtype=np.float32) else: z_association_radius_nm = np.asarray(z_association_radius_nm, dtype=np.float32) else: z = None z_association_radius_nm = None order = np.argsort(fr, kind="stable") xs = x[order] ys = y[order] fs = fr[order] rs = association_radius_nm[order] if use_z: zs = z[order] zrs = z_association_radius_nm[order] else: zs = None zrs = None unique, start, counts = np.unique( fs, return_index=True, return_counts=True, ) def _associate_one(k): """Associate one pair of consecutive frames.""" if unique[k + 1] != unique[k] + 1: return np.empty(0, dtype=np.uint32), np.empty(0, dtype=np.uint32) a0 = start[k] a1 = a0 + counts[k] b0 = start[k + 1] b1 = b0 + counts[k + 1] pts_a = np.column_stack((xs[a0:a1], ys[a0:a1])) pts_b = np.column_stack((xs[b0:b1], ys[b0:b1])) r_a = rs[a0:a1] r_b = rs[b0:b1] n_a = len(pts_a) n_b = len(pts_b) if n_a == 0 or n_b == 0: return np.empty(0, dtype=np.uint64), np.empty(0, dtype=np.uint64) max_r = float(max(r_a.max(), r_b.max())) tree_a = cKDTree(pts_a) tree_b = cKDTree(pts_b) sparse = tree_a.sparse_distance_matrix( tree_b, max_distance=max_r, output_type="coo_matrix", ) if sparse.nnz == 0: return np.empty(0, dtype=np.uint64), np.empty(0, dtype=np.uint64) row = sparse.row.astype(np.uint64) col = sparse.col.astype(np.uint64) dist = sparse.data.astype(np.float64) keep = dist <= np.minimum(r_a[row], r_b[col]) if use_z: z_a = zs[a0:a1] z_b = zs[b0:b1] zr_a = zrs[a0:a1] zr_b = zrs[b0:b1] dz = np.abs(z_a[row] - z_b[col]) keep &= dz <= np.minimum(zr_a[row], zr_b[col]) row = row[keep] col = col[keep] dist = dist[keep] if len(dist) == 0: return np.empty(0, dtype=np.uint64), np.empty(0, dtype=np.uint64) graph_row = np.concatenate((row, n_a + col)) graph_col = np.concatenate((n_a + col, row)) graph = coo_matrix( (np.ones(len(graph_row)), (graph_row, graph_col)), shape=(n_a + n_b, n_a + n_b), ) _, labels = connected_components(graph, directed=False) used_nodes = np.unique(graph_row) component_ids = np.unique(labels[used_nodes]) accepted_a = [] accepted_b = [] for cid in component_ids: mask = labels[row] == cid comp_row = row[mask] comp_col = col[mask] comp_dist = dist[mask] if len(comp_dist) == 1: accepted_a.append(comp_row[0]) accepted_b.append(comp_col[0]) continue comp_a = np.unique(comp_row) comp_b = np.unique(comp_col) if len(comp_a) == 1 or len(comp_b) == 1: best = np.argmin(comp_dist) accepted_a.append(comp_row[best]) accepted_b.append(comp_col[best]) continue deg_a = np.bincount( np.searchsorted(comp_a, comp_row), minlength=len(comp_a), ) deg_b = np.bincount( np.searchsorted(comp_b, comp_col), minlength=len(comp_b), ) if deg_a.max() == 1 and deg_b.max() == 1: accepted_a.extend(comp_row.tolist()) accepted_b.extend(comp_col.tolist()) continue local_row = np.searchsorted(comp_a, comp_row) local_col = np.searchsorted(comp_b, comp_col) cost = np.full((len(comp_a), len(comp_b)), 1e20) cost[local_row, local_col] = comp_dist ia, ib = linear_sum_assignment(cost) valid = cost[ia, ib] < 1e20 for i, j in zip(ia[valid], ib[valid]): accepted_a.append(comp_a[i]) accepted_b.append(comp_b[j]) return a0 + np.array(accepted_a), b0 + np.array(accepted_b) if n_jobs == 1: links = [_associate_one(k) for k in range(len(unique) - 1)] else: links = Parallel(n_jobs=n_jobs, prefer="threads")( delayed(_associate_one)(k) for k in range(len(unique) - 1) ) src = [] dst = [] for s, d in links: if len(s) > 0: src.append(s) dst.append(d) if len(src) == 0: out = np.arange(n) out_original = np.empty_like(out) out_original[order] = out info = { "n_localizations": n, "n_links": 0, "n_tracks": n, "mean_track_length": 1.0, "link_ratio": 0.0, "links_per_frame": np.array([0 for _ in links]), "locs_per_frame": counts, } return out_original.astype(np.uint64) + 1, info src = np.concatenate(src) dst = np.concatenate(dst) graph = coo_matrix( (np.ones(len(src) * 2), (np.concatenate([src, dst]), np.concatenate([dst, src]))), shape=(n, n), ) _, labels = connected_components(graph, directed=False) out = np.empty_like(labels) out[order] = labels _, out = np.unique(out, return_inverse=True) info = { "n_localizations": n, "n_links": len(src), "n_tracks": np.max(out) + 1, "mean_track_length": n / (np.max(out) + 1), "link_ratio": len(src) / n, "links_per_frame": np.array([len(s) for s, _ in links]), "locs_per_frame": counts, } return out.astype(np.uint64) + 1, info