Source code for biopsykit.signals.icg.event_extraction._b_point_pale2021

import warnings

import numpy as np
import pandas as pd

from biopsykit.signals._base_extraction import HANDLE_MISSING_EVENTS, CanHandleMissingEventsMixin
from biopsykit.signals.icg.event_extraction._base_b_point_extraction import BaseBPointExtraction
from biopsykit.utils.array_handling import sanitize_input_dataframe_1d
from biopsykit.utils.dtypes import (
    CPointDataFrame,
    HeartbeatSegmentationDataFrame,
    IcgRawDataFrame,
    is_b_point_dataframe,
    is_c_point_dataframe,
    is_heartbeat_segmentation_dataframe,
    is_icg_raw_dataframe,
)
from biopsykit.utils.exceptions import EventExtractionError

__all__ = ["BPointExtractionPale2021"]


[docs]class BPointExtractionPale2021(BaseBPointExtraction, CanHandleMissingEventsMixin): """B-point extraction algorithm by Pale et al. (2021). This algorithm extracts B-points by first determining a search window based on the C-point location and the C-point amplitude in the dZ/dt signal. Afterward, the algorithms searches for either the local minimum closest to the C-point or the first point at which the slope of the signal exceeds a certain threshold. If no criterion is met, the search is repeated with a less strict slope threshold. If still no B-point is found, the algorithm returns the signal minimum in the cardiac cycle before the C-point. For more information, see [Pal21]_. References ---------- .. [Pal21] Pale, U., Muller, N., Arza, A., & Atienza, D. (2021). ReBeatICG: Real-time Low-Complexity Beat-to-beat Impedance Cardiogram Delineation Algorithm. 2021 43rd Annual International Conference of the IEEE Engineering in Medicine & Biology Society (EMBC), 5618-5624. https://doi.org/10.1109/EMBC46164.2021.9630170 """ c_point_amplitude_fraction: float b_point_slope_threshold_01: float b_point_slope_threshold_02: float def __init__( self, c_point_amplitude_fraction: float = 0.5, b_point_slope_threshold_01: float = 0.11, b_point_slope_threshold_02: float = 0.08, handle_missing_events: HANDLE_MISSING_EVENTS = "warn", ): """Initialize new ``BPointExtractionPale2021`` instance. Parameters ---------- handle_missing_events : one of {"warn", "raise", "ignore"}, optional How to handle failing event extraction. Must be one of: - ``"warn"``: issue a warning and set the event to NaN, - ``"raise"``: raise an ``EventExtractionError``, or - ``"ignore"``: continue silently. Default: ``"warn"``. """ self.c_point_amplitude_fraction = c_point_amplitude_fraction self.b_point_slope_threshold_01 = b_point_slope_threshold_01 self.b_point_slope_threshold_02 = b_point_slope_threshold_02 super().__init__(handle_missing_events=handle_missing_events)
[docs] def extract( self, *, icg: IcgRawDataFrame, heartbeats: HeartbeatSegmentationDataFrame, c_points: CPointDataFrame, sampling_rate_hz: float, ): """Extract B-points from given ICG derivative signal. This algorithm extracts B-points by transforming the ICG signal using a weighted time window applied to the segment preceding the maximal ICG peak (C-point). This transformation amplifies the characteristics of the B-point, facilitating B-point identification. Parameters ---------- icg : IcgRawDataFrame The raw ICG signal data. heartbeats : HeartbeatSegmentationDataFrame The heartbeat segmentation data. c_points : CPointDataFrame The C-point data. sampling_rate_hz : float The sampling rate of the ICG signal in Hz. Returns ------- BPointDataFrame The extracted B-point data. """ self._check_valid_missing_handling() is_icg_raw_dataframe(icg) is_heartbeat_segmentation_dataframe(heartbeats) is_c_point_dataframe(c_points) icg = sanitize_input_dataframe_1d(icg, column="icg_der") icg = icg.squeeze() # Create the b_point Dataframe. Use the heartbeats id as index b_points = pd.DataFrame(index=heartbeats.index, columns=["b_point_sample", "nan_reason"]) # get the c_point locations from the c_points dataframe c_points = c_points["c_point_sample"] icg_2nd_der = np.gradient(icg) # iterate over each heartbeat for idx, data in heartbeats.iterrows(): # Get the C-Point location at the current heartbeat id c_point = c_points[idx] if pd.isna(c_point): b_points.loc[idx, "b_point_sample"] = np.nan b_points.loc[idx, "nan_reason"] = "c_point_nan" continue icg_slice = icg.iloc[data["start_sample"] : c_point] # Calculate the search window based on the C-point location and amplitude # start of the search window is 80 ms before the C-point search_window_start = c_point - int(0.08 * sampling_rate_hz) # end of the search window is the closest point before the C-point with amplitude less than # c_point_amplitude_fraction * c_point c_point_amplitude = icg.iloc[c_point] * self.c_point_amplitude_fraction search_window_end = np.where(icg.iloc[search_window_start:c_point] < c_point_amplitude)[0] # If no point is found, use the C-point as the end of the search window; otherwise, use the last point # before the C-point that meets the condition search_window_end = c_point if search_window_end.size == 0 else search_window_end[-1] + search_window_start # B_min is the minimum of the signal in the search window b_point_min = data["start_sample"] + np.argmin(icg_slice) if (search_window_end - search_window_start) <= 2: # If the search window is too small, set the B-point to the minimum of the signal in the search window b_points.loc[idx, "b_point_sample"] = b_point_min continue # slice derivative to the search window icg_2nd_der_slice = icg_2nd_der[search_window_start:search_window_end] # candidate 1: search for local minima in the second derivative zero_crossings = np.where(np.diff(np.signbit(icg_2nd_der_slice)))[0] # check if it's a local minimum => the value of the derivative at the zero crossing must be positive zero_crossings = zero_crossings[np.gradient(icg_2nd_der_slice)[zero_crossings] > 0] # candidate 2: search for the first point at which the slope exceeds the threshold; the slope is already # calculated in the derivative slope_exceeds_threshold = np.where(icg_2nd_der_slice > self.b_point_slope_threshold_01)[0] # if no slope exceeds the threshold, use the second threshold if slope_exceeds_threshold.size == 0: slope_exceeds_threshold = np.where(icg_2nd_der_slice > self.b_point_slope_threshold_02)[0] # concatenate and sort the candidates candidates = np.sort(np.concatenate((zero_crossings, slope_exceeds_threshold))) # if no candidates are found, use the minimum of the signal in the search window; otherwise, use the # candidate closest to the C-point b_point = b_point_min if candidates.size == 0 else search_window_start + candidates[-1] b_points.loc[idx, "b_point_sample"] = b_point idx_nan = b_points["b_point_sample"].isna() if idx_nan.sum() > 0: idx_nan = list(b_points.index[idx_nan]) missing_str = ( f"The C-point contains NaN at heartbeats {idx_nan}! The index of the B-points were also set to NaN." ) if self.handle_missing_events == "warn": warnings.warn(missing_str) elif self.handle_missing_events == "raise": raise EventExtractionError(missing_str) b_points = b_points.astype({"b_point_sample": "Int64", "nan_reason": "object"}) is_b_point_dataframe(b_points) self.points_ = b_points return self