"""A set of util functions to detect static regions in an IMU signal given certain constraints."""
from typing import Literal, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
from biopsykit.utils._types import arr_t
from biopsykit.utils.array_handling import (
_bool_fill,
bool_array_to_start_end_array,
sanitize_input_nd,
sanitize_sliding_window_input,
sliding_window_view,
)
from numpy.linalg import norm
# Names of the window metrics accepted by the detection functions in this module.
# Used as a type alias when annotating ``metric`` parameters.
METRIC_FUNCTION_NAMES = Literal["maximum", "variance", "mean", "median"]

# Lookup table from metric name to the NaN-ignoring NumPy reduction that is applied per window.
# The key order is intentional: the keys are listed verbatim in an error message further down.
_METRIC_FUNCTIONS = {
    "maximum": np.nanmax,
    "variance": np.nanvar,
    "mean": np.nanmean,
    "median": np.nanmedian,
}
def _find_static_samples(
    data: np.ndarray,
    window_length: int,
    inactive_signal_th: float,
    metric: METRIC_FUNCTION_NAMES = "mean",
    overlap: int = None,
) -> np.ndarray:
    """Mark the static samples of a signal by thresholding a windowed metric of its L2-norm.

    The Euclidean norm of every sample is cut into sliding windows, the selected metric is
    evaluated per window and each window whose metric is ``<= inactive_signal_th`` is treated
    as static. The per-window decision is then written back to the individual samples by
    ``_bool_fill``.

    .. warning::
        Due to edge cases at the end of the input data where window size and overlap might not fit your data,
        the last window might be discarded for analysis and will therefore always be considered as non-static!

    Parameters
    ----------
    data : array with shape (n, 3) or (n, 2)
        3D or 2D signal on which static moment detection should be performed (e.g. 3D-acc or 3D-gyr data)
    window_length : int
        Length of the sliding window in samples
    inactive_signal_th : float
        Threshold for the window metric. A window counts as static if its metric is ``<=`` this value.
    metric : str, optional
        Metric that is evaluated for every window. One of:

        * ``'mean'`` (default): mean value per window
        * ``'maximum'``: maximum value per window
        * ``'median'``: median value per window
        * ``'variance'``: variance per window
    overlap : int, optional
        Overlap between consecutive windows in samples. If ``None`` (default), ``window_length - 1`` is used.

    Returns
    -------
    Boolean array with length n to indicate static (=``True``) or non-static (=``False``) for each sample

    Raises
    ------
    ValueError
        If the last dimension of ``data`` is neither 2 nor 3, or if ``metric`` is not a supported metric name.

    Examples
    --------
    >>> _find_static_samples(data, window_length=128, overlap=64, inactive_signal_th = 5, metric = 'mean')

    See Also
    --------
    :func:`~biopsykit.utils.array_handling.sliding_window_view`
        Details on the used windowing function for this method.

    """
    # only 2- or 3-axis signals are accepted
    n_axes = np.shape(data)[-1]
    if n_axes not in (2, 3):
        raise ValueError("Invalid signal dimensions, signal must be of shape (n,3) or (n, 2).")

    if metric not in _METRIC_FUNCTIONS:
        raise ValueError(f"Invalid metric passed! {metric} as metric is not supported.")
    reduce_window = _METRIC_FUNCTIONS[metric]

    # fall back to the maximum possible overlap (window advances by one sample)
    if overlap is None:
        overlap = window_length - 1

    # the norm is computed once for the whole signal and only then split into windows
    norm_windows = sliding_window_view(norm(data, axis=1), window_length, overlap, nan_padding=False)

    # one decision per window, expanded (without copying) to one value per sample of the window
    window_is_static = reduce_window(norm_windows, axis=1) <= inactive_signal_th
    static_per_window_sample = np.transpose(np.broadcast_to(window_is_static, norm_windows.shape[::-1]))

    # same windowing applied to the sample indices, so every window entry knows which sample it refers to
    index_windows = sliding_window_view(np.arange(0, len(data)), window_length, overlap, nan_padding=False)

    # write the window decisions into a per-sample array (starts as all zeros, i.e. non-static)
    sample_flags = _bool_fill(index_windows, static_per_window_sample, np.zeros(len(data)))
    return sample_flags.astype(bool)
def _find_static_sequences(
    data: np.ndarray,
    window_length: int,
    inactive_signal_th: float,
    metric: METRIC_FUNCTION_NAMES = "variance",
    overlap: int = None,
) -> np.ndarray:
    """Locate static sequences in a signal, based on windowed L2-norm thresholding.

    Thin wrapper: the per-sample static mask computed by ``_find_static_samples`` is converted
    into start/end pairs by ``bool_array_to_start_end_array``.

    .. warning::
        Due to edge cases at the end of the input data where window size and overlap might not fit your data,
        the last window might be discarded for analysis and will therefore always be considered as non-static!

    Parameters
    ----------
    data : array with shape (n, 3) or (n, 2)
        3D or 2D signal on which static moment detection should be performed (e.g. 3D-acc or 3D-gyr data)
    window_length : int
        Length of the sliding window in samples
    inactive_signal_th : float
        Threshold for the window metric. A window counts as static if its metric is ``<=`` this value.
    metric : str, optional
        Metric that is evaluated for every window. One of:

        * ``'variance'`` (default): variance per window
        * ``'mean'``: mean value per window
        * ``'maximum'``: maximum value per window
        * ``'median'``: median value per window
    overlap : int, optional
        Overlap between consecutive windows in samples. If ``None`` (default), ``window_length - 1`` is used.

    Returns
    -------
    Array of [start, end] labels indicating static regions within the input signal, in the form
    produced by ``bool_array_to_start_end_array``

    Examples
    --------
    >>> _find_static_sequences(data, window_length=128, overlap=64, inactive_signal_th = 5, metric = 'mean')

    See Also
    --------
    :func:`~biopsykit.utils.array_handling.sliding_window_view`
        Details on the used windowing function for this method.

    """
    sample_is_static = _find_static_samples(
        data,
        window_length,
        inactive_signal_th,
        metric=metric,
        overlap=overlap,
    )
    return bool_array_to_start_end_array(sample_is_static)
def find_static_moments(
    data: arr_t,
    threshold: float,
    window_samples: Optional[int] = None,
    window_sec: Optional[int] = None,
    sampling_rate: Optional[Union[int, float]] = 0,
    overlap_samples: Optional[int] = None,
    overlap_percent: Optional[float] = None,
    metric: METRIC_FUNCTION_NAMES = "variance",
) -> pd.DataFrame:
    """Search for static moments within given input signal, based on windowed L2-norm thresholding.

    The window size of sliding windows can either be specified in *samples* (``window_samples``)
    or in *seconds* (``window_sec``, together with ``sampling_rate``).

    The overlap of windows can either be specified in *samples* (``overlap_samples``)
    or in *percent* (``overlap_percent``).

    Parameters
    ----------
    data : array with shape (n, 3) or (n, 2)
        3D or 2D signal on which static moment detection should be performed (e.g. 3D-acc or 3D-gyr data).
        Pandas objects as well as plain arrays are accepted by the emptiness check; the conversion to the
        array used for the computation is done by ``sanitize_input_nd``.
    threshold : float
        Threshold to decide whether a window should be considered as active or inactive. Window will be tested on
        <= threshold
    window_samples : int, optional
        window size in samples or ``None`` if window size is specified in seconds + sampling rate. Default: ``None``
    window_sec : int, optional
        window size in seconds or ``None`` if window size is specified in samples. Default: ``None``
    sampling_rate : float, optional
        sampling rate of data in Hz. Only needed if window size is specified in seconds (``window_sec`` parameter).
        Default: ``0``
    overlap_samples : int, optional
        overlap of windows in samples or ``None`` if window overlap is specified in percent. Default: ``None``
    overlap_percent : float, optional
        overlap of windows in percent or ``None`` if window overlap is specified in samples. Default: ``None``
    metric : str, optional
        Metric which will be calculated per window, one of the following strings:

        * 'variance' (default): Calculates variance value per window
        * 'mean': Calculates mean value per window
        * 'maximum': Calculates maximum value per window
        * 'median': Calculates median value per window

    Returns
    -------
    :class:`~pandas.DataFrame`
        dataframe with ["start", "end"] columns indicating beginning and end of static regions within the input
        signal. End indices are *inclusive*. The dataframe has no rows if the input is empty or no static
        region was found.

    Examples
    --------
    >>> find_static_moments(data, threshold=5, window_samples=128, overlap_samples=64, metric="mean")

    See Also
    --------
    :func:`~biopsykit.utils.array_handling.sliding_window_view`
        Details on the used windowing function for this method.

    """
    # The window settings are resolved first, so they are processed even when ``data`` is empty.
    window, overlap = sanitize_sliding_window_input(
        window_samples=window_samples,
        window_sec=window_sec,
        sampling_rate=sampling_rate,
        overlap_samples=overlap_samples,
        overlap_percent=overlap_percent,
    )

    # ``np.size`` is used instead of ``data.empty`` because the latter only exists on pandas objects.
    # For pandas objects both checks agree (size 0 <=> at least one axis has length 0).
    if np.size(data) == 0:
        return pd.DataFrame(columns=["start", "end"])

    data = sanitize_input_nd(data)
    start_end = _find_static_sequences(
        data, window_length=window, overlap=overlap, inactive_signal_th=threshold, metric=metric
    )
    if len(start_end) == 0:
        return pd.DataFrame(columns=["start", "end"])

    # end indices are *inclusive*!
    start_end[:, 1] -= 1
    return pd.DataFrame(start_end, columns=["start", "end"])
def find_first_static_window_multi_sensor(
    signals: Sequence[np.ndarray],
    window_length: int,
    inactive_signal_th: float,
    metric: METRIC_FUNCTION_NAMES,
) -> Tuple[int, int]:
    """Find the first time window in the signal where all provided sensors are static.

    Parameters
    ----------
    signals : Sequence of n arrays with shape (k, m) or a 3D-array with shape (k, n, m)
        The signals of n sensors with m axes and k samples.
    window_length
        Length of the required static signal in samples
    inactive_signal_th
        The threshold for static windows.
        If metric(norm(window, axis=-1)) <= `inactive_signal_th` for all sensors, it is considered static.
    metric
        The metric that should be calculated on the vector norm over all axes for each sensor in each window

    Returns
    -------
    (start, end)
        Start and end index of the first static window. ``end`` is ``start + window_length``.

    Raises
    ------
    ValueError
        If ``metric`` is not supported, if the signals are empty, differ in shape or have the wrong number of
        dimensions, or if no static window was found.

    Examples
    --------
    >>> sensor_1_gyro = ...
    >>> sensor_2_gyro = ...
    >>> find_first_static_window_multi_sensor(
    ...     [sensor_1_gyro, sensor_2_gyro], window_length=128, inactive_signal_th=5, metric="maximum"
    ... )

    """
    if metric not in _METRIC_FUNCTIONS:
        raise ValueError(f"`metric` must be one of {list(_METRIC_FUNCTIONS.keys())}")

    if isinstance(signals, np.ndarray):
        if signals.ndim != 3:
            raise ValueError(
                "If a array is used as input, it must be 3D, where the first dimension is the time, "
                "the second indicates the sensor and the third the axis of the sensor."
            )
        n_signals = signals.shape[1]
    else:
        if len(signals) == 0:
            raise ValueError("At least one signal must be provided.")
        # all signals should have the same shape
        if not all(signals[0].shape == signal.shape for signal in signals):
            raise ValueError("All provided signals need to have the same shape.")
        if signals[0].ndim != 2:
            raise ValueError(
                "The array of each sensor must be 2D, where the first dimension is the time and the second dimension "
                "the sensor axis."
            )
        # The sensor count must be taken *before* stacking: afterwards the second dimension holds
        # n_sensors * n_axes columns, and using that as sensor count would reduce the per-sensor
        # vector norm below to a per-axis absolute value.
        n_signals = len(signals)
        signals = np.hstack(signals)

    # Flatten to (k, n * m) so that a single sliding window over the time axis covers all sensors at once.
    windows = sliding_window_view(
        signals.reshape((signals.shape[0], -1)),
        window_length=window_length,
        overlap=window_length - 1,
        nan_padding=False,
    )
    # Restore the (sensor, axis) structure inside every window and reduce the axes to one norm per sensor.
    reshaped_windows = windows.reshape((*windows.shape[:-1], n_signals, -1))
    window_norm = norm(reshaped_windows, axis=-1)

    method = _METRIC_FUNCTIONS[metric]
    # This is pretty wasteful as we calculate the function on all windows, even though we are only interested in
    # the first one that satisfies the threshold.
    # A window is static only if the metric of *every* sensor is below the threshold, hence the max over sensors.
    window_is_static = method(window_norm, axis=1).max(axis=-1) <= inactive_signal_th

    valid_windows = np.nonzero(window_is_static)[0]
    if len(valid_windows) == 0:
        raise ValueError("No static window was found")

    return valid_windows[0], valid_windows[0] + window_length