
"""Module for importing data recorded by NilsPod sensors."""
import datetime
import re
import warnings
from pathlib import Path
from typing import Dict, Literal, Optional, Sequence, Tuple, Union

import numpy as np
import pandas as pd
from biopsykit.utils._datatype_validation_helper import _assert_file_extension, _assert_is_dir, _assert_is_dtype
from biopsykit.utils._types import path_t
from biopsykit.utils.time import tz
from nilspodlib import Dataset, SyncedSession

COUNTER_INCONSISTENCY_HANDLING = Literal["raise", "warn", "ignore"]
"""Available behavior types when dealing with NilsPod counter inconsistencies."""

__all__ = [
    "load_dataset_nilspod",
    "load_synced_session_nilspod",
    "load_csv_nilspod",
    "load_folder_nilspod",
    "check_nilspod_dataset_corrupted",
    "get_nilspod_dataset_corrupted_info",
]


def load_dataset_nilspod(
    file_path: Optional[path_t] = None,
    dataset: Optional[Dataset] = None,
    datastreams: Optional[Union[str, Sequence[str]]] = None,
    handle_counter_inconsistency: Optional[COUNTER_INCONSISTENCY_HANDLING] = "raise",
    **kwargs,
) -> Tuple[pd.DataFrame, float]:
    """Load NilsPod recording and convert into dataframe.

    To load a dataset, either a :class:`~nilspodlib.dataset.Dataset` object (via the ``dataset``
    parameter) or the path to the binary file (via the ``file_path`` parameter) can be passed.

    Parameters
    ----------
    file_path : :class:`~pathlib.Path` or str, optional
        path to binary file
    dataset : :class:`~nilspodlib.dataset.Dataset`, optional
        Dataset object
    datastreams : str or list of str, optional
        list of datastreams if only specific datastreams of the dataset object should be imported,
        or ``None`` to load all datastreams. Datastreams that are not part of the current dataset
        will be silently ignored.
        Default: ``None``
    handle_counter_inconsistency : {"raise", "warn", "ignore"}, optional
        how to handle the case that the counter of the dataset is not monotonically increasing,
        which might be an indicator for a corrupted dataset:

        * "raise" (default): raise an error
        * "warn": issue a warning but still return a dataframe
        * "ignore": ignore the counter check result
    **kwargs
        optional arguments passed to :func:`~nilspodlib.Dataset.from_bin_file`, such as:

        legacy_support : {"error", "warn", "resolve"}, optional
            flag indicating how to deal with older NilsPod firmware versions:

            * "error": raise an error if an unsupported version is detected
            * "warn": issue a warning and parse the file without modification
            * "resolve" (default): perform a legacy conversion to load old files. If no suitable
              conversion is found, an error is raised. See the :any:`nilspodlib.legacy` package
              and the README of ``nilspodlib`` to learn more about available conversions.
        timezone : str or :class:`datetime.tzinfo`, optional
            timezone of the acquired data, either as string or as tzinfo object.
            Default: "Europe/Berlin"

    Returns
    -------
    tuple
        df : :class:`~pandas.DataFrame`
            dataframe of imported dataset
        fs : float
            sampling rate

    Raises
    ------
    :exc:`~biopsykit.utils.exceptions.FileExtensionError`
        if ``file_path`` is specified and the file is not a binary (.bin) file
    ValueError
        if neither ``file_path`` nor ``dataset`` is supplied as parameter,
        if ``handle_counter_inconsistency`` is "raise" and the counter of the
        :class:`~nilspodlib.dataset.Dataset` is inconsistent (not monotonically increasing),
        or if ``legacy_support`` is "error" and no suitable conversion can be found for this
        file version

    See Also
    --------
    :class:`~nilspodlib.dataset.Dataset`
        NilsPod Dataset

    Examples
    --------
    >>> from biopsykit.io.nilspod import load_dataset_nilspod
    >>> # Option 1: Import data by passing file name
    >>> file_path = "./<filename-of-nilspod-data>.bin"
    >>> # load dataset with all datastreams
    >>> df, fs = load_dataset_nilspod(file_path=file_path)
    >>> # load only ECG data of dataset
    >>> df, fs = load_dataset_nilspod(file_path=file_path, datastreams=['ecg'])
    >>>
    >>> # Option 2: Import data by passing Dataset object imported from NilsPodLib
    >>> # (in this example, only acceleration data)
    >>> from nilspodlib import Dataset
    >>> dataset = Dataset.from_bin_file("<filename>.bin")
    >>> df, fs = load_dataset_nilspod(dataset=dataset, datastreams='acc')

    """
    if file_path is None and dataset is None:
        raise ValueError("Either 'file_path' or 'dataset' must be supplied as parameter!")
    if file_path is not None:
        file_path = Path(file_path)
        _assert_file_extension(file_path, ".bin")
        kwargs.setdefault("tz", kwargs.pop("timezone", tz))
        kwargs.setdefault("legacy_support", "resolve")
        dataset = Dataset.from_bin_file(file_path, **kwargs)

    dataset = _handle_counter_inconsistencies_dataset(dataset, handle_counter_inconsistency)
    if isinstance(datastreams, str):
        datastreams = [datastreams]

    # convert dataset to dataframe and localize the timestamp index
    df = dataset.data_as_df(datastreams, index="local_datetime")
    df.index.name = "time"
    return df, dataset.info.sampling_rate_hz
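# A hedged usage sketch showing how the ``**kwargs`` described above are forwarded to
# ``Dataset.from_bin_file`` (the file name is a placeholder; both keyword values shown are the
# defaults):
#
# >>> df, fs = load_dataset_nilspod(
# ...     file_path="./<filename>.bin", legacy_support="resolve", timezone="Europe/Berlin"
# ... )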
def load_synced_session_nilspod(
    folder_path: path_t,
    datastreams: Optional[Union[str, Sequence[str]]] = None,
    handle_counter_inconsistency: Optional[COUNTER_INCONSISTENCY_HANDLING] = "raise",
    **kwargs,
) -> Tuple[pd.DataFrame, float]:
    """Load a synchronized session of NilsPod recordings and convert into dataframes.

    Parameters
    ----------
    folder_path : :class:`~pathlib.Path` or str
        folder path to session files
    datastreams : str or list of str, optional
        list of datastreams if only specific datastreams of the datasets in the session should be
        imported, or ``None`` to load all datastreams. Datastreams that are not part of the
        current datasets will be silently ignored.
        Default: ``None``
    handle_counter_inconsistency : {"raise", "warn", "ignore"}, optional
        how to handle the case that the counter of a dataset is not monotonically increasing,
        which might be an indicator for a corrupted dataset:

        * "raise" (default): raise an error
        * "warn": issue a warning but still return a dataframe
        * "ignore": ignore the counter check result
    **kwargs
        optional arguments passed to :func:`~nilspodlib.Session.from_folder_path`, such as:

        legacy_support : {"error", "warn", "resolve"}, optional
            flag indicating how to deal with older NilsPod firmware versions:

            * "error": raise an error if an unsupported version is detected
            * "warn": issue a warning and parse the file without modification
            * "resolve" (default): perform a legacy conversion to load old files. If no suitable
              conversion is found, an error is raised. See the :any:`nilspodlib.legacy` package
              and the README of ``nilspodlib`` to learn more about available conversions.
        timezone : str or :class:`datetime.tzinfo`, optional
            timezone of the acquired data, either as string or as tzinfo object.
            Default: "Europe/Berlin"

    Returns
    -------
    tuple
        df : :class:`~pandas.DataFrame`
            concatenated dataframe of imported session
        fs : float
            sampling rate

    Raises
    ------
    ValueError
        if ``folder_path`` does not contain any NilsPod files,
        if ``handle_counter_inconsistency`` is "raise" and the counter of any dataset is
        inconsistent (not monotonically increasing),
        if ``legacy_support`` is "error" and no suitable conversion can be found for the files
        in the session,
        or if the sampling rate is not the same for all datasets in the session

    See Also
    --------
    :class:`~nilspodlib.dataset.Dataset`
        NilsPod Dataset
    :func:`~biopsykit.io.nilspod.load_dataset_nilspod`
        load a single NilsPod dataset

    """
    # ensure pathlib
    folder_path = Path(folder_path)
    nilspod_files = sorted(folder_path.glob("*.bin"))
    if len(nilspod_files) == 0:
        raise ValueError("No NilsPod files found in directory!")

    kwargs.setdefault("tz", kwargs.pop("timezone", tz))
    session = SyncedSession.from_folder_path(folder_path, **kwargs)
    session = session.align_to_syncregion()

    session = _handle_counter_inconsistencies_session(session, handle_counter_inconsistency)
    if isinstance(datastreams, str):
        datastreams = [datastreams]

    # convert session to one concatenated dataframe and localize the timestamp index
    df = session.data_as_df(datastreams, index="local_datetime", concat_df=True)
    df.index.name = "time"
    if len(set(session.info.sampling_rate_hz)) > 1:
        raise ValueError(
            f"Datasets in the session have different sampling rates! Got: {session.info.sampling_rate_hz}."
        )
    fs = session.info.sampling_rate_hz[0]
    return df, fs
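# A minimal usage sketch for ``load_synced_session_nilspod`` (the folder name is hypothetical;
# the folder is assumed to contain the .bin files of one synchronized session):
#
# >>> from biopsykit.io.nilspod import load_synced_session_nilspod
# >>> # load only the ECG datastream of all sensors in the session
# >>> df, fs = load_synced_session_nilspod("./synced_session", datastreams=["ecg"])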
def _handle_counter_inconsistencies_dataset(
    dataset: Dataset, handle_counter_inconsistency: COUNTER_INCONSISTENCY_HANDLING
) -> Dataset:
    idxs_corrupted = np.where(np.diff(dataset.counter) < 1)[0]
    # edge case: check if only the last sample is corrupted. if yes, cut off the last sample
    if len(idxs_corrupted) == 1 and idxs_corrupted[0] == len(dataset.counter) - 2:
        dataset = dataset.cut(start=0, stop=idxs_corrupted[0])
    elif len(idxs_corrupted) > 0:
        if handle_counter_inconsistency == "raise":
            raise ValueError("Error loading dataset. Counter not monotonically increasing!")
        if handle_counter_inconsistency == "warn":
            warnings.warn(
                "Counter not monotonically increasing. This might indicate that the dataset is corrupted or "
                "that the dataset was recorded as part of a synchronized session and might need to be loaded "
                "using `biopsykit.io.nilspod.load_synced_session_nilspod()`. "
                "Check the counter of the DataFrame manually!"
            )
    # return the (possibly cut) dataset so that the edge-case fix takes effect at the call site
    return dataset


def _handle_counter_inconsistencies_session(
    session: SyncedSession, handle_counter_inconsistency: COUNTER_INCONSISTENCY_HANDLING
) -> SyncedSession:
    idxs_corrupted = np.where(np.diff(session.counter) < 1)[0]
    # edge case: check if only the last sample is corrupted. if yes, cut off the last sample
    if len(idxs_corrupted) == 1 and idxs_corrupted[0] == len(session.counter) - 2:
        session = session.cut(start=0, stop=idxs_corrupted[0])
    elif len(idxs_corrupted) > 0:
        if handle_counter_inconsistency == "raise":
            raise ValueError("Error loading session. Counter not monotonically increasing!")
        if handle_counter_inconsistency == "warn":
            warnings.warn(
                "Counter not monotonically increasing. This might indicate that the session is corrupted. "
                "Check the counter of the DataFrame manually!"
            )
    # return the (possibly cut) session so that the edge-case fix takes effect at the call site
    return session
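# The counter check used by both helpers, illustrated on a toy counter array (a minimal sketch,
# not part of the module). ``np.diff(...) < 1`` flags repeated samples and counter resets:
#
# >>> import numpy as np
# >>> counter = np.array([0, 1, 2, 2, 3, 0, 1])  # repeated sample at index 2, reset at index 4
# >>> np.where(np.diff(counter) < 1)[0]
# array([2, 4])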
def load_csv_nilspod(
    file_path: path_t = None,
    datastreams: Optional[Sequence[str]] = None,
    timezone: Optional[Union[datetime.tzinfo, str]] = tz,
    filename_regex: Optional[str] = None,
    time_regex: Optional[str] = None,
) -> Tuple[pd.DataFrame, float]:
    r"""Convert a csv file recorded by NilsPod into a dataframe.

    By default, this function expects the file name to have the following pattern:
    "NilsPodX-<sensor-id>_YYYYMMDD_hhmmss.csv". The time information in the file name is used to
    infer the start time of the recording and add absolute time information, so that a dataframe
    with a :class:`~pandas.DatetimeIndex` is returned. If no start time can be extracted, the
    index of the resulting dataframe is a :class:`~pandas.TimedeltaIndex` instead of a
    :class:`~pandas.DatetimeIndex`.

    Parameters
    ----------
    file_path : :class:`~pathlib.Path` or str
        path to csv file
    datastreams : list of str, optional
        list of datastreams if only specific datastreams of the file should be imported, or
        ``None`` to load all datastreams. Datastreams that are not part of the current dataset
        will be silently ignored.
        Default: ``None``
    timezone : str or :class:`datetime.tzinfo`, optional
        timezone of the acquired data, either as string or as tzinfo object.
        Default: 'Europe/Berlin'
    filename_regex : str, optional
        regex string to extract the time substring from the file name, or ``None`` to use the
        default file name pattern.
        Default: ``None``
    time_regex : str, optional
        regex string specifying the format of the time substring, or ``None`` to use the default
        time format.
        Default: ``None``

    Returns
    -------
    df : :class:`~pandas.DataFrame`
        dataframe of imported dataset
    fs : float
        sampling rate

    Raises
    ------
    :exc:`~biopsykit.utils.exceptions.FileExtensionError`
        if the file is not a csv file

    See Also
    --------
    :class:`~nilspodlib.dataset.Dataset`
        NilsPod Dataset
    :func:`load_dataset_nilspod`
        load a single NilsPod dataset from binary file

    """
    # ensure pathlib
    file_path = Path(file_path)
    _assert_file_extension(file_path, ".csv")

    df = pd.read_csv(file_path, header=1, index_col="timestamp")
    header = pd.read_csv(file_path, header=None, nrows=1)
    # sampling rate is in the second column of the header
    sampling_rate = float(header.iloc[0, 1])

    if filename_regex is None:
        filename_regex = r"NilsPodX-[^\s]{4}_(.*?).csv"
    if time_regex is None:
        time_regex = "%Y%m%d_%H%M%S"

    # convert sample index to nanoseconds
    df.index = ((df.index / sampling_rate) * 1e9).astype(int)
    # infer start time from file name
    start_time = re.findall(filename_regex, str(file_path.name))
    df = _convert_index(df, start_time, time_regex)

    if isinstance(datastreams, str):
        datastreams = [datastreams]
    if datastreams is not None:
        # filter only the desired datastreams
        df = pd.concat([df.filter(like=ds) for ds in datastreams], axis=1)

    if isinstance(df.index, pd.DatetimeIndex):
        # localize timezone (data is already in the correct timezone since the start time
        # is inferred from the file name)
        df = df.tz_localize(tz=timezone)
    return df, sampling_rate
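# A hedged usage sketch (hypothetical file name following the default
# "NilsPodX-<sensor-id>_YYYYMMDD_hhmmss.csv" pattern, from which the start time is inferred):
#
# >>> from biopsykit.io.nilspod import load_csv_nilspod
# >>> df, fs = load_csv_nilspod("NilsPodX-1234_20200101_120000.csv", datastreams=["acc"])
# >>> df.index  # doctest: +SKIP
# DatetimeIndex([...], dtype='datetime64[ns, Europe/Berlin]', name='time', ...)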
def _convert_index(df: pd.DataFrame, start_time: Sequence[str], time_regex: str) -> pd.DataFrame:
    if len(start_time) > 0:
        # convert index into a datetime index with absolute time information
        start_time = start_time[0]
        start_time = pd.to_datetime(start_time, format=time_regex).to_datetime64().astype(int)
        # add start time as offset (both in nanoseconds) and convert into a datetime index
        df.index = pd.to_datetime(df.index + start_time)
    else:
        # no start time information available, so convert into a timedelta index
        df.index = pd.to_timedelta(df.index)
    df.index.name = "time"
    return df
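# How ``_convert_index`` resolves the index, sketched on a tiny frame whose index is already in
# nanoseconds (a minimal sketch; values and column name are made up for illustration):
#
# >>> df = pd.DataFrame({"acc_x": [0.0, 0.1]}, index=[0, 3906250])  # 3906250 ns = 1 sample at 256 Hz
# >>> _convert_index(df, ["20200101_120000"], "%Y%m%d_%H%M%S").index  # doctest: +SKIP
# DatetimeIndex(['2020-01-01 12:00:00', '2020-01-01 12:00:00.003906250'], name='time', ...)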
def load_folder_nilspod(
    folder_path: path_t, phase_names: Optional[Sequence[str]] = None, **kwargs
) -> Tuple[Dict[str, pd.DataFrame], float]:
    """Load all NilsPod datasets from one folder, convert them into dataframes, and combine them into a dictionary.

    This function can, for example, be used when single NilsPod sessions (datasets) were recorded
    for different study phases.

    Parameters
    ----------
    folder_path : :class:`~pathlib.Path` or str
        folder path to files
    phase_names : list, optional
        list of phase names corresponding to the files in the folder. Must match the number of
        recordings. If ``None``, phases will be named ``Part{1-x}``.
        Default: ``None``
    **kwargs
        additional arguments that are passed to :func:`load_dataset_nilspod`

    Returns
    -------
    dataset_dict : dict
        dictionary with phase names as keys and pandas dataframes with sensor recordings as values
    fs : float
        sampling rate of sensor recordings

    Raises
    ------
    ValueError
        if ``folder_path`` does not contain any NilsPod files,
        if the number of phases does not match the number of datasets in the folder,
        or if the sampling rates of the files in the folder are not the same

    See Also
    --------
    :func:`load_dataset_nilspod`
        load single NilsPod dataset

    Examples
    --------
    >>> from biopsykit.io.nilspod import load_folder_nilspod
    >>> folder_path = "./nilspod"
    >>> # load all datasets from the selected folder with all datastreams
    >>> dataset_dict, fs = load_folder_nilspod(folder_path)
    >>> # load only ECG data of all datasets from the selected folder
    >>> dataset_dict, fs = load_folder_nilspod(folder_path, datastreams=['ecg'])
    >>> # load all datasets from the selected folder with corresponding phase names
    >>> dataset_dict, fs = load_folder_nilspod(folder_path, phase_names=['VP01','VP02','VP03'])

    """
    # ensure pathlib
    folder_path = Path(folder_path)
    _assert_is_dir(folder_path)

    # look for all NilsPod binary files in the folder
    dataset_list = sorted(folder_path.glob("*.bin"))
    if len(dataset_list) == 0:
        raise ValueError(f"No NilsPod files found in folder {folder_path}!")
    if phase_names is None:
        # number phases starting at 1 to match the documented naming scheme ``Part{1-x}``
        phase_names = [f"Part{i}" for i in range(1, len(dataset_list) + 1)]
    if len(phase_names) != len(dataset_list):
        raise ValueError(
            f"Number of phases does not match number of datasets in the folder! "
            f"Expected {len(dataset_list)}, got {len(phase_names)}."
        )

    dataset_list = [load_dataset_nilspod(file_path=dataset_path, **kwargs) for dataset_path in dataset_list]

    # check that the sampling rate is equal for all datasets in the folder
    fs_list = [fs for df, fs in dataset_list]
    if len(set(fs_list)) > 1:
        raise ValueError(f"Datasets in the folder have different sampling rates! Got: {fs_list}.")
    fs = fs_list[0]

    dataset_dict = {phase: df for phase, (df, _) in zip(phase_names, dataset_list)}
    return dataset_dict, fs
def check_nilspod_dataset_corrupted(dataset: Dataset) -> bool:
    """Check if a NilsPod dataset is potentially corrupted.

    A dataset is potentially corrupted if the counter is not monotonically increasing.

    Parameters
    ----------
    dataset : :class:`~nilspodlib.dataset.Dataset`
        dataset to check

    Returns
    -------
    bool
        flag indicating whether the NilsPod dataset is potentially corrupted or not

    """
    return np.where(np.diff(dataset.counter) != 1.0)[0].size != 0
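# A hedged sketch (placeholder file name). Note that this check uses ``np.diff(...) != 1.0`` and
# is therefore stricter than the ``< 1`` check in the loading functions above: skipped samples
# are flagged as potential corruption, too:
#
# >>> from nilspodlib import Dataset
# >>> dataset = Dataset.from_bin_file("<filename>.bin")
# >>> check_nilspod_dataset_corrupted(dataset)  # doctest: +SKIP
# False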
def get_nilspod_dataset_corrupted_info(dataset: Dataset, file_path: path_t) -> Dict:
    """Get information about the corruption state of a NilsPod dataset.

    Corruption information includes:

    * "name": recording date and time
    * "percent_corrupt": amount of corrupted data in percent
    * "condition": condition of the dataset. Can be one of:

      * "fine": if the dataset is not corrupted
      * "lost": if more than 90% of all samples are corrupted
      * "parts": if between 50% and 90% of all samples are corrupted
      * "start_only": if less than 50% of all samples are corrupted and the corrupted samples
        are only in the first third of the dataset
      * "end_only": if less than 50% of all samples are corrupted and the corrupted samples
        are only in the last third of the dataset

    Parameters
    ----------
    dataset : :class:`~nilspodlib.dataset.Dataset`
        Dataset object
    file_path : :class:`~pathlib.Path` or str
        path to binary file

    Returns
    -------
    dict
        dictionary with corruption information

    """
    _assert_is_dtype(dataset, Dataset)

    nilspod_file_pattern = r"NilsPodX-\w{4}_(.*?).bin"
    # ensure pathlib
    file_path = Path(file_path)

    keys = ["name", "percent_corrupt", "condition"]
    dict_res = dict.fromkeys(keys)

    re_groups = re.search(nilspod_file_pattern, file_path.name)
    name = re_groups.group(1) if re_groups is not None else file_path.name
    dict_res["name"] = name

    if not check_nilspod_dataset_corrupted(dataset):
        dict_res["percent_corrupt"] = 0.0
        dict_res["condition"] = "fine"
        return dict_res

    idx_diff = np.diff(dataset.counter)
    idx_corrupt = np.where(idx_diff != 1.0)[0]
    percent_corrupt = round((len(idx_corrupt) / len(idx_diff)) * 100.0, 1)
    condition = _get_nilspod_dataset_corrupted_info_get_condition(percent_corrupt, idx_corrupt, len(idx_diff))

    dict_res["percent_corrupt"] = percent_corrupt
    dict_res["condition"] = condition
    return dict_res
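# A hedged sketch of the returned dictionary, reusing the ``dataset`` from the sketch above and
# assuming an intact recording (file name chosen to match the "NilsPodX-<sensor-id>_..." pattern,
# so that "name" is the extracted timestamp substring):
#
# >>> get_nilspod_dataset_corrupted_info(dataset, "NilsPodX-ABCD_20200101_120000.bin")  # doctest: +SKIP
# {'name': '20200101_120000', 'percent_corrupt': 0.0, 'condition': 'fine'}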
def _get_nilspod_dataset_corrupted_info_get_condition(
    percent_corrupt: float, idx_corrupt: Sequence[int], n_samples: int
) -> str:
    condition = "parts"
    if percent_corrupt > 90.0:
        condition = "lost"
    elif percent_corrupt < 50.0:
        # relate the corrupted indices to the total number of samples to decide whether the
        # corruption is confined to the beginning or the end of the recording
        if (idx_corrupt[-1] / n_samples) < 0.30:
            condition = "start_only"
        elif (idx_corrupt[0] / n_samples) > 0.70:
            condition = "end_only"
    return condition
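# The classification heuristic as implemented above, on toy values (a minimal sketch; the index
# arrays and sample counts are made up):
#
# >>> _get_nilspod_dataset_corrupted_info_get_condition(95.0, np.arange(100), 105)
# 'lost'
# >>> _get_nilspod_dataset_corrupted_info_get_condition(10.0, np.array([2, 3, 4]), 100)
# 'start_only'
# >>> _get_nilspod_dataset_corrupted_info_get_condition(10.0, np.array([85, 86, 87]), 100)
# 'end_only'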