Source code for smlmlp.modules.analysis_LP._functions.aggregate.aggregate_ratio

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Author        : Lancelot PINCET
# GitHub        : https://github.com/LancelotPincet


import numpy as np
import numba as nb
from arrlp import nb_threads
from smlmlp import analysis


@analysis(df_name="points")
def aggregate_ratio(
    col,
    /,
    pnt,
    ch,
    *,
    x_channels=None,
    y_channels=None,
    cuda=False,
    parallel=False,
):
    """
    Sum the values of two channel groups for each point.

    Rows are grouped by point identifier; for every point the values of the
    rows whose channel belongs to ``x_channels`` (resp. ``y_channels``) are
    summed.

    Parameters
    ----------
    col : array-like
        Channel values to aggregate (converted to float32).
    pnt : array-like
        Point identifier of each row (converted to uint64).
    ch : array-like
        Channel identifier of each row (converted to uint32).
    x_channels, y_channels : array-like or None, optional
        Channels summed into the returned x and y values. Each one is
        defaulted independently when left to None: ``x_channels`` to
        ``np.arange(1, ch.max() - 1)`` and ``y_channels`` to ``[ch.max()]``.
        With empty input both defaults are empty.
    cuda : bool, optional
        Accepted for signature compatibility; not used by this function.
    parallel : bool, optional
        Forwarded to the ``nb_threads`` context manager wrapping the kernel.

    Returns
    -------
    ratio_x, ratio_y : ndarray of float32
        One summed value per unique point, ordered by increasing point
        identifier, with NaN where a requested channel is missing.
    info : dict
        Empty diagnostics dictionary.

    Raises
    ------
    ValueError
        Raised by the kernel when a requested channel appears several times
        for the same point.
    """
    col = np.asarray(col, dtype=np.float32)
    pnt = np.asarray(pnt, dtype=np.uint64)
    ch = np.asarray(ch, dtype=np.uint32)

    if x_channels is None or y_channels is None:
        # Work with a Python int: subtracting from an unsigned NumPy scalar
        # can wrap around (max channel 0) and produce a gigantic arange.
        # An empty input has no maximum, so the defaults are empty as well.
        ch_max = int(ch.max()) if ch.size else None
        if x_channels is None:
            # NOTE(review): np.arange excludes its stop, so this default ends
            # at ch.max() - 2; confirm that leaving out channel
            # ch.max() - 1 is intended.
            if ch_max is None:
                x_channels = []
            else:
                x_channels = np.arange(1, ch_max - 1, dtype=np.uint32)
        if y_channels is None:
            y_channels = [] if ch_max is None else [ch_max]

    # atleast_1d lets a scalar channel be passed; the kernel needs 1-D arrays.
    x_channels = np.atleast_1d(np.asarray(x_channels, dtype=np.uint32))
    y_channels = np.atleast_1d(np.asarray(y_channels, dtype=np.uint32))

    # lexsort uses its last key as primary: sort by point, then by channel,
    # so that the rows of one point are contiguous.
    order = np.lexsort((ch, pnt))
    pnt_s = pnt[order]
    ch_s = ch[order]
    col_s = col[order]

    # First sorted row and number of rows of every unique point.
    _, start, counts = np.unique(
        pnt_s,
        return_index=True,
        return_counts=True,
    )

    with nb_threads(parallel):
        ratio_x, ratio_y = _aggregate_ratio(
            ch_s,
            col_s,
            start.astype(np.int64),
            counts.astype(np.int64),
            x_channels,
            y_channels,
        )

    info = {}
    return ratio_x, ratio_y, info
@nb.njit(cache=True, parallel=True)
def _aggregate_ratio(
    ch,
    col,
    start,
    counts,
    x_channels,
    y_channels,
):
    """
    Sum the requested channel values of every point group.

    ``ch`` and ``col`` hold the channel and value of each row; group ``g``
    covers rows ``start[g]`` to ``start[g] + counts[g]`` (exclusive). An
    output stays NaN when at least one requested channel was not seen exactly
    once in the group. A ValueError is raised as soon as a requested channel
    is met a second time within one group.
    """
    n_x = len(x_channels)
    n_y = len(y_channels)
    n_groups = len(start)

    # Outputs start as NaN and are overwritten only for complete groups.
    out_x = np.empty(n_groups, dtype=np.float32)
    out_y = np.empty(n_groups, dtype=np.float32)
    out_x[:] = np.nan
    out_y[:] = np.nan

    for g in nb.prange(n_groups):
        first = start[g]
        stop = first + counts[g]

        # Per-group hit counters, one slot per requested channel.
        seen_x = np.zeros(n_x, dtype=np.uint8)
        seen_y = np.zeros(n_y, dtype=np.uint8)
        total_x = 0.0
        total_y = 0.0

        for row in range(first, stop):
            row_channel = ch[row]
            row_value = col[row]

            for ix in range(n_x):
                if x_channels[ix] != row_channel:
                    continue
                seen_x[ix] += 1
                total_x += row_value
                if seen_x[ix] > 1:
                    raise ValueError(
                        "A requested ratio_x channel appears several times for one point."
                    )

            for iy in range(n_y):
                if y_channels[iy] != row_channel:
                    continue
                seen_y[iy] += 1
                total_y += row_value
                if seen_y[iy] > 1:
                    raise ValueError(
                        "A requested ratio_y channel appears several times for one point."
                    )

        # A group is complete when every requested channel was hit once.
        complete_x = True
        for ix in range(n_x):
            if seen_x[ix] != 1:
                complete_x = False
                break

        complete_y = True
        for iy in range(n_y):
            if seen_y[iy] != 1:
                complete_y = False
                break

        if complete_x:
            out_x[g] = total_x
        if complete_y:
            out_y[g] = total_y

    return out_x, out_y