"""Module containing different I/O functions for biomarker data (saliva, dried blood spots, IL-6)."""
from pathlib import Path
from typing import Dict, Optional, Sequence, Tuple, Union
import numpy as np
import pandas as pd
from biopsykit.io.io import _apply_index_cols
from biopsykit.utils._datatype_validation_helper import _assert_file_extension, _assert_has_columns
from biopsykit.utils._types import path_t
from biopsykit.utils.datatype_helper import (
BiomarkerRawDataFrame,
SalivaRawDataFrame,
SubjectConditionDataFrame,
_BiomarkerRawDataFrame,
_SalivaRawDataFrame,
_SubjectConditionDataFrame,
is_biomarker_raw_dataframe,
is_saliva_raw_dataframe,
is_subject_condition_dataframe,
)
__all__ = ["load_saliva_plate", "save_saliva", "load_saliva_wide_format", "load_biomarker_results"]
_DATA_COL_NAMES = {
"cortisol": "cortisol (nmol/l)",
"amylase": "amylase (U/ml)",
"crp": "CRP (µg/ml)",
}
[docs]def load_saliva_plate(
file_path: path_t,
saliva_type: str,
sample_id_col: Optional[str] = None,
data_col: Optional[str] = None,
id_col_names: Optional[Sequence[str]] = None,
regex_str: Optional[str] = None,
sample_times: Optional[Sequence[int]] = None,
condition_list: Optional[Union[Sequence, Dict[str, Sequence], pd.Index]] = None,
**kwargs,
) -> SalivaRawDataFrame:
r"""Read saliva from an Excel sheet in 'plate' format. Wraps load_biomarker_results() for compatibilty.
This function automatically extracts identifier like subject, day and sample IDs from the saliva sample names.
To extract them, a regular expression string can be passed via ``regex_str``.
Here are some examples on how sample identifiers might look like and what the corresponding
``regex_str`` would output:
* "Vp01 S1"
=> ``r"(Vp\d+) (S\d)"`` (this is the default pattern, you can also just set ``regex_str`` to ``None``)
=> data ``[Vp01, S1]`` in two columns: ``subject``, ``sample``
(unless column names are explicitly specified in ``data_col_names``)
* "Vp01 T1 S1" ... "Vp01 T1 S5" (only *numeric* characters in day/sample)
=> ``r"(Vp\d+) (T\d) (S\d)"``
=> three columns: ``subject``, ``sample`` with data ``[Vp01, T1, S1]``
(unless column names are explicitly specified in ``data_col_names``)
* "Vp01 T1 S1" ... "Vp01 T1 SA" (also *letter* characters in day/sample)
=> ``r"(Vp\d+) (T\w) (S\w)"``
=> three columns: ``subject``, ``sample`` with data ``[Vp01, T1, S1]``
(unless column names are explicitly specified in ``data_col_names``)
If you **don't** want to extract the 'S' or 'T' prefixes in saliva or day IDs, respectively,
you have to move it **out** of the capture group in the ``regex_str`` (round brackets), like this:
``(S\d)`` (would give ``S1``, ``S2``, ...)
=> ``S(\d)`` (would give ``1``, ``2``, ...)
Parameters
----------
file_path: :class:`~pathlib.Path` or str
path to the Excel sheet in 'plate' format containing saliva data
saliva_type: str
saliva type to load from file
sample_id_col: str, optional
column name of the Excel sheet containing the sample ID. Default: "sample ID"
data_col: str, optional
column name of the Excel sheet containing saliva data to be analyzed.
Default: Select default column name based on ``biomarker_type``, e.g. ``cortisol`` => ``cortisol (nmol/l)``
id_col_names: list of str, optional
names of the extracted ID column names. ``None`` to use the default column names (['subject', 'day', 'sample'])
regex_str: str, optional
regular expression to extract subject ID, day ID and sample ID from the sample identifier.
``None`` to use default regex string (``r"(Vp\d+) (S\d)"``)
sample_times: list of int, optional
times at which saliva samples were collected
condition_list: 1d-array, optional
list of conditions which subjects were assigned to
**kwargs
Additional parameters that are passed to :func:`pandas.read_excel`
Returns
-------
data : :class:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame`
saliva data in `SalivaRawDataFrame` format
Raises
------
:exc:`~biopsykit.utils.exceptions.FileExtensionError`
if file is no Excel file (.xls or .xlsx)
ValueError
if any saliva sample can not be converted into a float (e.g. because there was text in one of the columns)
:exc:`~biopsykit.utils.exceptions.ValidationError`
if imported data can not be parsed to a SalivaRawDataFrame
"""
if regex_str is None:
regex_str = r"(Vp\d+) (S\d)"
return _SalivaRawDataFrame(
load_biomarker_results(
file_path,
biomarker_type=saliva_type,
sample_id_col=sample_id_col,
data_col=data_col,
id_col_names=id_col_names,
regex_str=regex_str,
sample_times=sample_times,
condition_list=condition_list,
**kwargs,
)
)
[docs]def save_saliva(
file_path: path_t,
data: SalivaRawDataFrame,
saliva_type: Optional[str] = "cortisol",
as_wide_format: Optional[bool] = False,
):
"""Save saliva data to csv file.
Parameters
----------
file_path: :class:`~pathlib.Path` or str
file path to export. Must be a csv or an Excel file
data : :class:`~biopsykit.utils.datatype_helper.SalivaRawDataFrame`
saliva data in `SalivaRawDataFrame` format
saliva_type : str
type of saliva data in the dataframe
as_wide_format : bool, optional
``True`` to save data in wide format (and flatten all index levels), ``False`` to save data in long-format.
Default: ``False``
Raises
------
:exc:`~biopsykit.utils.exceptions.ValidationError`
if ``data`` is not a SalivaRawDataFrame
:exc:`~biopsykit.utils.exceptions.FileExtensionError`
if ``file_path`` is not a csv or Excel file
"""
# ensure pathlib
file_path = Path(file_path)
_assert_file_extension(file_path, [".csv", ".xls", ".xlsx"])
is_saliva_raw_dataframe(data, saliva_type)
data = data[saliva_type]
if as_wide_format:
levels = list(data.index.names)
levels.remove("subject")
data = data.unstack(level=levels)
data.columns = ["_".join(col) for col in data.columns]
if file_path.suffix in [".csv"]:
data.to_csv(file_path)
else:
writer = pd.ExcelWriter(file_path, engine="xlsxwriter") # pylint:disable=abstract-class-instantiated
data.to_excel(writer)
writer.close()
[docs]def load_biomarker_results(
file_path: path_t,
biomarker_type: Optional[str] = None,
sample_id_col: Optional[str] = None,
data_col: Optional[str] = None,
id_col_names: Optional[Sequence[str]] = None,
regex_str: Optional[str] = None,
sample_times: Optional[Sequence[int]] = None,
condition_list: Optional[Union[Sequence, Dict[str, Sequence], pd.Index]] = None,
**kwargs,
) -> BiomarkerRawDataFrame:
r"""Load biomarker results from Excel file.
Parameters
----------
file_path: :class:`~pathlib.Path` or str
path to file
dbs_type: str, optional
biomarker type to load from file. Example: ``crp``
sample_id_col: str, optional
name of column containing sample IDs or ``None`` to use the default column name ``sample ID``.
data_col: str, optional
name of column containing biomarker data or ``None`` to use the default column name.
id_col_names: list of str, optional
names of the extracted ID column names. ``None`` to use the default column names (['subject', 'day', 'sample'])
regex_str: str, optional
regular expression to extract subject, day, and sample ID from sample ID column.
``None`` to use the default regular expression ``r"(VP\\d+)-(T\\w)-(B\\w)"``.
sample_times: list of int, optional
times at which samples were collected or ``None`` if no sample times should be specified.
Default: ``None``
condition_list: list of str, dict of str to list of str, or :class:`~pandas.Index`, optional
list of condition names or dictionary of condition names to list of condition assignments or
:class:`~pandas.Index` of condition names or ``None`` if no conditions are present.
Default: ``None``
**kwargs
Additional parameters that are passed to :func:`pandas.read_csv` or :func:`pandas.read_excel`
Returns
-------
data : :class:`~biopsykit.utils.datatype_helper.BiomarkerRawDataFrame`
biomarker data in `BiomarkerRawDataFrame` format
Raises
------
:exc:`~biopsykit.utils.exceptions.FileExtensionError`
if file is no Excel file
"""
# ensure pathlib
file_path = Path(file_path)
_assert_file_extension(file_path, (".xls", ".xlsx"))
# TODO add remove_nan option (all or any)
if regex_str is None:
regex_str = r"(VP\d+)-(T\w)-(B\w)"
if sample_id_col is None:
sample_id_col = "sample ID"
if data_col is None:
data_col = _DATA_COL_NAMES[biomarker_type]
df_biomarker = pd.read_excel(file_path, skiprows=2, usecols=[sample_id_col, data_col], **kwargs)
cols = df_biomarker[sample_id_col].str.extract(regex_str)
id_col_names = _get_id_columns(id_col_names, cols)
df_biomarker[id_col_names] = cols
df_biomarker = df_biomarker.drop(columns=[sample_id_col], errors="ignore")
df_biomarker = df_biomarker.rename(columns={data_col: biomarker_type})
df_biomarker = df_biomarker.set_index(id_col_names)
if condition_list is not None:
df_biomarker = _apply_condition_list(df_biomarker, condition_list)
num_subjects = len(df_biomarker.index.get_level_values("subject").unique())
_check_num_samples(len(df_biomarker), num_subjects)
if sample_times:
_check_sample_times(len(df_biomarker), num_subjects, sample_times)
df_biomarker["time"] = np.array(sample_times * num_subjects)
try:
df_biomarker[biomarker_type] = df_biomarker[biomarker_type].astype(float)
except ValueError as e:
raise ValueError(
"""Error converting all biomarker values into numbers: '{}'
Please check your biomarker values whether there is any text etc. in the column '{}'
and delete the values or replace them by NaN!""".format(
e, data_col
)
) from e
is_biomarker_raw_dataframe(df_biomarker, biomarker_type=biomarker_type)
return _BiomarkerRawDataFrame(df_biomarker)
def _get_index_cols(condition_col: str, index_cols: Sequence[str], additional_index_cols: Sequence[str]):
if condition_col is not None:
index_cols = [condition_col, *index_cols]
if additional_index_cols is None:
additional_index_cols = []
if isinstance(additional_index_cols, str):
additional_index_cols = [additional_index_cols]
return index_cols + additional_index_cols
def _read_dataframe(file_path: Path, **kwargs):
if file_path.suffix in [".csv"]:
return pd.read_csv(file_path, **kwargs)
return pd.read_excel(file_path, **kwargs)
def _check_num_samples(num_samples: int, num_subjects: int):
"""Check that number of imported samples is the same for all subjects.
Parameters
----------
num_samples : int
total number of saliva samples in the current dataframe
num_subjects : int
total number of subjects in the current dataframe
Raises
------
ValueError
if number of samples is not equal for all subjects
"""
if num_samples % num_subjects != 0:
raise ValueError(
"Error during import: Number of samples not equal for all subjects! Got {} samples for {} subjects.".format(
num_samples, num_subjects
)
)
def _check_sample_times(num_samples: int, num_subjects: int, sample_times: Sequence[int]):
"""Check that sample times have the correct number of samples and are monotonously increasing.
Parameters
----------
num_samples : int
total number of saliva samples in the current dataframe
num_subjects : int
total number of subjects in the current dataframe
sample_times : array-like
list of sample times
Raises
------
ValueError
if values in ``sample_times`` are not monotonously increasing or
if number of saliva times does not match the number of saliva samples per subject
"""
if np.any(np.diff(sample_times) <= 0):
raise ValueError("`saliva_times` must be increasing!")
if (len(sample_times) * num_subjects) != num_samples:
raise ValueError(
"Length of `saliva_times` does not match the number of saliva samples! Expected: {}, got: {}".format(
int(num_samples / num_subjects), len(sample_times)
)
)
def _parse_condition_list(
data: pd.DataFrame, condition_list: Union[Sequence, Dict[str, Sequence], pd.Index]
) -> SubjectConditionDataFrame:
if isinstance(condition_list, (list, np.ndarray)):
# Add Condition as new index level
condition_list = pd.DataFrame(
condition_list,
columns=["condition"],
index=data.index.get_level_values("subject").unique(),
)
elif isinstance(condition_list, dict):
condition_list = [(subject, cond) for cond in condition_list for subject in condition_list[cond]]
condition_list = pd.DataFrame(condition_list, columns=["subject", "condition"])
condition_list = condition_list.set_index("subject")
elif isinstance(condition_list, pd.DataFrame):
condition_list = condition_list.reset_index().set_index("subject")
is_subject_condition_dataframe(condition_list)
return _SubjectConditionDataFrame(condition_list)
def _apply_condition_list(
data: pd.DataFrame,
condition_list: Optional[Union[Sequence, Dict[str, Sequence], pd.Index]] = None,
):
condition_list = _parse_condition_list(data, condition_list)
data = (
data.join(condition_list).set_index("condition", append=True).reorder_levels(["condition", "subject", "sample"])
)
return data
def _get_id_columns(id_col_names: Sequence[str], extracted_cols: pd.DataFrame):
if id_col_names is None:
id_col_names = ["subject", "sample"]
if len(extracted_cols.columns) == 3:
id_col_names = ["subject", "day", "sample"]
elif len(id_col_names) != len(extracted_cols.columns):
raise ValueError(
"Number of 'id_col_names' must match length of extracted index columns! Expected {}, got {}.".format(
len(extracted_cols), len(id_col_names)
)
)
return id_col_names
def _get_condition_col(data: pd.DataFrame, condition_col: str) -> Tuple[pd.DataFrame, str]:
if condition_col is None:
if "condition" in data.columns:
condition_col = "condition"
else:
_assert_has_columns(data, [[condition_col]])
if condition_col != "condition":
# rename column
data = data.rename(columns={condition_col: "condition"})
condition_col = "condition"
return data, condition_col