# Source code for biopsykit.io.sleep_analyzer

# pylint:disable=unsupported-assignment-operation
# pylint:disable=unsubscriptable-object
"""Module containing different I/O functions to load data recorded by Withings Sleep Analyzer."""
import datetime
import re
from ast import literal_eval
from pathlib import Path
from typing import Dict, Optional, Sequence, Union

import numpy as np
import pandas as pd
from biopsykit.sleep.utils import split_nights
from biopsykit.utils._datatype_validation_helper import _assert_file_extension, _assert_has_columns, _assert_is_dir
from biopsykit.utils._types import path_t
from biopsykit.utils.datatype_helper import SleepEndpointDataFrame, is_sleep_endpoint_dataframe
from biopsykit.utils.time import tz

__all__ = [
    "WITHINGS_RAW_DATA_SOURCES",
    "load_withings_sleep_analyzer_raw_file",
    "load_withings_sleep_analyzer_raw_folder",
    "load_withings_sleep_analyzer_summary",
]

WITHINGS_RAW_DATA_SOURCES = {
    "hr": "heart_rate",
    "respiratory-rate": "respiration_rate",
    "sleep-state": "sleep_state",
    "snoring": "snoring",
}
""" Mapping of data source names to names of the biosignal (and the exported dataframe column)"""


def load_withings_sleep_analyzer_raw_folder(
    folder_path: path_t,
    timezone: Optional[Union[datetime.tzinfo, str]] = None,
    split_into_nights: Optional[bool] = True,
) -> Union[pd.DataFrame, Sequence[pd.DataFrame]]:
    """Load folder with raw data from a Withings Sleep Analyzer recording session and convert into time-series data.

    The function will return a list of dataframes (one dataframe per night, if ``split_into_nights`` is ``True``)
    with continuous time-series data (sampling distance: 1min) of all data sources
    (heart rate, respiratory rate, sleep state, snoring) combined.

    The dataframe columns will be:

    * ``heart_rate``: heart rate in beats-per-minute (bpm)
    * ``respiration_rate``: respiration rate in breaths-per-minute (bpm)
    * ``sleep_state``: current sleep state: 0 = awake, 1 = light sleep, 2 = deep sleep, 3 = rem sleep
    * ``snoring``: flag whether snoring was detected: 0 = no snoring, 100 = snoring

    The files are all expected to have the following name pattern: ``raw-sleep-monitor_<datasource>.csv``.

    .. warning::
        If data is not split into single nights (``split_into_nights`` is ``False``),
        data in the dataframe will **not** be resampled.

    Parameters
    ----------
    folder_path: :class:`~pathlib.Path` or str
        path to folder with Sleep Analyzer raw data
    timezone : str or :class:`datetime.tzinfo`, optional
        timezone of the acquired data, either as string of as tzinfo object.
        Default: 'Europe/Berlin'
    split_into_nights : bool, optional
        whether to split the dataframe into the different recording nights (and return a list of dataframes)
        or not. Default: ``True``

    Returns
    -------
    :class:`~pandas.DataFrame` or list of such
        dataframe (or list of dataframes, if ``split_into_nights`` is ``True``) with Sleep Analyzer data

    Raises
    ------
    ValueError
        if ``folder_path`` is not a directory
        if no Sleep Analyzer Raw files are in directory specified by ``folder_path``

    See Also
    --------
    load_withings_sleep_analyzer_raw_file
        load a single Sleep Analyzer file with only one data source

    """
    # ensure pathlib
    folder_path = Path(folder_path)
    _assert_is_dir(folder_path)

    raw_files = sorted(folder_path.glob("raw_sleep-monitor_*.csv"))
    if len(raw_files) == 0:
        raise ValueError(f"No sleep analyzer raw files found in {folder_path}!")
    # extract the data source identifier (e.g. "hr") from each file name
    data_sources = [re.findall(r"raw_sleep-monitor_(\S*).csv", s.name)[0] for s in raw_files]

    list_data = [
        load_withings_sleep_analyzer_raw_file(
            file_path,
            data_source=WITHINGS_RAW_DATA_SOURCES[data_source],
            timezone=timezone,
            split_into_nights=split_into_nights,
        )
        for file_path, data_source in zip(raw_files, data_sources)
        if data_source in WITHINGS_RAW_DATA_SOURCES
    ]
    if split_into_nights:
        # "transpose" list of dictionaries:
        # before: outer list = data sources, inner dict = nights
        # after: outer dict = nights, inner list = data sources
        #
        # NOTE: collect the night keys via a set union instead of
        # np.unique(np.array([...]).flatten()): with differing numbers of nights per
        # data source the nested list is ragged, which raises in modern numpy.
        keys = sorted(set().union(*(data.keys() for data in list_data)))
        dict_nights = {}
        for key in keys:
            dict_nights.setdefault(key, [])
            for data in list_data:
                dict_nights[key].append(data[key])

        data = {key: pd.concat(night_data, axis=1) for key, night_data in dict_nights.items()}
    else:
        data = pd.concat(list_data, axis=1)

    return data
def load_withings_sleep_analyzer_raw_file(
    file_path: path_t,
    data_source: str,
    timezone: Optional[Union[datetime.tzinfo, str]] = None,
    split_into_nights: Optional[bool] = True,
) -> Union[pd.DataFrame, Dict[str, pd.DataFrame]]:
    """Load single Withings Sleep Analyzer raw data file and convert into time-series data.

    Parameters
    ----------
    file_path : :class:`~pathlib.Path` or str
        path to file
    data_source : str
        data source of file specified by ``file_path``. Must be one of
        ['heart_rate', 'respiration_rate', 'sleep_state', 'snoring'].
    timezone : str or :class:`datetime.tzinfo`, optional
        timezone of recorded data, either as string or as tzinfo object.
        Default: 'Europe/Berlin'
    split_into_nights : bool, optional
        whether to split the dataframe into the different recording nights (and return a dictionary of dataframes)
        or not. Default: ``True``

    Returns
    -------
    :class:`~pandas.DataFrame` or dict of such
        dataframe (or dict of dataframes, if ``split_into_nights`` is ``True``) with Sleep Analyzer data

    Raises
    ------
    ValueError
        if unsupported data source was passed
    `~biopsykit.utils.exceptions.FileExtensionError`
        if ``file_path`` is not a csv file
    `~biopsykit.utils.exceptions.ValidationError`
        if file does not have the required columns ``start``, ``duration``, ``value``

    """
    if data_source not in WITHINGS_RAW_DATA_SOURCES.values():
        raise ValueError(
            "Unsupported data source {}! Must be one of {}.".format(
                data_source, list(WITHINGS_RAW_DATA_SOURCES.values())
            )
        )

    file_path = Path(file_path)
    _assert_file_extension(file_path, ".csv")

    data = pd.read_csv(file_path)
    _assert_has_columns(data, [["start", "duration", "value"]])

    if timezone is None:
        # fall back to the library-wide default timezone
        timezone = tz

    # convert string timestamps to datetime
    data["start"] = pd.to_datetime(data["start"])
    # sort index
    data = data.set_index("start").sort_index()
    # drop duplicate index values
    data = data.loc[~data.index.duplicated()]
    # convert it into the right time zone
    data = data.groupby("start", group_keys=False).apply(_localize_time, timezone=timezone)
    # convert strings of arrays to arrays
    data["duration"] = data["duration"].apply(literal_eval)
    data["value"] = data["value"].apply(literal_eval)
    # rename index
    data.index.name = "time"
    # explode data and apply timestamp explosion to groups
    data_explode = data.apply(pd.Series.explode)
    data_explode = data_explode.groupby("time", group_keys=False).apply(_explode_timestamp)
    # rename the value column
    data_explode.columns = [data_source]
    # convert dtypes from object into numerical values
    data_explode = data_explode.astype(int)
    # drop duplicate index values
    data_explode = data_explode.loc[~data_explode.index.duplicated()]
    if split_into_nights:
        # one dataframe per night, each resampled to a continuous 1-min grid
        data_explode = split_nights(data_explode)
        data_explode = {key: _reindex_datetime_index(d) for key, d in data_explode.items()}
    else:
        data_explode = _reindex_datetime_index(data_explode)
    return data_explode
def load_withings_sleep_analyzer_summary(file_path: path_t, timezone: Optional[str] = None) -> SleepEndpointDataFrame:
    """Load Sleep Analyzer summary file.

    This function additionally computes several other sleep endpoints from the Sleep Analyzer summary data to be
    comparable with the output with the format of other sleep analysis algorithms.
    All time information are reported in minutes.
    The resulting dataframe has the following columns:

    * ``total_duration``: Total recording time
    * ``total_time_light_sleep``: Total time of light sleep
    * ``total_time_deep_sleep``: Total time of deep sleep
    * ``total_time_rem_sleep``: Total time of REM sleep
    * ``total_time_awake``: Total time of being awake
    * ``total_sleep_duration``: Total sleep duration, i.e., time between Sleep Onset and Wake Onset
    * ``number_wake_bouts``: Total number of wake bouts
    * ``sleep_onset_latency``: Sleep Onset Latency, i.e., time in bed needed to fall asleep
    * ``getup_onset_latency``: Get Up Latency, i.e., time in bed after awakening until getting up
    * ``sleep_onset``: Sleep Onset, i.e., time of falling asleep, in absolute time
    * ``wake_onset``: Wake Onset, i.e., time of awakening, in absolute time
    * ``wake_after_sleep_onset``: Wake After Sleep Onset (WASO), i.e., total time awake after falling asleep
    * ``count_snoring_episodes``: Total number of snoring episodes
    * ``total_time_snoring``: Total time of snoring
    * ``heart_rate_avg``: Average heart rate during recording in bpm
    * ``heart_rate_min``: Minimum heart rate during recording in bpm
    * ``heart_rate_max``: Maximum heart rate during recording in bpm

    Parameters
    ----------
    file_path : :class:`~pathlib.Path` or str
        path to file
    timezone : str or :class:`datetime.tzinfo`, optional
        timezone of recorded data, either as string or as tzinfo object.
        Default: 'Europe/Berlin'

    Returns
    -------
    :obj:`~biopsykit.datatype_helper.SleepEndpointDataFrame`
        dataframe with Sleep Analyzer summary data, i.e., sleep endpoints

    """
    # ensure pathlib
    file_path = Path(file_path)
    _assert_file_extension(file_path, ".csv")

    data = pd.read_csv(file_path)
    _assert_has_columns(data, [["von", "bis"]])

    if timezone is None:
        timezone = tz

    for col in ["von", "bis"]:
        # convert into date time
        data[col] = pd.to_datetime(data[col]).dt.tz_convert(timezone)

    # total duration in seconds
    data["total_duration"] = [int(td.total_seconds()) for td in data["bis"] - data["von"]]

    # a recording that started before noon is attributed to the previous day
    data["date"] = data["von"]
    data["date"] = data["date"].apply(
        lambda date: ((date - pd.Timedelta("1d")) if date.hour < 12 else date).normalize()
    )

    # rename the German/raw Withings column names into the biopsykit naming scheme
    data = data.rename(
        {
            "von": "recording_start",
            "bis": "recording_end",
            "leicht (s)": "total_time_light_sleep",
            "tief (s)": "total_time_deep_sleep",
            "rem (s)": "total_time_rem_sleep",
            "wach (s)": "total_time_awake",
            "Aufwachen": "number_wake_bouts",
            "Duration to sleep (s)": "sleep_onset_latency",
            "Duration to wake up (s)": "getup_latency",
            "Snoring episodes": "count_snoring_episodes",
            "Snoring (s)": "total_time_snoring",
            "Average heart rate": "heart_rate_avg",
            "Heart rate (min)": "heart_rate_min",
            "Heart rate (max)": "heart_rate_max",
        },
        axis="columns",
    )

    data["sleep_onset"] = data["recording_start"] + pd.to_timedelta(data["sleep_onset_latency"], unit="seconds")
    # Wake after Sleep Onset (WASO): total time awake after sleep onset
    data["wake_after_sleep_onset"] = data["total_time_awake"] - data["sleep_onset_latency"] - data["getup_latency"]
    data["wake_onset"] = data["recording_end"] - pd.to_timedelta(data["getup_latency"], unit="seconds")

    # compute total sleep duration
    # = total duration - (time to fall asleep + time to get up (= time spent in bed after waking up))
    data["total_sleep_duration"] = data["total_duration"] - data["sleep_onset_latency"] - data["getup_latency"]

    # compute net sleep duration (time spent actually sleeping)
    # = total sleep duration - wake after sleep onset
    data["net_sleep_duration"] = data["total_sleep_duration"] - data["wake_after_sleep_onset"]

    # convert all duration-like columns from seconds into minutes
    transform_cols = [
        "total_time_light_sleep",
        "total_time_deep_sleep",
        "total_time_rem_sleep",
        "total_time_awake",
        "sleep_onset_latency",
        "getup_latency",
        "total_time_snoring",
        "wake_after_sleep_onset",
        "total_sleep_duration",
        "net_sleep_duration",
    ]
    data[transform_cols] = data[transform_cols].transform(lambda column: (column / 60).astype(int))

    data = data.set_index("date")
    # reindex column order: sleep_onset and wake_onset first
    new_cols = list(data.columns)
    sowo = ["sleep_onset", "wake_onset"]
    for d in sowo:
        new_cols.remove(d)
    data = data[sowo + new_cols]
    # assert output is in the correct format
    is_sleep_endpoint_dataframe(data)
    return data
def _localize_time(df: pd.DataFrame, timezone) -> pd.DataFrame: df.index = pd.to_datetime(df.index).tz_convert(timezone) return df def _explode_timestamp(df: pd.DataFrame) -> pd.DataFrame: # sum up the time durations and subtract the first value from it (so that we start from 0) # dur_sum then looks like this: [0, 60, 120, 180, ...] dur_sum = df["duration"].cumsum() - df["duration"].iloc[0] # Add these time durations to the index timestamps. df["time"] = df.index + pd.to_timedelta(dur_sum, unit="s") # set this as index df = df.set_index("time") # we don't need the duration column anymore so we can drop it df = df.drop(columns="duration") return df def _reindex_datetime_index(df: pd.DataFrame) -> pd.DataFrame: return df.reindex(df.resample("1min").bfill().index)