Source code for biopsykit.signals.ecg.segmentation._heartbeat_segmentation_neurokit
import warnings
import neurokit2 as nk
import numpy as np
import pandas as pd
from tpcp import Parameter
from biopsykit.signals._base_extraction import HANDLE_MISSING_EVENTS, CanHandleMissingEventsMixin
from biopsykit.signals.ecg.segmentation._base_segmentation import BaseHeartbeatSegmentation
__all__ = ["HeartbeatSegmentationNeurokit"]
from biopsykit.utils.array_handling import sanitize_input_dataframe_1d
from biopsykit.utils.dtypes import (
EcgRawDataFrame,
HeartbeatSegmentationDataFrame,
is_ecg_raw_dataframe,
is_heartbeat_segmentation_dataframe,
)
from biopsykit.utils.exceptions import EventExtractionError
[docs]class HeartbeatSegmentationNeurokit(BaseHeartbeatSegmentation, CanHandleMissingEventsMixin):
"""Segment ECG signal into individual heartbeats based on NeuroKit2.
This algorithm segments the ECG signal into individual heartbeats based on the R-peaks detected by Neurokit's
``ecg_peaks`` function. The start of each heartbeat is determined based on the R-peak and the current RR-interval.
For more information on NeuroKit2, see [Mak21]_.
Parameters
----------
variable_length : bool, optional
``True`` if extracted heartbeats should have variable length (depending on the current RR-interval) or
``False`` if extracted heartbeats should have fixed length (same length for all heartbeats, depending
on the mean heartrate of the complete signal, 35% of mean heartrate in seconds before R-peak and 50%
after r_peak, see :func:`neurokit2.ecg_segment` for details).
For variable length heartbeats, the start of the next heartbeat follows directly after end of last
(ends exclusive); For fixed length heartbeats, there might be spaces between heartbeat borders, or they
might overlap. Default: ``True``
start_factor : float, optional
only needed if ``variable_length=True``. This parameter defines where the start border between heartbeats
is set depending on the RR-interval to previous heartbeat. For example, ``start_factor=0.35`` means that
the beat start is set at 35% of current RR-distance before the R-peak of the beat
r_peak_detection_method : str, optional
Method to detect R-peaks that is passed to :func:`neurokit2.ecg_peaks`. Default: "neurokit"
handle_missing_events : one of {"warn", "raise", "ignore"}, optional
How to handle missing data in the input dataframes. Default: "warn"
Attributes
----------
heartbeat_list_ : :class:`~biopsykit.utils.dtypes.HeartbeatSegmentationDataFrame`
DataFrame containing the segmented heartbeats with the following columns:
- ``start_time``: Start time of the heartbeat
- ``start_sample``: Start sample of the heartbeat
- ``end_sample``: End sample of the heartbeat
- ``r_peak_sample``: Sample of the R-peak of the heartbeat
- ``rr_interval_sample``: RR-interval to the next heartbeat in samples
- ``rr_interval_ms``: RR-interval to the next heartbeat in milliseconds
References
----------
.. [Mak21] Makowski, D., Pham, T., Lau, Z. J., Brammer, J. C., Lesspinasse, F., Pham, H., Schölzel, C., & S.H. Chen
(2021). NeuroKit2: A Python Toolbox for Neurophysiological Signal Processing. Behavior Research Methods.
https://doi.org/10.3758/s13428-020-01516-y
"""
_action_methods = "extract"
# input parameters
variable_length: Parameter[bool]
start_factor: Parameter[float]
r_peak_detection_method: Parameter[str]
# result
heartbeat_list_: HeartbeatSegmentationDataFrame
def __init__(
self,
*,
variable_length: bool = True,
start_factor: float = 0.35,
r_peak_detection_method: str = "neurokit",
handle_missing_events: HANDLE_MISSING_EVENTS = "warn",
):
"""Initialize new ``HeartbeatSegmentationNeurokit`` algorithm instance.
Parameters
----------
variable_length : bool, optional
``True`` if extracted heartbeats should have variable length (depending on the current RR-interval) or
``False`` if extracted heartbeats should have fixed length (same length for all heartbeats, depending
on the mean heartrate of the complete signal, 35% of mean heartrate in seconds before R-peak and 50%
after r_peak, see :func:`neurokit2.ecg_segment` for details).
For variable length heartbeats, the start of the next heartbeat follows directly after end of last
(ends exclusive); For fixed length heartbeats, there might be spaces between heartbeat borders, or they
might overlap. Default: ``True``
start_factor : float, optional
only needed if ``variable_length=True``. This parameter defines where the start border between heartbeats
is set depending on the RR-interval to previous heartbeat. For example, ``start_factor=0.35`` means that
the beat start is set at 35% of current RR-distance before the R-peak of the beat
r_peak_detection_method : str, optional
Method to detect R-peaks that is passed to :func:`neurokit2.ecg_peaks`. Default: "neurokit"
handle_missing_events : one of {"warn", "raise", "ignore"}, optional
How to handle missing data in the input dataframes. Default: "warn"
"""
self.variable_length = variable_length
self.start_factor = start_factor
self.r_peak_detection_method = r_peak_detection_method
super().__init__(handle_missing_events=handle_missing_events)
# @make_action_safe
[docs] def extract( # noqa: PLR0915, PLR0912, C901
self,
*,
ecg: EcgRawDataFrame,
sampling_rate_hz: float,
):
"""Segment ECG signal into heartbeats.
The function uses R-peak detection to segment the ECG signal into heartbeats. The start of each heartbeat is
determined based on the R-peak and the current RR-interval.
The results (start and end sample, R-peak sample, current RR-interval in samples and milliseconds) are saved
in the ``heartbeat_list_`` attribute.
Parameters
----------
ecg : :class:`~pandas.Series` or :class:`~pandas.DataFrame`
ECG signal
sampling_rate_hz : int
Sampling rate of ECG signal in hz
Returns
-------
self
Raises
------
:exc:`~biopsykit.utils.exceptions.EventExtractionError`
If the event extraction fails and ``handle_missing`` is set to "raise"
"""
is_ecg_raw_dataframe(ecg)
ecg = sanitize_input_dataframe_1d(ecg, column="ECG")
heartbeats = pd.DataFrame(
columns=["start_sample", "end_sample", "r_peak_sample"],
)
heartbeats.index.name = "heartbeat_id"
heartbeats = heartbeats.astype("Int64")
if ecg.empty:
is_heartbeat_segmentation_dataframe(heartbeats)
self.heartbeat_list_ = heartbeats
missing_str = "No ECG signal found, no heartbeats can be segmented!"
if self.handle_missing_events == "warn":
warnings.warn(missing_str)
elif self.handle_missing_events == "raise":
raise EventExtractionError(missing_str)
return self
_, r_peaks = nk.ecg_peaks(ecg, sampling_rate=int(sampling_rate_hz), method=self.r_peak_detection_method)
r_peaks = r_peaks["ECG_R_Peaks"]
if len(r_peaks) < 2:
# no r-peaks were detected, so no heartbeats can be segmented
# clear dataframe
is_heartbeat_segmentation_dataframe(heartbeats)
missing_str = "Not sufficient R-peaks were detected, so no heartbeats can be segmented!"
self.heartbeat_list_ = heartbeats
if self.handle_missing_events == "warn":
warnings.warn(missing_str)
elif self.handle_missing_events == "raise":
raise EventExtractionError(missing_str)
return self
heartbeats = pd.DataFrame(
index=pd.Index(np.arange(0, len(r_peaks)), name="heartbeat_id"),
columns=["start_sample", "end_sample", "r_peak_sample"],
)
heartbeats = heartbeats.assign(r_peak_sample=r_peaks)
# save RR-interval to successive heartbeat
heartbeats = heartbeats.assign(rr_interval_sample=np.abs(heartbeats["r_peak_sample"].diff(periods=-1)))
if self.variable_length:
# split ecg signal into heartbeats with varying length
rr_interval_samples = heartbeats["r_peak_sample"].diff()
# calculate start of each heartbeat based on corresponding R-peak and current RR-interval
beat_starts = heartbeats["r_peak_sample"] - self.start_factor * rr_interval_samples
# extrapolate first beats start based on RR-interval of next beat
first_beat_start = heartbeats["r_peak_sample"].iloc[0] - self.start_factor * rr_interval_samples.iloc[1]
if first_beat_start >= 0:
beat_starts.iloc[0] = first_beat_start
else:
beat_starts = beat_starts.iloc[1:].reset_index(drop=True) # drop row if heartbeat is incomplete
heartbeats = heartbeats.iloc[1:].reset_index(drop=True)
beat_starts = round(beat_starts).astype(int)
# calculate beat ends (last beat ends 1 sample before next starts, end is exclusive)
beat_ends = beat_starts.shift(-1) # end is exclusive
# extrapolate last beats end based on RR-interval of previous beat
last_beat_end = round(
heartbeats["r_peak_sample"].iloc[-1] + (1 - self.start_factor) * rr_interval_samples.iloc[-1]
)
if last_beat_end < len(ecg):
beat_ends.iloc[-1] = last_beat_end
else:
# drop the last beat if it is incomplete
heartbeats = heartbeats.iloc[:-1]
beat_ends = beat_ends.iloc[:-1]
beat_starts = beat_starts.iloc[:-1]
beat_ends = beat_ends.astype(int)
# extract time of each beat's start
beat_starts_time = ecg.iloc[beat_starts].index
heartbeats = heartbeats.assign(start_sample=beat_starts, end_sample=beat_ends, start_time=beat_starts_time)
else:
# split ecg signal into heartbeats with fixed length
heartbeat_segments = nk.ecg_segment(ecg, rpeaks=r_peaks, sampling_rate=int(sampling_rate_hz), show=False)
heartbeat_segments_new = {int(k) - 1: v for k, v in heartbeat_segments.items()}
heartbeat_segments_new = pd.concat(heartbeat_segments_new, names=["heartbeat_id"])
heartbeat_segments_new = heartbeat_segments_new.groupby("heartbeat_id").agg(
start_sample=("Index", "first"),
end_sample=("Index", "last"),
start_time=("Index", lambda s: ecg.index[s.iloc[0]]),
)
# fill the empty columns of heartbeats with the start_sample, end_sample, and start_time of
# heartbeat_segments_new
heartbeats.update(heartbeat_segments_new)
heartbeats = heartbeats.assign(start_time=heartbeat_segments_new["start_time"])
# check if R-peak occurs between corresponding start and end
check = heartbeats.apply(lambda x: x["start_sample"] < x["r_peak_sample"] < x["end_sample"], axis=1)
if len(check.loc[~check]) > 0:
raise ValueError(
f"Start/end/R-peak position of heartbeat {list(check.loc[check is False].index)} could be incorrect!"
)
# ensure that index is Int64Index (not RangeIndex) because some neurokit functions won't work with RangeIndex
heartbeats.index = list(heartbeats.index)
heartbeats.index.name = "heartbeat_id"
heartbeats = heartbeats.assign(rr_interval_ms=heartbeats["rr_interval_sample"] / sampling_rate_hz * 1000)
# ensure correct column order
heartbeats = heartbeats[
["start_time", "start_sample", "end_sample", "r_peak_sample", "rr_interval_sample", "rr_interval_ms"]
]
heartbeats = heartbeats.astype(
{
"start_sample": "Int64",
"end_sample": "Int64",
"r_peak_sample": "Int64",
"rr_interval_sample": "Int64",
"rr_interval_ms": "Float64",
}
)
is_heartbeat_segmentation_dataframe(heartbeats)
self.heartbeat_list_ = heartbeats
return self