Source code for biopsykit.saliva.saliva

"""Functions for processing saliva data and computing established features (AUC, slope, maximum increase, ...)."""
import warnings
from typing import Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
from biopsykit.saliva.utils import (
    _check_sample_times,
    _get_group_cols,
    _get_saliva_idx_labels,
    _get_sample_times,
    _remove_s0,
)
from biopsykit.utils.datatype_helper import (
    SalivaFeatureDataFrame,
    SalivaMeanSeDataFrame,
    SalivaRawDataFrame,
    _SalivaFeatureDataFrame,
    _SalivaMeanSeDataFrame,
    is_saliva_feature_dataframe,
    is_saliva_mean_se_dataframe,
    is_saliva_raw_dataframe,
)
from biopsykit.utils.exceptions import DataFrameTransformationError
from biopsykit.utils.functions import se


[docs]def max_value( data: SalivaRawDataFrame, saliva_type: Optional[Union[str, Sequence[str]]] = "cortisol", remove_s0: Optional[bool] = False, ) -> Union[SalivaFeatureDataFrame, Dict[str, SalivaFeatureDataFrame]]: """Compute maximum value. The output feature name will be ``max_val``, preceded by the name of the saliva type to allow better conversion into long-format later on (if desired). So e.g., for cortisol, it will be: ``cortisol_max_val``. Parameters ---------- data : :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` saliva data in `SalivaRawDataFrame` format saliva_type : str or list of str saliva type or list of saliva types to compute features on remove_s0 : bool, optional whether to remove the first saliva sample for computing maximum or not. Default: ``False`` Returns ------- :obj:`~biopsykit.utils.datatype_helper.SalivaFeatureDataFrame` or dict of such dataframe containing the computed features, or a dict of such if ``saliva_type`` is a list Raises ------ :exc:`~biopsykit.utils.exceptions.ValidationError` if ``data`` is not a :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` """ # check input is_saliva_raw_dataframe(data, saliva_type) if isinstance(saliva_type, list): dict_result = {} for saliva in saliva_type: saliva_col = [saliva] if "time" in data: saliva_col = [*saliva_col, "time"] dict_result[saliva] = max_value( data[saliva_col], saliva_type=saliva, remove_s0=remove_s0, ) return dict_result if remove_s0: # We have a S0 sample => drop it data = data.drop(0, level="sample", errors="ignore") data = data.drop("0", level="sample", errors="ignore") data = data.drop("S0", level="sample", errors="ignore") data = data[[saliva_type]].unstack(level="sample") max_val = data.max(axis=1) out = pd.DataFrame( max_val, columns=[f"{saliva_type}_max_val"], index=max_val.index, ) out.columns.name = "saliva_feature" # check output is_saliva_feature_dataframe(out, saliva_type) return out
[docs]def initial_value( data: SalivaRawDataFrame, saliva_type: Optional[Union[str, Sequence[str]]] = "cortisol", remove_s0: Optional[bool] = False, ) -> Union[SalivaFeatureDataFrame, Dict[str, SalivaFeatureDataFrame]]: """Compute initial saliva sample. The output feature name will be ``ini_val``, preceded by the name of the saliva type to allow better conversion into long-format later on (if desired). So e.g., for cortisol, it will be: ``cortisol_ini_val``. Parameters ---------- data : :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` saliva data in `SalivaRawDataFrame` format saliva_type : str or list of str saliva type or list of saliva types to compute features on remove_s0 : bool, optional whether to remove the first saliva sample for computing initial value or not. Default: ``False`` Returns ------- :obj:`~biopsykit.utils.datatype_helper.SalivaFeatureDataFrame` or dict of such dataframe containing the computed features, or a dict of such if ``saliva_type`` is a list Raises ------ :exc:`~biopsykit.utils.exceptions.ValidationError` if ``data`` is not a :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` """ # check input is_saliva_raw_dataframe(data, saliva_type) if isinstance(saliva_type, list): dict_result = {} for saliva in saliva_type: saliva_col = [saliva] if "time" in data: saliva_col = [*saliva_col, "time"] dict_result[saliva] = initial_value( data[saliva_col], saliva_type=saliva, remove_s0=remove_s0, ) return dict_result if remove_s0: # We have a S0 sample => drop it data = _remove_s0(data) data = data[[saliva_type]].unstack(level="sample") ini_val = data.iloc[:, 0] out = pd.DataFrame( ini_val.values, columns=[f"{saliva_type}_ini_val"], index=ini_val.index, ) out.columns.name = "saliva_feature" # check output is_saliva_feature_dataframe(out, saliva_type) return out
[docs]def max_increase( data: SalivaRawDataFrame, saliva_type: Optional[Union[str, Sequence[str]]] = "cortisol", remove_s0: Optional[bool] = False, percent: Optional[bool] = False, ) -> Union[SalivaFeatureDataFrame, Dict[str, SalivaFeatureDataFrame]]: """Compute maximum increase between first saliva sample and all others. The maximum increase (`max_inc`) is defined as the difference between the `first` sample and the maximum of all `subsequent` samples. If the first sample should be excluded from computation, e.g., because the first sample was just collected for controlling against high initial saliva levels, ``remove_s0`` needs to set to ``True``. The output is either the absolute increase or the relative increase to the first sample in percent (if ``percent`` is ``True``). The output feature name will be ``max_inc``, preceded by the name of the saliva type to allow better conversion into long-format later on (if desired). So e.g., for cortisol, it will be: ``cortisol_max_inc``. Parameters ---------- data : :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` saliva data in `SalivaRawDataFrame` format saliva_type : str or list of str saliva type or list of saliva types to compute features on remove_s0 : bool, optional whether to exclude the first saliva sample from computing `max_inc` or not. Default: ``False`` percent : bool, optional whether to compute ``max_inc`` in percent (i.e., relative increase) or not. Default: ``False`` Returns ------- :obj:`~biopsykit.utils.datatype_helper.SalivaFeatureDataFrame` or dict of such dataframe containing the computed features, or a dict of such if ``saliva_type`` is a list Raises ------ :exc:`~biopsykit.utils.exceptions.ValidationError` if ``data`` is not a :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` """ # check input is_saliva_raw_dataframe(data, saliva_type) if isinstance(saliva_type, list): dict_result = {} for saliva in saliva_type: saliva_col = [saliva] if "time" in data: saliva_col = [*saliva_col, "time"] dict_result[saliva] = max_increase( data[saliva_col], saliva_type=saliva, remove_s0=remove_s0, percent=percent, ) return dict_result if remove_s0: # We have a S0 sample => drop it data = _remove_s0(data) data = data[[saliva_type]].unstack(level="sample") max_inc = data.iloc[:, 1:].max(axis=1) - data.iloc[:, 0] if percent: max_inc = 100.0 * max_inc / np.abs(data.iloc[:, 0]) out = pd.DataFrame( max_inc, columns=[f"{saliva_type}_max_inc_percent" if percent else f"{saliva_type}_max_inc"], index=max_inc.index, ) out.columns.name = "saliva_feature" # check output is_saliva_feature_dataframe(out, saliva_type) return out
[docs]def auc( data: SalivaRawDataFrame, saliva_type: Optional[Union[str, Sequence[str]]] = "cortisol", remove_s0: Optional[bool] = False, compute_auc_post: Optional[bool] = False, sample_times: Optional[Union[np.ndarray, Sequence[int], str]] = None, ) -> Union[SalivaFeatureDataFrame, Dict[str, SalivaFeatureDataFrame]]: r"""Compute area-under-the-curve (AUC) for saliva samples. The area-under-the-curve is computed according to Pruessner et al. (2003) using the trapezoidal rule (:func:`numpy.trapz`). To compute an AUC the saliva time points are required in minutes. They can either be part of the :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` (`time` column) or can be supplied as extra parameter (``sample_times``). Pruessner defined two types of AUC, which are computed by default: * AUC with respect to `ground` (:math:`AUC_{G}`), and * AUC with respect to the first sample, i.e., AUC with respect to `increase` (:math:`AUC_{I}`) If the first sample should be excluded from computation, e.g., because the first sample was just collected for controlling against high initial saliva levels, ``remove_s0`` needs to set to ``True``. If saliva samples were collected during an acute stress task :math:`AUC_{I}` can additionally be computed only for the saliva values *after* the stressor by setting ``compute_auc_post`` to ``True``. .. note:: For a *pre/post* stress scenario *post*-stress saliva samples are indicated by time points :math:`t \geq 0`, saliva sampled collected *before* start of the stressor are indicated by time points :math:`t < 0`. This means that a saliva sample collected at time :math:`t = 0` is defined as *right after stressor*. The feature names will be ``auc_g``, ``auc_i`` (and ``auc_i_post`` if ``compute_auc_post`` is ``True``), preceded by the name of the saliva type to allow better conversion into long-format later on (if desired). So e.g., for cortisol, it will be: ``cortisol_auc_g``. Parameters ---------- data : :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` saliva data in `SalivaRawDataFrame` format saliva_type : str or list of str saliva type or list of saliva types to compute features on remove_s0 : bool, optional whether to exclude the first saliva sample from computing ``auc`` or not. Default: ``False`` compute_auc_post : bool, optional whether to additionally compute :math:`AUC_I` only for saliva samples *post* stressor. Saliva samples *post* stressor are defined as all samples with non-negative ``sample_times``. Default: ``False`` sample_times: :any:`numpy.ndarray` or list, optional Saliva sampling times (corresponding to x-axis values for computing AUC). By default (``sample_times`` is ``None``) sample times are expected to be part of the dataframe (in the `time` column). Alternatively, sample times can be specified by passing a list or a numpy array to this argument. If ``sample_times`` is a 1D array, it is assumed that saliva times are the same for all subjects. Then, ``sample_times`` needs to have the shape (n_samples,). If ``sample_times`` is a 2D array, it is assumed that saliva times are individual for all subjects. Then, ``saliva_times`` needs to have the shape (n_subjects, n_samples). Returns ------- :obj:`~biopsykit.utils.datatype_helper.SalivaFeatureDataFrame` or dict of such dataframe containing the computed features, or a dict of such if ``saliva_type`` is a list Raises ------ :exc:`~biopsykit.utils.exceptions.ValidationError` if ``data`` is not a :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` References ---------- Pruessner, J. C., Kirschbaum, C., Meinlschmid, G., & Hellhammer, D. H. (2003). Two formulas for computation of the area under the curve represent measures of total hormone concentration versus time-dependent change. Psychoneuroendocrinology, 28(7), 916-931. https://doi.org/10.1016/S0306-4530(02)00108-7 """ # check input is_saliva_raw_dataframe(data, saliva_type) if isinstance(saliva_type, list): dict_result = {} for saliva in saliva_type: saliva_col = [saliva] if "time" in data: saliva_col = [*saliva_col, "time"] dict_result[saliva] = auc( data[saliva_col], saliva_type=saliva, remove_s0=remove_s0, sample_times=sample_times, ) return dict_result sample_times = _get_sample_times(data, saliva_type, sample_times, remove_s0) _check_sample_times(sample_times) if remove_s0: # We have a S0 sample => drop it data = _remove_s0(data) data = data[[saliva_type]].unstack(level="sample") auc_data = { "auc_g": np.trapz(data, sample_times), "auc_i": np.trapz(data.sub(data.iloc[:, 0], axis=0), sample_times), } if compute_auc_post: auc_data = _auc_compute_auc_post(data, auc_data, sample_times) out = pd.DataFrame(auc_data, index=data.index).add_prefix(f"{saliva_type}_") out.columns.name = "saliva_feature" # check output is_saliva_feature_dataframe(out, saliva_type) return _SalivaFeatureDataFrame(out)
def _auc_compute_auc_post( data: SalivaRawDataFrame, auc_data: Dict[str, np.ndarray], sample_times: np.ndarray ) -> Dict[str, np.ndarray]: idxs_post = None if sample_times.ndim == 1: idxs_post = np.where(sample_times >= 0)[0] elif sample_times.ndim == 2: warnings.warn( "Not computing `auc_i_post` values because this is only implemented if `saliva_times` " "are the same for all subjects." ) if idxs_post is not None: data_post = data.iloc[:, idxs_post] auc_data["auc_i_post"] = np.trapz(data_post.sub(data_post.iloc[:, 0], axis=0), sample_times[idxs_post]) return auc_data
[docs]def slope( data: SalivaRawDataFrame, sample_labels: Optional[Union[Tuple, Sequence]] = None, sample_idx: Optional[Union[Tuple[int, int], Sequence[int]]] = None, saliva_type: Optional[Union[str, Sequence[str]]] = "cortisol", sample_times: Optional[Sequence[int]] = None, ) -> Union[SalivaFeatureDataFrame, Dict[str, SalivaFeatureDataFrame]]: """Compute the slope between two saliva samples. The samples to compute the slope can either be specified by `index` (parameter `sample_idx`) [0, num_of_samples-1] or by `label` (parameter `sample_idx`). The output feature name for the slope between saliva samples with labels `label1` and `label2` will be ``slope<label1><label2>``, preceded by the name of the saliva type to allow better conversion into long-format later on (if desired). So e.g., for cortisol, it will be: ``cortisol_slopeS1S2``. Parameters ---------- data : :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` saliva data in `SalivaRawDataFrame` format saliva_type : str or list of str saliva type or list of saliva types to compute features on sample_labels : list or tuple pair of saliva sample labels to compute slope between. Labels correspond to the names in the `sample` column of the dataframe. An error will the raised if not exactly 2 samples are specified. sample_idx : list or tuple pair of saliva sample indices to compute slope between. An error will the raised if not exactly 2 sample are specified sample_times: :any:`numpy.ndarray` or list, optional Saliva sampling times (corresponding to x-axis values for computing slope). By default (``sample_times`` is ``None``) sample times are expected to be part of the dataframe (in the `time` column). Alternatively, sample times can be specified by passing a list or a numpy array to this argument. If ``sample_times`` is a 1D array, it is assumed that saliva times are the same for all subjects. Then, ``sample_times`` needs to have the shape (n_samples,). If ``sample_times`` is a 2D array, it is assumed that saliva times are individual for all subjects. Then, ``saliva_times`` needs to have the shape (n_subjects, n_samples). Returns ------- :obj:`~biopsykit.utils.datatype_helper.SalivaFeatureDataFrame` or dict of such dataframe containing the computed features, or a dict of such if ``saliva_type`` is a list Raises ------ IndexError if invalid `sample_labels` or `sample_idx` is provided :exc:`~biopsykit.utils.exceptions.ValidationError` if ``data`` is not a :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` """ # check input is_saliva_raw_dataframe(data, saliva_type) if sample_idx is None and sample_labels is None: raise IndexError("Either `sample_labels` or `sample_idx` must be supplied as parameter!") if sample_idx is not None and sample_labels is not None: raise IndexError("Either `sample_labels` or `sample_idx` must be supplied as parameter, not both!") if isinstance(saliva_type, list): dict_result = {} for saliva in saliva_type: saliva_col = [saliva] if "time" in data: saliva_col = [*saliva_col, "time"] dict_result[saliva] = slope( data[saliva_col], sample_labels=sample_labels, sample_idx=sample_idx, saliva_type=saliva, sample_times=sample_times, ) return dict_result sample_times = _get_sample_times(data, saliva_type, sample_times) _check_sample_times(sample_times) data = data[[saliva_type]].unstack() sample_labels, sample_idx = _get_saliva_idx_labels( data[saliva_type].columns, sample_labels=sample_labels, sample_idx=sample_idx ) out = pd.DataFrame( np.diff(data.iloc[:, sample_idx]) / np.diff(sample_times[..., sample_idx]), index=data.index, columns=["{}_slope{}{}".format(saliva_type, *sample_labels)], ) out.columns.name = "saliva_feature" # check output is_saliva_feature_dataframe(out, saliva_type) return _SalivaFeatureDataFrame(out)
[docs]def standard_features( data: pd.DataFrame, saliva_type: Optional[Union[str, Sequence[str]]] = "cortisol", group_cols: Optional[Union[str, Sequence[str]]] = None, keep_index: Optional[bool] = True, ) -> Union[SalivaFeatureDataFrame, Dict[str, SalivaFeatureDataFrame]]: """Compute a set of `standard features` on saliva data. The following list of features is computed: * ``argmax``: Argument (=index) of the maximum value * ``mean``: Mean value * ``std``: Standard deviation * ``skew``: Skewness * ``kurt``: Kurtosis For all features the built-in pandas functions (e.g. :meth:`pandas.DataFrame.mean`) will be used, except for ``argmax``, which will use numpy's function (:func:`numpy.argmax`). The functions will be applied on the dataframe using the `aggregate` functions from pandas (:meth:`pandas.DataFrame.agg`). The output feature names will be ``argmax``, ``mean``, ``std``, ``skew``, ``kurt``, preceded by the name of the saliva type to allow better conversion into long-format later on (if desired). So e.g., for cortisol, it will be: ``cortisol_argmax``. Parameters ---------- data : :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` saliva data in `SalivaRawDataFrame` format saliva_type : str or list of str saliva type or list of saliva types to compute features on group_cols: str or list of str, optional columns to group on before applying the aggregate function. If ``group_cols`` is ``None`` (the default), data will be grouped on by all columns except the `sample` column. Usually, data wants to be grouped by `subject` (followed by `condition`, `day`, `night`, etc., if applicable). keep_index : bool, optional whether to try keeping the old index or use the new index returned by the groupby-aggregate-function. Keeping the old index is e.g. useful if the dataframe has a multiindex with several levels, but grouping is only performed on a subset of these levels. Default: ``True`` Returns ------- :obj:`~biopsykit.utils.datatype_helper.SalivaFeatureDataFrame` or dict of such dataframe containing the computed features, or a dict of such if ``saliva_type`` is a list Raises ------ :exc:`~biopsykit.utils.exceptions.ValidationError` if ``data`` is not a :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` :exc:`~biopsykit.utils.exceptions.DataFrameTransformationError` if ``keep_index`` is ``True``, but applying the old index fails """ # check input is_saliva_raw_dataframe(data, saliva_type) if isinstance(saliva_type, list): dict_result = {} for saliva in saliva_type: saliva_col = [saliva] if "time" in data: saliva_col = [*saliva_col, "time"] dict_result[saliva] = standard_features(data[saliva_col], saliva_type=saliva) return dict_result group_cols = _get_group_cols(data, group_cols, "sample", "standard_features") out = ( data[[saliva_type]] .groupby(group_cols) .agg( [ np.argmax, pd.DataFrame.mean, pd.DataFrame.std, pd.DataFrame.skew, pd.DataFrame.kurt, ], ) ) if keep_index: try: out.index = data.unstack(level="sample").index except ValueError as e: raise DataFrameTransformationError( "DataFrame transformation failed: Unable to keep old dataframe index because index does not match with " "output data shape, possibly because 'groupby' recuded the index. " "Consider setting 'keep_index' to 'False'. " "The exact error was:\n\n{}".format(str(e)) ) from e # drop 'saliva_type' multiindex column and add as prefix to columns to ensure consistent naming with # the other saliva functions out.columns = out.columns.droplevel(0) out = out.add_prefix(f"{saliva_type}_") out.columns.name = "saliva_feature" # check output is_saliva_feature_dataframe(out, saliva_type) return _SalivaFeatureDataFrame(out)
[docs]def mean_se( data: SalivaRawDataFrame, saliva_type: Optional[Union[str, Sequence[str]]] = "cortisol", group_cols: Optional[Union[str, List[str]]] = None, remove_s0: Optional[bool] = False, ) -> Union[SalivaMeanSeDataFrame, Dict[str, SalivaMeanSeDataFrame]]: """Compute mean and standard error per saliva sample. Parameters ---------- data : :class:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` saliva data in `SalivaRawDataFrame` format saliva_type : str or list of str saliva type or list of saliva types to compute features on group_cols: str or list of str, optional columns to group on before computing mean and se. If ``group_cols`` is ``None`` (the default), data will be grouped on by all columns except the `sample` column. Usually, data wants to be grouped by `subject` (followed by `condition`, `day`, `night`, etc., if applicable). remove_s0 : bool, optional whether to exclude the first saliva sample from computing mean and standard error or not. Default: ``False`` Returns ------- :class:`~biopsykit.utils.datatype_helper.SalivaMeanSeDataFrame` dataframe with mean and standard error per saliva sample or a dict of such if ``saliva_type`` is a list Raises ------ :exc:`~biopsykit.utils.exceptions.ValidationError` if ``data`` is not a :obj:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame` """ # check input is_saliva_raw_dataframe(data, saliva_type) if isinstance(saliva_type, list): dict_result = {} for biomarker in saliva_type: biomarker_cols = [biomarker] if "time" in data: biomarker_cols = ["time", *biomarker_cols] dict_result[biomarker] = mean_se(data[biomarker_cols], saliva_type=biomarker, remove_s0=remove_s0) return dict_result if remove_s0: # We have a S0 sample => drop it data = _remove_s0(data) group_cols = _get_group_cols(data, group_cols, "subject", "mean_se") _mean_se_assert_group_cols(data, group_cols) if "time" in data.columns and "time" not in group_cols: # add 'time' column to grouper if it's in the data and wasn't added yet because otherwise # we would loose this column group_cols = [*group_cols, "time"] data_mean_se = data.groupby(group_cols).agg([np.mean, se])[saliva_type] is_saliva_mean_se_dataframe(data_mean_se) return _SalivaMeanSeDataFrame(data_mean_se)
def _mean_se_assert_group_cols(data: pd.DataFrame, group_cols: Sequence[str]): if group_cols == list(data.index.names): # if data should be grouped by *all* index levels we can not compute an aggregation raise DataFrameTransformationError( "Cannot compute mean and standard error on data because *all* index " "columns were selected as group columns, so each sample value would be one group of its own!" ) if "sample" not in group_cols: raise DataFrameTransformationError( "Error computing mean and standard error on data: 'sample' index level needs to be added as group column!" )