# Source code for biopsykit.io.biomarker

"""Module containing different I/O functions for biomarker data (saliva, dried blood spots, IL-6)."""

from collections.abc import Sequence
from pathlib import Path

import numpy as np
import pandas as pd

from biopsykit.io.io import _apply_index_cols
from biopsykit.utils._datatype_validation_helper import _assert_file_extension, _assert_has_columns
from biopsykit.utils._types_internal import path_t
from biopsykit.utils.dtypes import (
    BiomarkerRawDataFrame,
    SalivaRawDataFrame,
    SubjectConditionDataFrame,
    _BiomarkerRawDataFrame,
    _SalivaRawDataFrame,
    _SubjectConditionDataFrame,
    is_biomarker_raw_dataframe,
    is_saliva_raw_dataframe,
    is_subject_condition_dataframe,
)

# Public API of this module; all other (underscore-prefixed) names are internal helpers.
__all__ = ["load_biomarker_results", "load_saliva_plate", "load_saliva_wide_format", "save_saliva"]

# Default Excel column name holding the measured values, keyed by biomarker type.
# Used by ``load_biomarker_results`` when no explicit ``data_col`` is passed.
_DATA_COL_NAMES = {
    "cortisol": "cortisol (nmol/l)",
    "amylase": "amylase (U/ml)",
    "crp": "CRP (µg/ml)",
}

# Cell contents that mark a missing measurement; ``load_biomarker_results`` replaces exact
# matches of these strings with "nan" (when ``replace_strings_missing`` is set) before the
# value column is converted to float.
_MISSING_DATA = ["no sample", "empty", "insufficient volume"]


def load_saliva_plate(
    file_path: path_t,
    saliva_type: str,
    sample_id_col: str | None = None,
    data_col: str | None = None,
    id_col_names: Sequence[str] | None = None,
    regex_str: str | None = None,
    sample_times: Sequence[int] | None = None,
    condition_list: Sequence | dict[str, Sequence] | pd.Index | None = None,
    **kwargs,
) -> SalivaRawDataFrame:
    r"""Read saliva data from an Excel sheet in 'plate' format.

    Thin wrapper around :func:`load_biomarker_results`, kept for compatibility. The only difference is the
    default regular expression (``r"(Vp\d+) (S\d)"``) and that the result is returned as a ``SalivaRawDataFrame``.

    Identifiers such as subject, day and sample ID are extracted from the sample names by applying the regular
    expression ``regex_str``. Each capture group becomes one index column. Examples of sample names and
    matching patterns:

    * "Vp01 S1"
        => ``r"(Vp\d+) (S\d)"`` (the default pattern, equivalent to ``regex_str=None``)
        => data ``[Vp01, S1]`` in two columns: ``subject``, ``sample``
        (unless column names are explicitly specified in ``id_col_names``)
    * "Vp01 T1 S1" ... "Vp01 T1 S5" (only *numeric* characters in day/sample)
        => ``r"(Vp\d+) (T\d) (S\d)"``
        => data ``[Vp01, T1, S1]`` in three columns: ``subject``, ``day``, ``sample``
        (unless column names are explicitly specified in ``id_col_names``)
    * "Vp01 T1 S1" ... "Vp01 T1 SA" (also *letter* characters in day/sample)
        => ``r"(Vp\d+) (T\w) (S\w)"``
        => data ``[Vp01, T1, S1]`` in three columns: ``subject``, ``day``, ``sample``
        (unless column names are explicitly specified in ``id_col_names``)

    To drop the 'S' or 'T' prefix from the sample or day IDs, move the prefix **out** of the capture group
    (round brackets) of ``regex_str``:
    ``(S\d)`` (gives ``S1``, ``S2``, ...) => ``S(\d)`` (gives ``1``, ``2``, ...)

    Parameters
    ----------
    file_path: :class:`~pathlib.Path` or str
        path to the Excel sheet in 'plate' format containing saliva data
    saliva_type: str
        saliva type to load from file
    sample_id_col: str, optional
        column name of the Excel sheet containing the sample ID. Default: "sample ID"
    data_col: str, optional
        column name of the Excel sheet containing the saliva data to be analyzed.
        Default: column name looked up from ``saliva_type``, e.g. ``cortisol`` => ``cortisol (nmol/l)``
    id_col_names: list of str, optional
        names of the extracted ID columns. ``None`` to use the default column names
        (``['subject', 'sample']``, or ``['subject', 'day', 'sample']`` if three groups are extracted)
    regex_str: str, optional
        regular expression to extract subject ID, (day ID,) and sample ID from the sample identifier.
        ``None`` to use the default regex string (``r"(Vp\d+) (S\d)"``)
    sample_times: list of int, optional
        times at which saliva samples were collected
    condition_list: 1d-array, optional
        list of conditions which subjects were assigned to
    **kwargs
        Additional parameters that are passed on to :func:`load_biomarker_results`
        (and from there to :func:`pandas.read_excel`)

    Returns
    -------
    data : :class:`~biopsykit.utils.dtypes.SalivaRawDataFrame`
        saliva data in `SalivaRawDataFrame` format

    Raises
    ------
    :exc:`~biopsykit.utils.exceptions.FileExtensionError`
        if file is no Excel file (.xls or .xlsx)
    ValueError
        if any saliva sample can not be converted into a float (e.g. because there was text in one of the columns)
    :exc:`~biopsykit.utils.exceptions.ValidationError`
        if imported data can not be parsed to a SalivaRawDataFrame

    """
    # plate files use the two-group "subject sample" naming scheme unless told otherwise
    pattern = r"(Vp\d+) (S\d)" if regex_str is None else regex_str

    biomarker_data = load_biomarker_results(
        file_path,
        biomarker_type=saliva_type,
        sample_id_col=sample_id_col,
        data_col=data_col,
        id_col_names=id_col_names,
        regex_str=pattern,
        sample_times=sample_times,
        condition_list=condition_list,
        **kwargs,
    )
    return _SalivaRawDataFrame(biomarker_data)
def save_saliva(
    file_path: path_t,
    data: SalivaRawDataFrame,
    saliva_type: str | None = "cortisol",
    as_wide_format: bool | None = False,
):
    """Save saliva data to a csv or Excel file.

    Parameters
    ----------
    file_path: :class:`~pathlib.Path` or str
        file path to export. Must be a csv or an Excel file
    data : :class:`~biopsykit.utils.dtypes.SalivaRawDataFrame`
        saliva data in `SalivaRawDataFrame` format
    saliva_type : str
        type of saliva data in the dataframe
    as_wide_format : bool, optional
        ``True`` to save data in wide format (and flatten all index levels), ``False`` to save data in long-format.
        Default: ``False``

    Raises
    ------
    :exc:`~biopsykit.utils.exceptions.ValidationError`
        if ``data`` is not a SalivaRawDataFrame
    :exc:`~biopsykit.utils.exceptions.FileExtensionError`
        if ``file_path`` is not a csv or Excel file

    """
    # ensure pathlib
    file_path = Path(file_path)
    _assert_file_extension(file_path, [".csv", ".xls", ".xlsx"])

    is_saliva_raw_dataframe(data, saliva_type)

    if as_wide_format:
        # move every index level except "subject" into the columns ...
        levels = list(data.index.names)
        levels.remove("subject")
        data = data.unstack(level=levels)
        # ... and flatten the resulting column MultiIndex; labels are cast to str so that
        # non-string level values (e.g. integer sample IDs) do not break the join
        data.columns = ["_".join(map(str, col)) for col in data.columns]

    if file_path.suffix == ".csv":
        data.to_csv(file_path)
    else:
        # context manager guarantees the workbook is closed even if writing fails
        with pd.ExcelWriter(file_path, engine="xlsxwriter") as writer:  # pylint:disable=abstract-class-instantiated
            data.to_excel(writer)
def load_saliva_wide_format(
    file_path: path_t,
    saliva_type: str,
    subject_col: str | None = None,
    condition_col: str | None = None,
    additional_index_cols: str | Sequence[str] | None = None,
    sample_times: Sequence[int] | None = None,
    **kwargs,
) -> SalivaRawDataFrame:
    """Load saliva data that is in wide-format from a csv or Excel file.

    It will return a `SalivaRawDataFrame`, a long-format dataframe that complies with BioPsyKit's naming convention,
    i.e., the subject ID index will be named ``subject``, the sample index will be named ``sample``,
    and the value column will be named after the saliva biomarker type.

    Parameters
    ----------
    file_path: :class:`~pathlib.Path` or str
        path to file
    saliva_type: str
        saliva type to load from file. Example: ``cortisol``
    subject_col: str, optional
        name of column containing subject IDs or ``None`` to use the default column name ``subject``.
        According to BioPsyKit's convention, the subject ID column is expected to have the name ``subject``.
        If the subject ID column in the file has another name, the column will be renamed in the dataframe
        returned by this function. Default: ``None``
    condition_col : str, optional
        name of the column containing condition assignments or ``None`` if no conditions are present
        (a column that is already named ``condition`` is picked up automatically).
        According to BioPsyKit's convention, the condition column is expected to have the name ``condition``.
        If the condition column in the file has another name, the column will be renamed in the dataframe
        returned by this function. Default: ``None``
    additional_index_cols : str or list of str, optional
        additional index levels to be added to the dataframe, e.g., "day" index. Can either be a string or a list
        of strings to indicate column name(s) that should be used as index level(s),
        or ``None`` for no additional index levels. Default: ``None``
    sample_times: list of int, optional
        times at which saliva samples were collected or ``None`` if no sample times should be specified.
        Default: ``None``
    **kwargs
        Additional parameters that are passed to :func:`pandas.read_csv` or :func:`pandas.read_excel`

    Returns
    -------
    data : :class:`~biopsykit.utils.dtypes.SalivaRawDataFrame`
        saliva data in `SalivaRawDataFrame` format

    Raises
    ------
    :exc:`~biopsykit.utils.exceptions.FileExtensionError`
        if file is no csv or Excel file
    ValueError
        if ``sample_times`` is not strictly increasing or its length does not match the number of
        sample columns in the file

    """
    # ensure pathlib
    file_path = Path(file_path)
    _assert_file_extension(file_path, [".csv", ".xls", ".xlsx"])
    data = _read_dataframe(file_path, **kwargs)

    if subject_col is None:
        subject_col = "subject"

    _assert_has_columns(data, [[subject_col]])

    if subject_col != "subject":
        # rename column to comply with the naming convention
        data = data.rename(columns={subject_col: "subject"})
        subject_col = "subject"

    index_cols = [subject_col]

    data, condition_col = _get_condition_col(data, condition_col)

    index_cols = _get_index_cols(condition_col, index_cols, additional_index_cols)
    data = _apply_index_cols(data, index_cols=index_cols)

    # number of wide-format rows, i.e. one per unique combination of index columns
    # (equals the number of subjects when no additional index columns are used)
    num_rows = len(data)
    # all remaining columns are sample columns: wrap them into (saliva_type, sample) and
    # stack the "sample" level into the index to obtain the long format
    data.columns = pd.MultiIndex.from_product([[saliva_type], data.columns], names=[None, "sample"])
    data = data.stack(future_stack=True)

    _check_num_samples(len(data), num_rows)

    if sample_times is not None:
        _check_sample_times(len(data), num_rows, sample_times)
        # repeat the sample times once per wide-format row; ``np.tile`` (unlike ``sequence * n``)
        # also repeats numpy arrays instead of multiplying them element-wise
        data["time"] = np.tile(np.asarray(sample_times), num_rows)

    is_saliva_raw_dataframe(data, saliva_type)

    return _SalivaRawDataFrame(data)
def load_biomarker_results(
    file_path: path_t,
    biomarker_type: str | None = None,
    sample_id_col: str | None = None,
    data_col: str | None = None,
    id_col_names: Sequence[str] | None = None,
    regex_str: str | None = None,
    sample_times: Sequence[int] | None = None,
    condition_list: Sequence | dict[str, Sequence] | pd.Index | None = None,
    check_number_samples: bool = True,
    replace_strings_missing: bool = True,
    **kwargs,
) -> BiomarkerRawDataFrame:
    r"""Load biomarker results from Excel file.

    Parameters
    ----------
    file_path: :class:`~pathlib.Path` or str
        path to file
    biomarker_type: str, optional
        biomarker type to load from file. Example: ``cortisol``
    sample_id_col: str, optional
        name of column containing sample IDs or ``None`` to use the default column name ``sample ID``.
    data_col: str, optional
        name of column containing biomarker data or ``None`` to look up the default column name
        for ``biomarker_type``.
    id_col_names: list of str, optional
        names of the extracted ID columns. ``None`` to use the default column names
        (``['subject', 'sample']``, or ``['subject', 'day', 'sample']`` if ``regex_str`` yields three groups)
    regex_str: str, optional
        regular expression to extract subject, day, and sample ID from sample ID column. ``None`` to use the
        default regular expression ``r"(VP\d+)-(T\w)-(B\w)"``.
    sample_times: list of int, optional
        times at which samples were collected or ``None`` if no sample times should be specified.
        Default: ``None``
    condition_list: list of str, dict of str to list of str, or :class:`~pandas.Index`, optional
        list of condition names or dictionary of condition names to list of condition assignments or
        :class:`~pandas.Index` of condition names or ``None`` if no conditions are present.
        Default: ``None``
    check_number_samples: bool, optional
        ``True`` to check that the number of samples is equal for all subjects, ``False`` to skip this check.
        Default: ``True``
    replace_strings_missing : bool, optional
        ``True`` to replace strings indicating missing in the biomarker data with NaN values,
        ``False`` to keep the strings. Default: ``True``
    **kwargs
        Additional parameters that are passed to :func:`pandas.read_excel`. The number of header rows to skip
        is determined by the internal reader (``_try_read_plate_excel``), which tries several ``skiprows`` values.

    Returns
    -------
    data : :class:`~biopsykit.utils.dtypes.BiomarkerRawDataFrame`
        biomarker data in `BiomarkerRawDataFrame` format

    Raises
    ------
    :exc:`~biopsykit.utils.exceptions.FileExtensionError`
        if file is no Excel file
    KeyError
        if ``data_col`` is ``None`` and no default data column is known for ``biomarker_type``
    ValueError
        if the file can not be read, if the sample checks fail, or if any biomarker value can not be
        converted into a float

    """
    # ensure pathlib
    file_path = Path(file_path)
    _assert_file_extension(file_path, (".xls", ".xlsx"))

    # TODO add remove_nan option (all or any)
    if regex_str is None:
        regex_str = r"(VP\d+)-(T\w)-(B\w)"

    if sample_id_col is None:
        sample_id_col = "sample ID"

    if data_col is None:
        data_col = _DATA_COL_NAMES[biomarker_type]

    df_biomarker = _try_read_plate_excel(file_path, sample_id_col, data_col, **kwargs)
    # one column per capture group of the regular expression
    cols = df_biomarker[sample_id_col].str.extract(regex_str)
    id_col_names = _get_id_columns(id_col_names, cols)

    df_biomarker[id_col_names] = cols

    df_biomarker = df_biomarker.drop(columns=[sample_id_col], errors="ignore")
    df_biomarker = df_biomarker.rename(columns={data_col: biomarker_type})
    df_biomarker = df_biomarker.set_index(id_col_names)

    if condition_list is not None:
        df_biomarker = _apply_condition_list(df_biomarker, condition_list)

    num_subjects = len(df_biomarker.index.get_level_values("subject").unique())

    if check_number_samples:
        _check_num_samples(len(df_biomarker), num_subjects)

    # explicit None/length check instead of truthiness: works for numpy arrays as well and
    # still skips an empty sequence
    if sample_times is not None and len(sample_times) > 0:
        _check_sample_times(len(df_biomarker), num_subjects, sample_times)
        # sample times are assigned positionally: the sequence is repeated once per subject.
        # ``np.tile`` also repeats numpy arrays (``array * n`` would multiply element-wise)
        df_biomarker["time"] = np.tile(np.asarray(sample_times), num_subjects)

    # collapse runs of whitespace so that the missing-data markers can be matched exactly
    df_biomarker[biomarker_type] = df_biomarker[biomarker_type].astype(str).str.replace(r"\s+", " ", regex=True)
    if replace_strings_missing:
        df_biomarker[biomarker_type] = df_biomarker[biomarker_type].replace(dict.fromkeys(_MISSING_DATA, "nan"))
    try:
        df_biomarker[biomarker_type] = df_biomarker[biomarker_type].astype(float)
    except ValueError as e:
        raise ValueError(
            f"Error converting all biomarker values into numbers: '{e}'\n"
            f"Please check your biomarker values whether there is any text etc. in the column '{data_col}'\n"
            "and delete the values or replace them by NaN!"
        ) from e

    is_biomarker_raw_dataframe(df_biomarker, biomarker_type=biomarker_type)

    return _BiomarkerRawDataFrame(df_biomarker)
def _get_index_cols(condition_col: str, index_cols: Sequence[str], additional_index_cols: Sequence[str]): if condition_col is not None: index_cols = [condition_col, *index_cols] if additional_index_cols is None: additional_index_cols = [] if isinstance(additional_index_cols, str): additional_index_cols = [additional_index_cols] return index_cols + additional_index_cols def _try_read_plate_excel(file_path: Path, sample_id_col: str, data_col: str, **kwargs): for skiprows in [1, 2, 3]: try: return pd.read_excel(file_path, skiprows=skiprows, usecols=[sample_id_col, data_col], **kwargs) except ValueError as e: last_exception = e continue raise ValueError("Could not read plate Excel file. Please check the file format.") from last_exception def _read_dataframe(file_path: Path, **kwargs): if file_path.suffix in [".csv"]: return pd.read_csv(file_path, **kwargs) return pd.read_excel(file_path, **kwargs) def _check_num_samples(num_samples: int, num_subjects: int): """Check that number of imported samples is the same for all subjects. Parameters ---------- num_samples : int total number of saliva samples in the current dataframe num_subjects : int total number of subjects in the current dataframe Raises ------ ValueError if number of samples is not equal for all subjects """ if num_samples % num_subjects != 0: raise ValueError( f"Error during import: Number of samples not equal for all subjects! " f"Got {num_samples} samples for {num_subjects} subjects." ) def _check_sample_times(num_samples: int, num_subjects: int, sample_times: Sequence[int]): """Check that sample times have the correct number of samples and are monotonously increasing. 
Parameters ---------- num_samples : int total number of saliva samples in the current dataframe num_subjects : int total number of subjects in the current dataframe sample_times : array-like list of sample times Raises ------ ValueError if values in ``sample_times`` are not monotonously increasing or if number of saliva times does not match the number of saliva samples per subject """ if np.any(np.diff(sample_times) <= 0): raise ValueError("`saliva_times` must be increasing!") if (len(sample_times) * num_subjects) != num_samples: raise ValueError( f"Length of `saliva_times` does not match the number of saliva samples! " f"Expected: {int(num_samples / num_subjects)}, got: {len(sample_times)}" ) def _parse_condition_list( data: pd.DataFrame, condition_list: Sequence | dict[str, Sequence] | pd.Index ) -> SubjectConditionDataFrame: if isinstance(condition_list, list | np.ndarray): # Add Condition as new index level condition_list = pd.DataFrame( condition_list, columns=["condition"], index=data.index.get_level_values("subject").unique(), ) elif isinstance(condition_list, dict): condition_list = [(subject, cond) for cond in condition_list for subject in condition_list[cond]] condition_list = pd.DataFrame(condition_list, columns=["subject", "condition"]) condition_list = condition_list.set_index("subject") elif isinstance(condition_list, pd.DataFrame): condition_list = condition_list.reset_index().set_index("subject") is_subject_condition_dataframe(condition_list) return _SubjectConditionDataFrame(condition_list) def _apply_condition_list( data: pd.DataFrame, condition_list: Sequence | dict[str, Sequence] | pd.Index | None = None, ): condition_list = _parse_condition_list(data, condition_list) data = ( data.join(condition_list).set_index("condition", append=True).reorder_levels(["condition", "subject", "sample"]) ) return data def _get_id_columns(id_col_names: Sequence[str], extracted_cols: pd.DataFrame): if id_col_names is None: id_col_names = ["subject", "sample"] if 
len(extracted_cols.columns) == 3: id_col_names = ["subject", "day", "sample"] elif len(id_col_names) != len(extracted_cols.columns): raise ValueError( f"Number of 'id_col_names' must match length of extracted index columns! " f"Expected {len(extracted_cols)}, got {len(id_col_names)}." ) return id_col_names def _get_condition_col(data: pd.DataFrame, condition_col: str) -> tuple[pd.DataFrame, str]: if condition_col is None: if "condition" in data.columns: condition_col = "condition" else: _assert_has_columns(data, [[condition_col]]) if condition_col != "condition": # rename column data = data.rename(columns={condition_col: "condition"}) condition_col = "condition" return data, condition_col