"""Module providing various functions for low-level handling of array data."""
from typing import List, Optional, Tuple, Union
import neurokit2 as nk
import numpy as np
import pandas as pd
from biopsykit.utils._types import arr_t, str_t
from scipy import interpolate, signal
[docs]def find_extrema_in_radius(
data: arr_t,
indices: arr_t,
radius: Union[int, Tuple[int, int]],
extrema_type: Optional[str] = "min",
) -> np.ndarray:
"""Find extrema values (min or max) within a given radius around array indices.
Parameters
----------
data : array_like
input data
indices : array_like
array with indices for which to search for extrema values around
radius: int or tuple of int
radius around ``indices`` to search for extrema:
* if ``radius`` is an ``int`` then search for extrema equally in both directions in the interval
[index - radius, index + radius].
* if ``radius`` is a ``tuple`` then search for extrema in the interval
[ index - radius[0], index + radius[1] ]
extrema_type : {'min', 'max'}, optional
extrema type to be searched for. Default: 'min'
Returns
-------
:class:`~numpy.ndarray`
numpy array containing the indices of the found extrema values in the given radius around ``indices``.
Has the same length as ``indices``.
Examples
--------
>>> from biopsykit.utils.array_handling import find_extrema_in_radius
>>> data = pd.read_csv("data.csv")
>>> indices = np.array([16, 25, 40, 57, 86, 100])
>>>
>>> radius = 4
>>> # search minima in 'data' in a 4 sample 'radius' around each entry of 'indices'
>>> find_extrema_in_radius(data, indices, radius)
>>>
>>> radius = (5, 0)
>>> # search maxima in 'data' in a 5 samples before each entry of 'indices'
>>> find_extrema_in_radius(data, indices, radius, extrema_type='max')
"""
extrema_funcs = {"min": np.nanargmin, "max": np.nanargmax}
if extrema_type not in extrema_funcs:
raise ValueError(f"`extrema_type` must be one of {list(extrema_funcs.keys())}, not {extrema_type}")
extrema_func = extrema_funcs[extrema_type]
# ensure numpy
data = sanitize_input_1d(data)
indices = sanitize_input_1d(indices)
indices = indices.astype(int)
# possible start offset if beginning of array needs to be padded to ensure radius
start_padding = 0
lower_limit, upper_limit = _find_extrema_in_radius_get_limits(radius)
# pad end/start of array if last_index+radius/first_index-radius is longer/shorter than array
if len(data) - np.max(indices) <= upper_limit:
data = np.pad(data, (0, upper_limit), constant_values=np.nan)
if np.min(indices) < lower_limit:
start_padding = lower_limit
data = np.pad(data, (lower_limit, 0), constant_values=np.nan)
# initialize window array
windows = np.zeros(shape=(len(indices), lower_limit + upper_limit + 1))
for i, index in enumerate(indices):
# get windows around index
windows[i] = data[index - lower_limit + start_padding : index + upper_limit + start_padding + 1]
return extrema_func(windows, axis=1) + indices - lower_limit
def _find_extrema_in_radius_get_limits(radius: Union[int, Tuple[int, int]]) -> Tuple[int, int]:
# determine upper and lower limit
lower_limit = radius[0] if isinstance(radius, tuple) else radius
upper_limit = radius[-1] if isinstance(radius, tuple) else radius
# round up and make sure it's an integer
lower_limit = np.ceil(lower_limit).astype(int)
upper_limit = np.ceil(upper_limit).astype(int)
return lower_limit, upper_limit
[docs]def remove_outlier_and_interpolate(
data: arr_t,
outlier_mask: np.ndarray,
x_old: Optional[np.ndarray] = None,
desired_length: Optional[int] = None,
) -> np.ndarray:
"""Remove outliers, impute missing values and optionally interpolate data to desired length.
Detected outliers are removed from array and imputed by linear interpolation.
Optionally, the output array can be linearly interpolated to a new length.
Parameters
----------
data : array_like
input data
outlier_mask : :class:`~numpy.ndarray`
boolean outlier mask. Has to be the same length as ``data``. ``True`` entries indicate outliers.
If ``outlier_mask`` is not a bool array values will be casted to bool
x_old : array_like, optional
x values of the input data to be interpolated or ``None`` if output data should not be interpolated
to new length. Default: ``None``
desired_length : int, optional
desired length of the output signal or ``None`` to keep length of input signal. Default: ``None``
Returns
-------
:class:`~numpy.ndarray`
data with removed and imputed outliers, optionally interpolated to desired length
Raises
------
ValueError
if ``data`` and ``outlier_mask`` don't have the same length or if ``x_old`` is ``None`` when ``desired_length``
is passed as parameter
"""
# ensure numpy and ensure that outlier_mask is a boolean array
data = sanitize_input_nd(data)
outlier_mask = np.array(outlier_mask).astype(bool)
if len(data) != len(outlier_mask):
raise ValueError("'data' and 'outlier_mask' need to have same length!")
if x_old is None and desired_length:
raise ValueError("'x_old' must also be passed when 'desired_length' is passed!")
x_old = np.array(x_old)
# remove outlier
data[outlier_mask] = np.nan
# fill outlier by linear interpolation of neighbors
data = pd.Series(data).interpolate(limit_direction="both").to_numpy()
# interpolate signal
x_new = np.linspace(x_old[0], x_old[-1], desired_length)
return nk.signal_interpolate(x_old, data, x_new, method="linear")
[docs]def sliding_window(
data: arr_t,
window_samples: Optional[int] = None,
window_sec: Optional[int] = None,
sampling_rate: Optional[float] = None,
overlap_samples: Optional[int] = None,
overlap_percent: Optional[float] = None,
) -> np.ndarray:
"""Create sliding windows from an input array.
The window size of sliding windows can either be specified in *samples* (``window_samples``)
or in *seconds* (``window_sec``, together with ``sampling_rate``).
The overlap of windows can either be specified in *samples* (``overlap_samples``)
or in *percent* (``overlap_percent``).
.. note::
If ``data`` has more than one dimension the sliding window view is applied to the **first** dimension.
In the 2-d case this would correspond to applying windows along the **rows**.
Parameters
----------
data : array_like
input data
window_samples : int, optional
window size in samples or ``None`` if window size is specified in seconds + sampling rate. Default: ``None``
window_sec : int, optional
window size in seconds or ``None`` if window size is specified in samples. Default: ``None``
sampling_rate : float, optional
sampling rate of data in Hz. Only needed if window size is specified in seconds (``window_sec`` parameter).
Default: ``None``
overlap_samples : int, optional
overlap of windows in samples or ``None`` if window overlap is specified in percent. Default: ``None``
overlap_percent : float, optional
overlap of windows in percent or ``None`` if window overlap is specified in samples. Default: ``None``
Returns
-------
:class:`~numpy.ndarray`
sliding windows from input array.
See Also
--------
:func:`~biopsykit.utils.array_handling.sliding_window_view`
create sliding window of input array. low-level function with less input parameter configuration possibilities
"""
# check input
data = sanitize_input_nd(data)
window, overlap = sanitize_sliding_window_input(
window_samples=window_samples,
window_sec=window_sec,
sampling_rate=sampling_rate,
overlap_samples=overlap_samples,
overlap_percent=overlap_percent,
)
return sliding_window_view(data, window_length=window, overlap=overlap, nan_padding=True)
def _compute_overlap_samples(
window: int, overlap_samples: Optional[int] = None, overlap_percent: Optional[float] = None
):
if overlap_samples is not None:
overlap = int(overlap_samples)
elif overlap_percent is not None:
if overlap_percent > 1:
overlap_percent /= 100
overlap = int(overlap_percent * window)
else:
overlap = window - 1
return overlap
[docs]def sliding_window_view(array: np.ndarray, window_length: int, overlap: int, nan_padding: bool = False) -> np.ndarray:
"""Create a sliding window view of an input array with given window length and overlap.
.. warning::
This function will return by default a view onto your input array, modifying values in your result will directly
affect your input data which might lead to unexpected behaviour! If padding is disabled (default), last window
fraction of input may not be returned! However, if `nan_padding` is enabled, this will always return a copy
instead of a view of your input data, independent if padding was actually performed or not!
Parameters
----------
array : :class:`~numpy.ndarray` with shape (n,) or (n, m)
array on which sliding window action should be performed. Windowing
will always be performed along axis 0.
window_length : int
length of desired window (must be smaller than array length n)
overlap : int
length of desired overlap (must be smaller than window_length)
nan_padding : bool
select if last window should be nan-padded or discarded if it not fits with input array length. If nan-padding
is enabled the return array will always be a copy of the input array independent if padding was actually
performed or not!
Returns
-------
:class:`~numpy.ndarray`
windowed view (or copy if ``nan_padding`` is ``True``) of input array as specified,
last window might be nan-padded if necessary to match window size
Examples
--------
>>> data = np.arange(0,10)
>>> windowed_view = sliding_window_view(array = data, window_length = 5, overlap = 3, nan_padding = True)
>>> windowed_view
array([[ 0., 1., 2., 3., 4.],
[ 2., 3., 4., 5., 6.],
[ 4., 5., 6., 7., 8.],
[ 6., 7., 8., 9., nan]])
"""
if overlap >= window_length:
raise ValueError("Invalid Input, overlap must be smaller than window length!")
if window_length < 2:
raise ValueError("Invalid Input, window_length must be larger than 1!")
# calculate length of necessary np.nan-padding to make sure windows and overlaps exactly fits data length
n_windows = np.ceil((len(array) - window_length) / (window_length - overlap)).astype(int)
pad_length = window_length + n_windows * (window_length - overlap) - len(array)
# had to handle 1D arrays separately
if array.ndim == 1:
if nan_padding:
# np.pad always returns a copy of the input array even if pad_length is 0!
array = np.pad(array.astype(float), (0, pad_length), constant_values=np.nan)
new_shape = (array.size - window_length + 1, window_length)
else:
if nan_padding:
# np.pad always returns a copy of the input array even if pad_length is 0!
array = np.pad(array.astype(float), [(0, pad_length), (0, 0)], constant_values=np.nan)
shape = (window_length, array.shape[-1])
n = np.array(array.shape)
o = n - shape + 1 # output shape
new_shape = np.concatenate((o, shape), axis=0)
# apply stride_tricks magic
new_strides = np.concatenate((array.strides, array.strides), axis=0)
view = np.lib.stride_tricks.as_strided(array, new_shape, new_strides)[0 :: (window_length - overlap)]
view = np.squeeze(view) # get rid of single-dimensional entries from the shape of an array.
return view
[docs]def downsample(
data: arr_t,
fs_in: float,
fs_out: float,
) -> np.ndarray:
"""Downsample input signal to a new sampling rate.
If the output sampling rate is a divisor of the input sampling rate, the signal is downsampled using
:func:`~scipy.signal.decimate`. Otherwise, data is first filtered using an aliasing filter before it is
downsampled using linear interpolation.
Parameters
----------
data : :class:`~numpy.ndarray`
input data
fs_in : float
sampling rate of input data in Hz.
fs_out : float
sampling rate of output data in Hz
Returns
-------
:class:`~numpy.ndarray`
output data with new sampling rate
"""
data = sanitize_input_nd(data)
if (fs_in / fs_out) % 1 == 0:
return signal.decimate(data, int(fs_in / fs_out), axis=0)
# aliasing filter
b, a = signal.cheby1(N=8, rp=0.05, Wn=0.8 / (fs_in / fs_out))
data_lp = signal.filtfilt(a=a, b=b, x=data)
# interpolation
x_old = np.linspace(0, len(data_lp), num=len(data_lp), endpoint=False)
x_new = np.linspace(0, len(data_lp), num=int(len(data_lp) / (fs_in / fs_out)), endpoint=False)
interpol = interpolate.interp1d(x=x_old, y=data_lp)
return interpol(x_new)
def _bool_fill(indices: np.ndarray, bool_values: np.ndarray, array: np.ndarray) -> np.ndarray:
"""Fill a pre-allocated array with bool values.
This function iterates over the indices and adds the bool_values to the array at the given
indices using a logical "or".
Parameters
----------
indices : :class:`~numpy.ndarray`
list with indices to add to the array
bool_values : :class:`~numpy.ndarray`
bool values to add to the array
array : :class:`~numpy.ndarray`
pre-allocated array to fill with bool values
Returns
-------
:class:`~numpy.ndarray`
array filled with bool values
"""
for i in range(len(indices)): # pylint:disable=consider-using-enumerate
index = indices[i]
val = bool_values[i]
index = index[~np.isnan(index)]
# perform logical or operation to combine all overlapping window results
array[index] = np.logical_or(array[index], val)
return array
[docs]def bool_array_to_start_end_array(bool_array: np.ndarray) -> np.ndarray:
"""Find regions in bool array and convert those to start-end indices.
.. note::
The end index is inclusive!
Parameters
----------
bool_array : :class:`~numpy.ndarray` with shape (n,)
boolean array with either 0/1, 0.0/1.0 or True/False elements
Returns
-------
:class:`~numpy.ndarray`
array of [start, end] indices with shape (n,2)
Examples
--------
>>> example_array = np.array([0,0,1,1,0,0,1,1,1])
>>> start_end_list = bool_array_to_start_end_array(example_array)
>>> start_end_list
array([[2, 4],
[6, 9]])
>>> example_array[start_end_list[0, 0]: start_end_list[0, 1]]
array([1, 1])
"""
# check if input is actually a boolean array
if not np.array_equal(bool_array, bool_array.astype(bool)):
raise ValueError("Input must be boolean array!")
slices = np.ma.flatnotmasked_contiguous(np.ma.masked_equal(bool_array, 0))
return np.array([[s.start, s.stop] for s in slices])
[docs]def split_array_equally(data: arr_t, n_splits: int) -> List[Tuple[int, int]]:
"""Generate indices to split array into parts with equal lengths.
Parameters
----------
data : array_like
data to split
n_splits : int
number of splits
Returns
-------
list of tuples
list with start and end indices which will lead to splitting array into parts with equal lengths
"""
idx_split = np.arange(0, n_splits + 1) * ((len(data) - 1) // n_splits)
split_boundaries = list(zip(idx_split[:-1], idx_split[1:]))
return split_boundaries
[docs]def accumulate_array(data: arr_t, fs_in: float, fs_out: float) -> arr_t:
"""Accumulate 1-d array by summing over windows.
Parameters
----------
data : array_like
data to accumulate. must be 1-d array
fs_in : float
sampling rate of input data in Hz
fs_out : float
sampling rate of output data in Hz
Returns
-------
array_like
accumulated array
"""
arr = sanitize_input_nd(data, ncols=1)
n_samples = int(fs_in // fs_out)
padded_arr = np.pad(arr, (0, n_samples - len(arr) % n_samples), "constant", constant_values=0)
out = padded_arr.reshape((len(padded_arr) // n_samples, -1)).sum(axis=1)
if isinstance(data, pd.DataFrame) and isinstance(data.index, pd.DatetimeIndex):
out = add_datetime_index(out, start_time=data.index[0], sampling_rate=fs_out, column_name=data.columns)
return out
[docs]def add_datetime_index(
arr: arr_t, start_time: pd.Timestamp, sampling_rate: float, column_name: Optional[str_t] = None
) -> pd.DataFrame:
"""Add datetime index to dataframe.
Parameters
----------
arr : array_like
numpy array to add index to
start_time : :class:`~pandas.Timestamp`
start time of the index
sampling_rate : float
sampling rate of input data in Hz.
column_name : str, optional
column of the resulting dataframe or ``None`` to leave it empty.
Returns
-------
:class:`~pandas.DataFrame`
dataframe with datetime index
"""
if isinstance(column_name, str):
# assert list
column_name = [column_name]
arr = pd.DataFrame(arr, columns=column_name) if column_name is not None else pd.DataFrame(arr)
start_time_s = float(start_time.to_datetime64()) / 1e9
arr.index = pd.to_datetime(
(arr.index * 1 / sampling_rate + start_time_s).astype(int), utc=True, unit="s"
).tz_convert(start_time.tzinfo)
arr.index.name = "time"
return arr