# Source code for biopsykit.io.psg

"""Module for importing data recorded by a PSG system (expects .edf files)."""

import datetime
import time
import warnings
from pathlib import Path
from typing import Dict, Optional, Sequence, Union

try:
    import mne
except ImportError as e:
    raise ImportError(
        "The 'mne' package is required to read edf data files. "
        "Please install it using 'pip install mne' or 'poetry add mne'."
    ) from e

import pandas as pd
from biopsykit.utils._datatype_validation_helper import _assert_file_extension, _assert_is_dir
from biopsykit.utils._types import path_t

__all__ = ["PSGDataset"]


class PSGDataset:
    """Class for loading and processing PSG (polysomnography) data.

    The dataset stores one :class:`pandas.DataFrame` per channel (datastream),
    the sampling rate(s) of the recording, and the recording start time.
    Each channel is additionally exposed as an instance attribute of the same
    name. Instances are typically created from an .edf file via
    :meth:`from_edf_file`.
    """

    def __init__(
        self,
        data_dict: Dict[str, pd.DataFrame],
        sampling_rate_dict: Dict[str, int],
        start_time: Optional[pd.Timestamp] = None,
        tz: Optional[str] = None,
    ):
        """Create a new ``PSGDataset``.

        Parameters
        ----------
        data_dict : dict
            dictionary mapping channel names to channel data
        sampling_rate_dict : dict
            dictionary of sampling rates in Hz. Every value is assigned to the
            ``sampling_rate`` attribute, i.e. with multiple entries the last
            one wins (mirrors the previous behavior).
        start_time : :class:`pandas.Timestamp`, optional
            start time of the recording, or ``None`` if unknown
        tz : str, optional
            timezone of the recording, or ``None``
        """
        self._data = data_dict
        # expose each channel under its own name for convenient attribute access
        for name, data in data_dict.items():
            setattr(self, name, data)
        for sampling_rate in sampling_rate_dict.values():
            self.sampling_rate = sampling_rate
        self.channels = list(self._data.keys())
        self._sampling_rate = sampling_rate_dict
        if start_time is None:
            # start_time is optional -> do not crash on the default value
            self._start_time_datetime = None
            self._start_time_unix = None
        else:
            self._start_time_datetime = pd.Timestamp(start_time)
            self._start_time_unix = int(time.mktime(start_time.timetuple()))
        self._tz = tz

    @classmethod
    def from_edf_file(
        cls,
        path: path_t,
        datastreams: Optional[Sequence] = None,
        tz: Optional[str] = "Europe/Berlin",
    ):
        """Create a new Dataset from a valid .edf file.

        Parameters
        ----------
        path : :class:`pathlib.Path` or str
            Path to the .edf file, or to a folder containing exactly one
            .edf file
        datastreams : Optional[Sequence], optional
            List of datastreams to load, by default None. If None, all datastreams are loaded.
        tz : Optional[str], optional
            Timezone of the recording, by default "Europe/Berlin".

        Returns
        -------
        PSGDataset :
            New Dataset instance
        """
        # ensure pathlib (``path`` may be passed as str)
        path = Path(path)
        if path.is_dir():
            data_dict, fs, start_time = cls.load_data_folder(path, datastreams, tz)
        else:
            data_dict, fs, start_time = cls.load_data(path, datastreams, tz)
        return cls(data_dict=data_dict, sampling_rate_dict={"sampling_rate": fs}, start_time=start_time, tz=tz)

    @property
    def start_time_unix(self) -> Optional[int]:
        """Start time of the recording as unix timestamp in seconds, or ``None``."""
        return self._start_time_unix

    @property
    def start_time_datetime(self) -> Optional[pd.Timestamp]:
        """Start time of the recording as :class:`pandas.Timestamp`, or ``None``."""
        return self._start_time_datetime

    @property
    def timezone(self) -> str:
        """Timezone the dataset was recorded in."""
        return self._tz

    def data_as_df(self, index: Optional[str] = None) -> pd.DataFrame:
        """Return data as one combined pandas.DataFrame.

        Parameters
        ----------
        index : str, optional
            Specify which index should be used for the dataset. The options are:
            * "time": For the time in seconds since the first sample
            * "utc": For the utc time stamp of each sample
            * "utc_datetime": for a pandas DateTime index in UTC time
            * "local_datetime": for a pandas DateTime index in the timezone set for the session
            * None: For a simple index (0...N)

        Returns
        -------
        pd.DataFrame:
            Combined data as pandas DataFrame with respective index
        """
        # concatenate all datastreams column-wise, then attach the requested index
        combined = pd.concat([self._data[name] for name in self._data], axis=1)
        return self._add_index(combined, index)

    def load_ground_truth(
        self,
        path: path_t,
    ):
        """Load ground truth data from a .xlsx file which can be exported from the Software Somnomedics.

        The sleep labels are expected in ``<root>/labels/PSG_analyse.xlsx``
        (sheet "Schlafprofil"), where ``<root>`` is two levels above ``path``.

        .. note::
            Other formats are not supported yet. If the file is not found,
            a warning is issued and an empty DataFrame is returned.

        path : :class:`pathlib.Path` or str
            path to the recording file; the labels file is located relative to it
        return: pd.DataFrame
            ground truth data as pandas DataFrame, or an empty DataFrame if no
            ground truth file was found
        """
        # ensure pathlib (``path`` may be passed as str)
        path = Path(path)
        file_path = path.parents[1].joinpath("labels/PSG_analyse.xlsx")
        try:
            sleep_phases = pd.read_excel(file_path, sheet_name="Schlafprofil", header=7, index_col=0)
        except FileNotFoundError:
            warnings.warn("No ground truth found")
            return pd.DataFrame()
        return sleep_phases

    # TODO: Read in excel or txt files with sleep labels

    @classmethod
    def load_data_folder(
        cls,
        folder_path: path_t,
        datastreams: Optional[Sequence] = None,
        timezone: Optional[Union[datetime.tzinfo, str]] = "Europe/Berlin",
    ):
        """Load data from a folder containing a single .edf file.

        Parameters
        ----------
        folder_path : :class:`pathlib.Path` or str
            path to the folder containing the .edf file
        datastreams: lst, optional
            list of datastreams to load. If None, all datastreams are loaded
        timezone: str, optional
            timezone of the recording. If None, the timezone is set to UTC

        Returns
        -------
        data_dict: dict of datastreams
        fs: sampling rate
        start_time: start time of the recording

        Raises
        ------
        FileNotFoundError: if no .edf file is found in the folder
        ValueError: if more than one .edf file is found in the folder
        """
        # ensure pathlib (``folder_path`` may be passed as str)
        folder_path = Path(folder_path)
        _assert_is_dir(folder_path)
        # look for all PSG .edf files in the folder
        dataset_list = sorted(folder_path.glob("*.edf"))
        if len(dataset_list) == 0:
            raise FileNotFoundError(f"No PSG files found in folder {folder_path}!")
        if len(dataset_list) > 1:
            raise ValueError(
                f"More than one PSG files found in folder {folder_path}!"
                f"This function only supports one recording per folder!"
            )
        # glob already returns paths prefixed with ``folder_path``; joining
        # them onto ``folder_path`` again would duplicate the prefix for
        # relative folder paths
        result_dict, fs, start_time = cls.load_data(dataset_list[0], datastreams, timezone)
        return result_dict, fs, start_time

    @classmethod
    def load_data(
        cls,
        path: path_t,
        datastreams: Optional[Sequence] = None,
        timezone: Optional[Union[datetime.tzinfo, str]] = "Europe/Berlin",
    ):
        """Load PSG data from a valid .edf file.

        Parameters
        ----------
        path : :class:`pathlib.Path` or str
            path to the .edf file
        datastreams: lst, optional
            list of datastreams to load. If None, all datastreams are loaded
        timezone: str, optional
            timezone of the recording. If None, the timezone is set to UTC

        Returns
        -------
        data_dict: dict of datastreams
        fs: sampling rate
        start_time: start time of the recording

        Raises
        ------
        Value Error: Not all datastreams are found in the .edf file
        """
        # load raw data
        data_psg, fs = cls.load_data_raw(path, timezone)
        # select datastreams to extract
        if datastreams is None:
            datastreams = data_psg.ch_names
        if isinstance(datastreams, str):
            datastreams = [datastreams]
        # the datetime index is identical for all channels -> compute it once,
        # outside the loop; this also guarantees ``start_time`` is defined even
        # if no datastream could be extracted
        time_idx, _, start_time = cls._create_dt_index(data_psg.info["meas_date"], times_array=data_psg.times)
        # save extracted datastreams in dict
        result_dict = {}
        for datastream in datastreams:
            try:
                psg_datastream = data_psg.copy().pick([datastream]).get_data()[0, :]
                result_dict[datastream] = pd.DataFrame(psg_datastream, index=time_idx, columns=[datastream])
            except ValueError:
                print(
                    "Not all channels match the selected datastreams - Following Datastreams are available: "
                    + str(data_psg.ch_names)
                )
        return result_dict, fs, start_time

    @classmethod
    def load_data_raw(
        cls,
        path: path_t,
        timezone: Optional[Union[datetime.tzinfo, str]] = "Europe/Berlin",
    ):
        """Load PSG data from .edf file.

        Parameters
        ----------
        path : :class:`pathlib.Path` or str
            path to the .edf file
        timezone: str, optional
            timezone of the recording. NOTE: currently unused; the parameter is
            kept for API compatibility.

        Returns
        -------
        data: mne.io.Raw object
        fs: sampling rate
        """
        # ensure pathlib
        path = Path(path)
        _assert_file_extension(path, ".edf")
        # load data from edf file
        edf = mne.io.read_raw_edf(path)
        # get sampling rate
        fs = edf.info["sfreq"]
        return edf, fs

    @classmethod
    def _create_dt_index(cls, starttime, times_array):
        """Create a datetime index from the start time and the times array.

        ``starttime`` is a timezone-aware :class:`datetime.datetime` (e.g. from
        ``raw.info["meas_date"]``); ``times_array`` holds sample times in
        seconds relative to the recording start.
        """
        # shift the relative sample times to absolute unix timestamps
        offset_s = starttime.timestamp()
        datetime_index = pd.to_datetime(times_array + offset_s, unit="s")
        # generate 30-s epochs from the datetime index
        epochs, start_time = cls._generate_epochs(datetime_index)
        return datetime_index, epochs, start_time

    @classmethod
    def _generate_epochs(cls, datetime_index):
        """Generate 30-second epoch numbers from a datetime index."""
        start_time = datetime_index[0]
        # round each timestamp to the nearest 30-second boundary, then number
        # the epochs consecutively starting at 0
        rounded = datetime_index.round("30s")
        seconds_from_start = (rounded - start_time).total_seconds()
        epochs = (seconds_from_start / 30).astype(int)
        return epochs, start_time

    def _add_index(self, data: pd.DataFrame, index: Optional[str]) -> pd.DataFrame:
        """Attach the index type requested via :meth:`data_as_df` to ``data``."""
        index_names = {
            None: "n_samples",
            "time": "t",
            "utc": "utc",
            "utc_datetime": "date",
            "local_datetime": f"date ({self.timezone})",
        }
        if index and index not in index_names:
            raise ValueError(f"Supplied value for index ({index}) is not allowed. Allowed values: {index_names.keys()}")

        index_name = index_names[index]
        data.index.name = index_name

        if index is None:
            # simple 0..N counter index
            data = data.reset_index(drop=True)
            data.index.name = index_name
            return data
        if index == "utc_datetime":
            return data
        if index == "time":
            # seconds since the start of the recording
            data.index = data.index - self.start_time_datetime
            data.index = data.index.total_seconds()
            return data
        if index == "utc":
            # convert counter to utc timestamps in seconds
            data = data.reset_index(drop=True)
            # astype returns a new index -> must be assigned back
            data.index = data.index.astype("int64")
            data.index += self.start_time_unix
            return data
        if index == "local_datetime":
            data.index = data.index.tz_localize(self.timezone)
            return data
        return data