Source code for biopsykit.signals.icg.event_extraction._b_point_forouzanfar2018
import warnings
import numpy as np
import pandas as pd
from scipy.signal import argrelmin
from tpcp import Parameter
from biopsykit.signals._base_extraction import HANDLE_MISSING_EVENTS, CanHandleMissingEventsMixin
from biopsykit.signals.icg.event_extraction._base_b_point_extraction import BaseBPointExtraction
from biopsykit.utils.array_handling import sanitize_input_dataframe_1d
from biopsykit.utils.dtypes import (
CPointDataFrame,
HeartbeatSegmentationDataFrame,
IcgRawDataFrame,
is_b_point_dataframe,
is_c_point_dataframe,
is_heartbeat_segmentation_dataframe,
is_icg_raw_dataframe,
)
from biopsykit.utils.exceptions import EventExtractionError
__all__ = ["BPointExtractionForouzanfar2018"]
[docs]class BPointExtractionForouzanfar2018(BaseBPointExtraction, CanHandleMissingEventsMixin):
"""B-point extraction algorithm by Forouzanfar et al. (2018).
This algorithm extracts B-points based on the detection of the most prominent monotonic increasing segment
between the A-Point (the local minimum within one third of the beat-to-beat interval prior to the C-Point)
and the C-Point. The B-Point is then detected as the last zero crossing or local maximum of the third derivative
of the ICG signal within the segment.
For more information, see [For18]_.
References
----------
.. [For18] Forouzanfar, M., Baker, F. C., De Zambotti, M., McCall, C., Giovangrandi, L., & Kovacs, G. T. A. (2018).
Toward a better noninvasive assessment of preejection period: A novel automatic algorithm for B-point detection
and correction on thoracic impedance cardiogram. Psychophysiology, 55(8), e13072.
https://doi.org/10.1111/psyp.13072
"""
# input parameters
scaling_factor: Parameter[float]
correct_outliers: Parameter[bool]
def __init__(
self,
scaling_factor: float = 2000,
correct_outliers: bool | None = False,
handle_missing_events: HANDLE_MISSING_EVENTS = "warn",
):
"""Initialize new ``BPointExtractionForouzanfar2018`` instance.
In the original paper, the authors report the *sampling frequency* of the ICG signal as the scaling factor.
Since this would change the algorithm behavior depending on the sampling rate of the ICG signal, this
implementation introduces a scaling factor that can be set by the user. By default, the scaling factor is
set to 2000 (corresponding to a sampling rate of the original data of 2000 Hz) instead of using the
sampling rate of the ICG signal.
Parameters
----------
scaling_factor : float, optional
Scaling factor for the B-point extraction algorithm. Default: 2000.
correct_outliers : bool, optional
True to correct outliers, False to set the B-point to NaN if no monotonic segment is found.
Default: False.
handle_missing_events : one of {"warn", "raise", "ignore"}, optional
How to handle failing event extraction. Must be one of:
- ``"warn"``: issue a warning and set the event to NaN,
- ``"raise"``: raise an ``EventExtractionError``, or
- ``"ignore"``: continue silently.
Default: ``"warn"``.
"""
super().__init__(handle_missing_events=handle_missing_events)
self.scaling_factor = scaling_factor
self.correct_outliers = correct_outliers
# @make_action_safe
[docs] def extract( # noqa: PLR0915
self,
*,
icg: IcgRawDataFrame,
heartbeats: HeartbeatSegmentationDataFrame,
c_points: CPointDataFrame,
sampling_rate_hz: float | None, # noqa: ARG002
):
"""Extract B-points from given ICG derivative signal.
The algorithm extracts B-points based on the detection of the most prominent monotonic increasing segment
between the A-Point (the local minimum within one third of the beat-to-beat interval prior to the C-Point)
and the C-Point. The B-Point is then detected as the last zero crossing or local maximum of the third derivative
of the ICG signal within the segment.
The results are saved in the ``points_`` attribute of the super class.
Parameters
----------
icg : :class:`~pandas.DataFrame`
ICG derivative signal
heartbeats : :class:`~pandas.DataFrame`
Segmented heartbeats. Each row contains start, end, and R-peak location (in samples
from beginning of signal) of that heartbeat, index functions as id of heartbeat
c_points : :class:`~pandas.DataFrame`
Extracted C-points. Each row contains the C-point location (in samples from beginning of signal) for each
heartbeat, index functions as id of heartbeat. C-point locations can be NaN if no C-points were detected
for certain heartbeats
sampling_rate_hz : int
sampling rate of ICG derivative signal in hz
Returns
-------
self
Raises
------
:exc:`~biopsykit.utils.exceptions.EventExtractionError`
If the event extraction fails and ``handle_missing`` is set to "raise"
"""
# sanitize input signal
self._check_valid_missing_handling()
is_icg_raw_dataframe(icg)
is_heartbeat_segmentation_dataframe(heartbeats)
is_c_point_dataframe(c_points)
icg = sanitize_input_dataframe_1d(icg, column="icg_der")
icg = icg.squeeze()
# Create the B-Point/A-Point Dataframes with the index of the heartbeat_list
b_points = pd.DataFrame(index=heartbeats.index, columns=["b_point_sample", "nan_reason"])
# get the c_point locations from the c_points dataframe
c_points = c_points["c_point_sample"]
# Calculate the second- and third-derivative of the ICG-signal
second_der = np.gradient(icg)
third_der = np.gradient(second_der)
for idx, data in heartbeats.iloc[1:].iterrows():
# Get the C-Point location at the current heartbeat id
c_point = c_points[idx]
prev_c_point = c_points[idx - 1]
# check if the current or the previous C-Point contain NaN. If this is the case, set the b_point to NaN
if pd.isna(c_point) or pd.isna(prev_c_point):
b_points.loc[idx, "b_point_sample"] = np.nan
b_points.loc[idx, "nan_reason"] = "c_point_nan"
continue
# Compute the beat to beat interval
beat_to_beat = c_point - prev_c_point
# Compute the search interval for the A-Point
search_interval = int(beat_to_beat / 3)
# Detect the local minimum (A-Point) within one third of the beat to beat interval prior to the C-Point
a_point = self._get_a_point(icg, search_interval, c_point) + (c_point - search_interval)
# Select the signal_segment between the A-Point and the C-Point
signal_clean_segment = icg.iloc[a_point : c_point + 1]
# Define the C-Point amplitude which is used as a constraint for monotonic segment detection
c_amplitude = icg.iloc[c_point]
# Step 4.1: Get the most prominent monotonic increasing segment between the A-Point and the C-Point
start_sample, end_sample = (
self._get_most_prominent_monotonic_increasing_segment(signal_clean_segment, c_amplitude) + a_point
)
if (start_sample == a_point) & (end_sample == a_point):
if self.correct_outliers:
b_points.loc[idx, "b_point_sample"] = data["r_peak_sample"]
else:
b_points.loc[idx, "b_point_sample"] = np.nan
b_points.loc[idx, "nan_reason"] = "no_monotonic_segment"
continue
# Get the first third of the monotonic increasing segment
start = start_sample
end = end_sample - int((2 / 3) * (end_sample - start_sample))
# 2nd derivative of the segment
monotonic_segment_2nd_der = pd.DataFrame(second_der[start:end], columns=["2nd_der"])
# 3rd derivative of the segment
monotonic_segment_3rd_der = pd.DataFrame(third_der[start:end], columns=["3rd_der"])
# Calculate the amplitude difference between the C-Point and the A-Point
height = icg.iloc[c_point] - icg.iloc[a_point]
# Compute the significant zero_crossings
significant_zero_crossings = self._get_zero_crossings_3rd_derivative(
monotonic_segment_3rd_der, monotonic_segment_2nd_der, height
)
# Compute the significant local maxima of the 3rd derivative of the most prominent monotonic segment
significant_local_maxima = self._get_local_maxima_3rd_derivative(monotonic_segment_3rd_der, height)
# Label the last zero crossing/ local maximum as the B-Point
# If there are no zero crossings or local maxima use the first Point of the segment as B-Point
significant_features = pd.concat([significant_zero_crossings, significant_local_maxima], axis=0) + start
if len(significant_features) == 0:
b_point = start
else:
b_point = significant_features.iloc[np.argmin(c_point - significant_features)].iloc[0]
b_points.loc[idx, "b_point_sample"] = b_point
# interpolate the first B-Point with the second B-Point since it is not possible to detect a B-Point
# for the first heartbeat
b_points.loc[b_points.index[0], "b_point_sample"] = (
b_points.loc[b_points.index[1], "b_point_sample"] - b_points["b_point_sample"].diff().loc[b_points.index[2]]
)
idx_nan = b_points["b_point_sample"].isna()
if idx_nan.sum() > 0:
idx_nan = list(b_points.index[idx_nan])
missing_str = (
f"Either 'r_peak' or 'c_point' contains NaN at positions {idx_nan}! The B-points were set to NaN."
)
if self.handle_missing_events == "warn":
warnings.warn(missing_str)
elif self.handle_missing_events == "raise":
raise EventExtractionError(missing_str)
b_points = b_points.astype({"b_point_sample": "Int64", "nan_reason": "object"})
is_b_point_dataframe(b_points)
self.points_ = b_points
return self
@staticmethod
def _get_a_point(icg: pd.Series, search_interval: int, c_point: int):
signal_interval = icg.iloc[(c_point - search_interval) : c_point]
signal_minima = argrelmin(signal_interval.values, mode="wrap")
a_point_candidates = signal_interval.iloc[signal_minima[0]]
if len(a_point_candidates) == 0:
# no local minima found => return the argmin of the interval
return np.argmin(signal_interval)
a_point_idx = np.argmin(a_point_candidates)
a_point = signal_minima[0][a_point_idx]
return a_point
@staticmethod
def _get_most_prominent_monotonic_increasing_segment(icg_segment: pd.Series, height: int):
icg_segment = icg_segment.copy()
icg_2nd_der_segment = np.gradient(icg_segment)
icg_segment.index = np.arange(0, len(icg_segment))
monotony_df = pd.DataFrame(icg_segment.values, columns=["icg"])
monotony_df = monotony_df.assign(**{"2nd_der": icg_2nd_der_segment, "borders": pd.NA})
# A-Point is a possible start of the monotonic segment
monotony_df.loc[monotony_df.index[0], "borders"] = "start_increase"
# C-Point is a possible end of the monotonic segment
monotony_df.loc[monotony_df.index[-1], "borders"] = "end_increase"
neg_pos_change_idx = np.where(np.diff(np.sign(monotony_df["2nd_der"])) > 0)[0]
monotony_df.loc[monotony_df.index[neg_pos_change_idx], "borders"] = "start_increase"
pos_neg_change_idx = np.where(np.diff(np.sign(monotony_df["2nd_der"])) < 0)[0]
monotony_df.loc[monotony_df.index[pos_neg_change_idx], "borders"] = "end_increase"
# drop all samples that are no possible start-/ end-points
monotony_df = monotony_df.drop(monotony_df[pd.isna(monotony_df["borders"])].index)
monotony_df = monotony_df.reset_index()
# Drop start- and corresponding end-point if their start value is higher than 1/2 of H
start_index_drop_rule_a = monotony_df[
(monotony_df["borders"] == "start_increase") & (monotony_df["icg"] > height / 2)
].index
start_index_drop_rule_a = start_index_drop_rule_a.union(start_index_drop_rule_a + 1)
monotony_df = monotony_df.drop(index=start_index_drop_rule_a)
# Drop start- and corresponding end-point if their end values does not reach at least 2/3 of H
end_index_drop_rule_b = monotony_df[
(monotony_df["borders"] == "end_increase") & (monotony_df["icg"] < 2 * height / 3)
].index
end_index_drop_rule_b = end_index_drop_rule_b.union(end_index_drop_rule_b - 1)
monotony_df = monotony_df.drop(index=monotony_df.iloc[end_index_drop_rule_b].index)
# Select the monotonic segment with the highest amplitude difference
start_sample = 0
end_sample = 0
if len(monotony_df) > 2:
idx = np.argmax(monotony_df["icg"].diff())
start_sample = monotony_df["index"].iloc[idx - 1]
end_sample = monotony_df["index"].iloc[idx]
elif len(monotony_df) != 0:
start_sample = monotony_df["index"].iloc[0]
end_sample = monotony_df["index"].iloc[-1]
return start_sample, end_sample # That are not absolute positions yet
def _get_zero_crossings_3rd_derivative(
self, monotonic_segment_3rd_der: pd.DataFrame, monotonic_segment_2nd_der: pd.DataFrame, height: int
):
constraint = float(10 * height / self.scaling_factor)
zero_crossings = np.where(np.diff(np.signbit(monotonic_segment_3rd_der["3rd_der"])))[0]
zero_crossings = pd.DataFrame(zero_crossings, columns=["sample_position"])
# Discard zero_crossings with negative to positive sign change
significant_crossings = zero_crossings.drop(
zero_crossings[monotonic_segment_2nd_der.iloc[zero_crossings["sample_position"]].to_numpy() < 0].index,
axis=0,
)
# Discard zero crossings with slope higher than 10*H/scaling_factor
significant_crossings = significant_crossings.drop(
significant_crossings[
monotonic_segment_2nd_der.iloc[significant_crossings["sample_position"]].to_numpy() >= constraint
].index,
axis=0,
)
if isinstance(zero_crossings, type(None)) or len(zero_crossings) == 0:
return pd.DataFrame([0], columns=["sample_position"])
return significant_crossings
def _get_local_maxima_3rd_derivative(self, monotonic_segment_3rd_der: pd.DataFrame, height: int):
constraint = float(4 * height / self.scaling_factor)
# compute gradient
monotonic_segment_3rd_der_gradient = np.gradient(monotonic_segment_3rd_der.squeeze())
# find zero-crossings of the gradient to determine local maxima
local_maxima = np.where(np.diff(np.sign(monotonic_segment_3rd_der_gradient)) < 0)[0]
# add 1 to the index to get the correct position
local_maxima += 1
local_maxima = pd.DataFrame(local_maxima, columns=["sample_position"])
significant_maxima = local_maxima.drop(
local_maxima[
monotonic_segment_3rd_der["3rd_der"].iloc[local_maxima["sample_position"]].to_numpy() < constraint
].index,
axis=0,
)
if isinstance(significant_maxima, type(None)):
return pd.DataFrame([0], columns=["sample_position"])
if len(significant_maxima) == 0:
return pd.DataFrame([0], columns=["sample_position"])
return significant_maxima