# Source code for biopsykit.sleep.sleep_wake_detection.utils

"""Utility functions for sleep/wake detection algorithms."""

from typing import Literal, Optional

import numpy as np

# Type alias for the actigraphy epoch lengths (in seconds) accepted by the rescoring functions in this module.
EPOCH_LENGTH = Literal[30, 60]


def rescore(predictions: np.ndarray, epoch_length: Optional[EPOCH_LENGTH] = 30) -> np.ndarray:
    """Apply Webster's rescoring rules to sleep/wake predictions.

    All rules are applied to a copy obtained via ``predictions.copy()``; the
    copy is what gets returned.

    Parameters
    ----------
    predictions : array_like
        sleep/wake predictions
    epoch_length : int
        length of actigraphy epoch in seconds. The rule helpers only test for
        ``30``; every other value (including ``None``) selects the thresholds
        meant for 60-second epochs.

    Returns
    -------
    array_like
        rescored sleep/wake predictions

    """
    result = predictions.copy()

    # rules a through c run first, rules d and e operate on their output
    for apply_rules in (_apply_recording_rules_a_c, _apply_recording_rules_d_e):
        result = apply_rules(result, epoch_length)

    # final pass: an isolated wake epoch (a single 1 with a 0 on either side) is rescored to 0;
    # the first and last epoch are never touched because they lack a neighbour
    for idx in range(1, len(result) - 1):
        if result[idx] != 1:
            continue
        if result[idx - 1] == 0 and result[idx + 1] == 0:
            result[idx] = 0
    return result
def _apply_recording_rules_a_c(rescored: np.ndarray, epoch_length: EPOCH_LENGTH) -> np.ndarray:
    """Apply rescoring rules a, b and c in place.

    The array is scanned once while counting consecutive epochs equal to 1
    (this helper treats them as wake). At the first epoch that is not 1 after
    such a run, that epoch and the ones following it are set to 0. How many
    epochs are overwritten depends on the length of the preceding run:

    ====  ==========================  ===========================
    rule  minimum run (30 s / other)  epochs set to 0 (30 s / other)
    ====  ==========================  ===========================
    c     30 / 15                     8 / 4
    b     20 / 10                     6 / 3
    a     8 / 4                       2 / 1
    ====  ==========================  ===========================

    Parameters
    ----------
    rescored : :class:`numpy.ndarray`
        sleep/wake predictions; modified in place
    epoch_length : int
        length of actigraphy epoch in seconds. Only ``30`` is tested for, any
        other value uses the second column of the table above.

    Returns
    -------
    :class:`numpy.ndarray`
        the same array object that was passed in

    """
    # (minimum length of the preceding run, number of epochs to overwrite),
    # ordered from rule c down to rule a so that the first match is the strictest rule
    if epoch_length == 30:
        rules = ((30, 8), (20, 6), (8, 2))
    else:
        rules = ((15, 4), (10, 3), (4, 1))

    wake_bin = 0
    # index-based loop on purpose: the slice assignment below changes epochs that are visited later
    for t in range(len(rescored)):  # pylint:disable=consider-using-enumerate
        if rescored[t] == 1:
            wake_bin += 1
            continue
        for min_wake, n_rescore in rules:
            if wake_bin >= min_wake:
                # the slice is clipped automatically at the end of the array
                rescored[t : t + n_rescore] = 0
                break
        # the run is over, regardless of whether a rule fired
        wake_bin = 0
    return rescored


def _apply_recording_rules_d_e(rescored: np.ndarray, epoch_length: EPOCH_LENGTH) -> np.ndarray:
    """Apply rescoring rules d and e in place.

    Rule d/e: 6/10 minutes or less of sleep surrounded by at least 10/20
    minutes of wake on each side get rescored. Expressed in epochs, a run of
    epochs equal to 1 that is at most ``sleep_thres`` long is set to 0 if the
    ``wake_thres`` epochs directly before the run and the ``wake_thres``
    epochs starting at the first epoch after the run both sum to 0.
    Thresholds are ``(12, 20)`` and ``(20, 40)`` for 30-second epochs and
    ``(6, 10)`` and ``(10, 20)`` otherwise. Rule d is applied over the whole
    array before rule e.

    Only the indices ``wake_thres`` to ``len(rescored) - wake_thres - 1`` are
    visited, so a run that is still open when the scan stops is left as is.

    NOTE(review): this helper counts epochs equal to 1 as *sleep*, whereas
    ``_apply_recording_rules_a_c`` counts epochs equal to 1 as *wake* --
    confirm the intended label encoding.

    Parameters
    ----------
    rescored : :class:`numpy.ndarray`
        sleep/wake predictions; modified in place
    epoch_length : int
        length of actigraphy epoch in seconds. Only ``30`` is tested for, any
        other value uses the thresholds for 60-second epochs.

    Returns
    -------
    :class:`numpy.ndarray`
        the same array object that was passed in

    """
    # (maximum run length, required clean window on each side), rule d first, then rule e
    if epoch_length == 30:
        rules = ((12, 20), (20, 40))
    else:
        rules = ((6, 10), (10, 20))

    for sleep_thres, wake_thres in rules:
        sleep_bin = 0
        start_ind = 0
        for t in range(wake_thres, len(rescored) - wake_thres):
            if rescored[t] == 1:
                if sleep_bin == 0:
                    # first epoch of a new run
                    start_ind = t
                sleep_bin += 1
                continue
            # Only inspect the surrounding windows when a short enough run has just ended;
            # this avoids summing two windows for every single non-1 epoch.
            if 0 < sleep_bin <= sleep_thres:
                before = rescored[start_ind - wake_thres : start_ind]
                after = rescored[t : t + wake_thres]
                if np.sum(before) == 0 and np.sum(after) == 0:
                    rescored[start_ind:t] = 0
            sleep_bin = 0
    return rescored