"""Utility functions for working with saliva dataframes."""
import re
from datetime import datetime, time
from typing import Dict, List, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
from biopsykit.utils._datatype_validation_helper import _assert_has_index_levels, _assert_is_dtype
from biopsykit.utils._types import arr_t
from biopsykit.utils.datatype_helper import SalivaFeatureDataFrame, SalivaRawDataFrame, _SalivaRawDataFrame
__all__ = [
"saliva_feature_wide_to_long",
"get_saliva_column_suggestions",
"extract_saliva_columns",
"sample_times_datetime_to_minute",
]
[docs]def saliva_feature_wide_to_long(
data: SalivaFeatureDataFrame,
saliva_type: str,
) -> pd.DataFrame:
"""Convert ``SalivaFeatureDataFrame`` from wide-format into long-format.
Parameters
----------
data : :class:`~biopsykit.utils.datatype_helper.SalivaFeatureDataFrame`
dataframe containing saliva features in wide-format, i.e. one column per saliva sample, one row per subject.
saliva_type : str
saliva type (e.g. 'cortisol')
Returns
-------
:class:`~pandas.DataFrame`
dataframe with saliva features in long-format
"""
data = data.filter(like=saliva_type)
index_cols = list(data.index.names)
j = "saliva_feature"
# iteratively build up long-format dataframe
data = pd.wide_to_long(
data.reset_index(),
stubnames=saliva_type,
i=index_cols,
j=j,
sep="_",
suffix=r"\w+",
)
# reorder levels and sort
return data.reorder_levels([*index_cols, j]).sort_index()
[docs]def get_saliva_column_suggestions(data: pd.DataFrame, saliva_type: Union[str, Sequence[str]]) -> Sequence[str]:
"""Automatically extract possible saliva data columns from a pandas dataframe.
This is for example useful when one large dataframe is used to store demographic information,
questionnaire data and saliva data.
Parameters
----------
data: :class:`~pandas.DataFrame`
dataframe which should be extracted
saliva_type: str or list of str
saliva type variable which or list of saliva types should be used to extract columns (e.g. 'cortisol')
Returns
-------
list or dict
list of suggested columns containing saliva data or dict of such if ``saliva_type`` is a list
"""
# check if input is dataframe
_assert_is_dtype(data, pd.DataFrame)
if isinstance(saliva_type, list):
dict_result = {}
for saliva in saliva_type:
dict_result[saliva] = get_saliva_column_suggestions(data=data, saliva_type=saliva)
return dict_result
if saliva_type not in _dict_saliva_type_suggs:
raise ValueError(f"Invalid saliva type '{saliva_type}'! Must be one of {list(_dict_saliva_type_suggs.keys())}.")
sugg_filt = list(
filter(
lambda col: any(k in col for k in _dict_saliva_type_suggs[saliva_type]),
data.columns,
)
)
sugg_filt = list(filter(lambda s: any(str(i) in s for i in range(0, 20)), sugg_filt))
sugg_filt = list(
filter(
lambda s: all(
k not in s
for k in (
"AUC",
"auc",
"TSST",
"max",
"log",
"inc",
"lg",
"ln",
"GenExp",
"inv",
)
),
sugg_filt,
)
)
# replace il{} with il6 since this was removed out by the previous filter operation
sugg_filt = [re.sub(r"\d", "{}", s).replace("il{}", "il6").replace("IL{}", "IL6") for s in sugg_filt]
sugg_filt = sorted(filter(lambda s: "{}" in s, set(sugg_filt)))
# build regex for column extraction
sugg_filt = ["^{}$".format(s.replace("{}", r"(\d)")) for s in sugg_filt]
return sugg_filt
[docs]def extract_saliva_columns(
data: pd.DataFrame, saliva_type: Union[str, Sequence[str]], col_pattern: Optional[Union[str, Sequence[str]]] = None
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
"""Extract saliva sample columns from a pandas dataframe.
Parameters
----------
data: :class:`~pandas.DataFrame`
dataframe to extract columns from
saliva_type: str or list of str
saliva type variable or list of saliva types which should be used to extract columns (e.g. 'cortisol')
col_pattern: str, optional
string pattern or list of string patterns to identify saliva columns.
If ``None``, it is attempted to automatically infer column names using :func:`get_saliva_column_suggestions()`.
If ``col_pattern`` is a list, it must be the same length like ``saliva_type``.
Returns
-------
:class:`~pandas.DataFrame` or dict
pandas dataframe with extracted columns or dict of such if ``saliva_type`` is a list
"""
if isinstance(saliva_type, list):
if isinstance(col_pattern, list) and len(saliva_type) is not len(col_pattern):
raise ValueError("'saliva_type' and 'col_pattern' must have same length!")
dict_result = {}
if col_pattern is None:
col_pattern = [None] * len(saliva_type)
for saliva, col_p in zip(saliva_type, col_pattern):
dict_result[saliva] = extract_saliva_columns(data=data, saliva_type=saliva, col_pattern=col_p)
return dict_result
if col_pattern is None:
col_suggs = get_saliva_column_suggestions(data, saliva_type)
if len(col_suggs) > 1:
raise ValueError(
"More than one possible column pattern was found! "
"Please check manually which pattern is correct: {}".format(col_suggs)
)
col_pattern = col_suggs[0]
return data.filter(regex=col_pattern)
def _sample_times_datetime_to_minute_apply(
sample_times: Union[pd.DataFrame, pd.Series]
) -> Union[pd.DataFrame, pd.Series]:
if isinstance(sample_times.to_numpy().flatten()[0], (pd.Timedelta, np.timedelta64)):
return sample_times.apply(pd.to_timedelta)
return sample_times.astype(str).apply(pd.to_datetime)
[docs]def sample_times_datetime_to_minute(sample_times: Union[pd.Series, pd.DataFrame]) -> Union[pd.Series, pd.DataFrame]:
"""Convert sample times from datetime or timedelta objects into minutes.
In order to compute certain saliva features (such as :func:`~biopsykit.saliva.auc` or
:func:`~biopsykit.saliva.slope`) the saliva sampling times are needed.
This function can be used to convert sampling times into minutes relative to the first saliva sample.
Parameters
----------
sample_times : :class:`~pandas.Series` or :class:`~pandas.DataFrame`
saliva sampling times in a Python datetime- or timedelta-related format.
If ``sample_times`` is a Series, it is assumed to be in long-format and will be unstacked into wide-format
along the `sample` level.
If ``sample_times`` is a DataFrame, it is assumed to be in wide-format already.
If values in ``sample_times`` are ``str``, they are assumed to be strings with time information only
(**not** including date), e.g., "09:00", "09:15", ...
Returns
-------
:class:`~pandas.DataFrame`
dataframe in wide-format with saliva sampling times in minutes relative to the first saliva sample
Raises
------
ValueError
if sample times are not in a datetime- or timedelta-related format
"""
if isinstance(sample_times.to_numpy().flatten()[0], str):
sample_times = _get_sample_times_str(sample_times)
if not isinstance(
sample_times.to_numpy().flatten()[0], (time, datetime, pd.Timedelta, np.timedelta64, np.datetime64)
):
raise TypeError(
"Sample times must be instance of `datetime.datetime()`, `datetime.time()`,"
" `np.datetime64`, `np.timedelta64`, or `pd.Timedelta`!"
)
is_series = isinstance(sample_times, pd.Series)
if is_series:
_assert_has_index_levels(sample_times, index_levels=["sample"], match_atleast=True)
# unstack the multi-index dataframe in the 'samples' level so that time differences can be computed in minutes.
# Then stack it back together
sample_times = sample_times.unstack(level="sample")
sample_times = _sample_times_datetime_to_minute_apply(sample_times)
sample_times = sample_times.diff(axis=1).apply(lambda s: (s.dt.total_seconds() / 60))
sample_times = sample_times.cumsum(axis=1)
sample_times.iloc[:, 0] = sample_times.iloc[:, 0].fillna(0)
if is_series:
sample_times = sample_times.stack()
return sample_times
def _get_sample_times_str(sample_times: Union[pd.Series, pd.DataFrame]) -> Union[pd.Series, pd.DataFrame]:
if isinstance(sample_times, pd.DataFrame):
return pd.to_timedelta(sample_times.stack()).unstack("sample")
return pd.to_timedelta(sample_times)
def _remove_s0(data: SalivaRawDataFrame) -> SalivaRawDataFrame:
"""Remove first saliva sample.
Parameters
----------
data : :class:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame`
saliva data in `SalivaRawDataFrame` format
Returns
-------
:class:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame`
saliva data in `SalivaRawDataFrame` format without the first saliva sample
"""
data = data.drop(0, level="sample", errors="ignore")
data = data.drop("0", level="sample", errors="ignore")
data = data.drop("S0", level="sample", errors="ignore")
return _SalivaRawDataFrame(data)
def _check_sample_times(sample_times: np.array) -> None:
"""Check that all sample times are monotonously increasing.
Parameters
----------
sample_times : array-like
list of sample times
Raises
------
ValueError
if values in ``sample_times`` are not monotonously increasing
"""
if np.any(np.diff(sample_times) <= 0):
raise ValueError("'sample_times' must be increasing!")
def _get_sample_times(
data: pd.DataFrame,
saliva_type: str,
sample_times: Optional[Union[np.array, Sequence[int]]] = None,
remove_s0: Optional[bool] = False,
) -> np.array:
if sample_times is None:
# check if dataframe has 'time' index
if "time" in data.index.names:
data = data.reset_index("time")
# check if dataframe has 'time' column
if "time" in data.columns:
sample_times = np.array(data.unstack(level="sample")["time"])
if np.all(sample_times == sample_times[0]):
# all subjects have the same saliva times
sample_times = sample_times[0]
else:
raise ValueError("No sample times specified!")
# ensure numpy
sample_times = np.squeeze(sample_times)
# check whether we have the same saliva times for all subjects (1d array) or not (2d array)
# and whether the input format is correct
sample_times = _sample_times_sanitize(data, sample_times, saliva_type)
_get_sample_times_check_dims(data, sample_times, saliva_type)
if remove_s0:
sample_times = sample_times[..., 1:]
return sample_times
def _sample_times_sanitize(data: pd.DataFrame, sample_times: arr_t, saliva_type: str) -> arr_t:
if sample_times.ndim == 1:
exp_shape = data.unstack(level="sample")[saliva_type].shape[1]
act_shape = sample_times.shape[0]
if act_shape != exp_shape and (act_shape % exp_shape) == 0:
# saliva times are in long-format => number of sample times corresponds to 2nd dimension
sample_times = np.array(sample_times.unstack("sample").squeeze())
return sample_times
def _get_sample_times_check_dims(data: pd.DataFrame, sample_times: arr_t, saliva_type: str):
if sample_times.ndim == 1:
exp_shape = data.unstack(level="sample")[saliva_type].shape[1]
act_shape = sample_times.shape[0]
# saliva times equal for all subjects
# => number of sample times must correspond to 2nd dimension of wide-format data
if act_shape != exp_shape:
raise ValueError(
"'sample_times' does not correspond to the number of saliva samples in 'data'! "
"Expected {}, got {}.".format(exp_shape, act_shape)
)
elif sample_times.ndim == 2:
# saliva time different for all subjects
exp_shape = data.unstack(level="sample")[saliva_type].shape
act_shape = sample_times.shape
if act_shape != exp_shape:
raise ValueError(
"Dimensions of 'sample_times' does not correspond to dimensions of 'data'! "
"Expected {}, got {}.".format(exp_shape, act_shape)
)
else:
raise ValueError(f"'sample_times' has invalid dimensions! Expected 1 or 2, got {sample_times.ndim}")
def _get_saliva_idx_labels(
columns: pd.Index,
sample_labels: Optional[Union[Tuple, Sequence]] = None,
sample_idx: Optional[Union[Tuple[int, int], Sequence[int]]] = None,
) -> Tuple[Sequence, Sequence]:
"""Get sample labels and indices from data, if only one of both was specified.
Parameters
----------
columns : :class:`pandas.Index`
dataframe columns
sample_labels : list, optional
list of sample labels
sample_idx : list, optional
list of sample indices
Returns
-------
sample_labels:
list of sample labels
sample_idx:
list of sample indices
"""
if sample_labels is not None:
try:
sample_idx = [columns.get_loc(label) for label in sample_labels]
except KeyError as e:
raise IndexError(f"Invalid sample_labels `{sample_labels}`") from e
else:
try:
# ensure list
sample_idx = list(sample_idx)
sample_labels = columns[sample_idx]
except IndexError as e:
raise IndexError("`sample_idx[1]` is out of bounds!") from e
if len(sample_idx) != 2:
raise IndexError(f"Exactly 2 indices needed for computing slope. Got {len(sample_idx)} indices.")
sample_idx = _get_saliva_idx_labels_sanitize(sample_idx, columns)
return sample_labels, sample_idx
def _get_saliva_idx_labels_sanitize(sample_idx: List[int], columns: Sequence[str]):
# replace idx values like '-1' with the actual index
if sample_idx[0] < 0:
sample_idx[0] = len(columns) + sample_idx[0]
if sample_idx[1] < 0:
sample_idx[1] = len(columns) + sample_idx[1]
# check that second index is bigger than first index
if sample_idx[0] >= sample_idx[1]:
raise IndexError(f"`sample_idx[1]` must be bigger than `sample_idx[0]`. Got {sample_idx}")
return sample_idx
def _get_group_cols(
data: SalivaRawDataFrame, group_cols: Union[str, Sequence[str]], group_type: str, function_name: str
) -> List[str]:
"""Get appropriate columns for grouping.
Parameters
----------
data : :class:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame`
saliva data in `SalivaRawDataFrame` format
group_cols : str or list of str
list of index levels and column names to group by
group_type : str
which index level should be grouped by: 'subject' (for computing features per subject, along 'sample' axis)
or 'sample' (for computing values per sample, along 'subject' axis)
function_name : str
function name for error message: 'standard_features' or 'mean_se'
Returns
-------
list of str
list of group by columns
"""
if isinstance(group_cols, str):
# ensure list
group_cols = [group_cols]
elif group_cols is None:
# group by all available index levels
group_cols = list(data.index.names)
group_cols.remove(group_type)
if any(col not in list(data.index.names) + list(data.columns) for col in group_cols):
# check for valid groupers
raise ValueError(
"Computing {} failed: Not all of '{}' are valid index levels or column names!".format(
function_name, group_cols
)
)
return group_cols
_dict_saliva_type_suggs: Dict[str, Sequence[str]] = {
"cortisol": ["cortisol", "cort", "Cortisol", "_c_"],
"amylase": ["amylase", "amy", "Amylase", "sAA"],
"il6": ["il6", "IL6", "il-6", "IL-6", "il_6", "IL_6"],
}
"""Dictionary containing possible column patterns for different saliva types."""