qrs_detector——Flutter应用中的心电心率识别
公司的纯 Flutter 应用近期要添加一个根据 BLE 心电设备采集到信息实时显示使用者心率的功能,简单研究后发现需要实现一个 QRS波检测算法,记录下研究和实现的过程。
DEMO效果演示
前期调查
由于 App 已经实现了实时 ECG 数据的采集和图形绘制:
所以最早的想法是每隔一段时间,将之前采集到的一段时间内的数据进行一次波峰识别,从而得到某几次心跳的总间隔,再计算得出平均的心率(BPM)。
于是关于波峰识别,主要查看了下面两篇文章:看完后感觉这两种算法并不能很好地识别 ECG 心电波形
反编译了设备供应商提供的安卓 SDK 后,发现其用于实时心率计算的功能是利用一个名为
QRSDetUtil
的工具类调用 jni 实现的,并有如下参数定义:
猜测QRSDet
的全称应该是qrs detect
,通过搜索得知QRS波识别
是心电信号处理的通用方法,并找到前辈的总结:
QRS波检测算法集锦(含源代码)
但是这里总结的算法给出的源码大多数都是 MATLAB 实现的,不是很能看得懂,而且手里也没有现成的环境,不想花费过多精力在这上面 😂再以
qrs detector
为关键词在 GitHub 上进行搜索,找到了如下项目:
Python Online and Offline ECG QRS Detector based on the Pan-Tomkins algorithm
该项目是基于最经典的Pan-Tomkins
算法实现的,分为离线分析和实时分析两个部分,用的语言也是我还算比较熟悉的 python,如果判断可用的话用 dart 语言移植到 Flutter 上相对来说更有把握,所以下面开始尝试基于这个项目和我们 App 实际采集到的 ECG 数据进行测试。
测试 qrs_detector 项目
运行项目自带例子
克隆项目源码,安装依赖
项目中 requirements.txt
配置的某些依赖版本过旧可能已经无法安装,需要手动处理一下,最终实际安装的项目依赖版本如下:
运行
依赖安装完成后运行应该是会报错的:
报错提醒来自 scipy
包下的 “滤波器设计” 函数,提示参数 Wn
的值应该大于 0 小于 1,猜测是 scipy 包版本升级后新增的限制。跟踪调用栈,得知应修改如下位置的参数定义:
将上图中 self.filter_lowcut
的值从 0.0
改为 0.001
,再次运行代码即可得到结果,图中黑色的标记点即为算法识别出的波峰:
P.S. 该问题后来在ECG-QRS检测参考代码这篇博客中也见到了一样的 fix
测试公司数据
准备数据
参考项目中 ecg_data/ecg_data_1.csv
的数据格式,截取公司产品采集到的 ECG 数据创建 ecg_data_3.csv
,运行后得到如下结果:
可以看到程序运行正常,但没能识别出任何一个波峰 😨
修复问题
猜测这个问题可能和程序中预设的某个或某些参数有关。经过一段时间的分析和尝试,对比上面两张图的倒数第二个波形示意,发现项目自带数据形成的处理后的波峰的峰值都在 0.6 以上,而我们数据经过相同的处理,最大峰值不超过 0.07,再配合源码分析和 debugger 跟踪,发现 “寻找波峰” 的方法中有如下判断:
只有峰值大于给定限制的波峰才会被识别出来,而源码中预设的阈值 findpeaks_limit
为 0.35
,大约为多数波峰峰值的一半,于是这里根据我们数据的情况,将这个限制值改为 0.04
,再次运行程序即可得到预期的结果:
至此,足以判断该项目的方法确实可以用于我们产品的心率识别需求 ✌
修改在线版本
根据离线分析版本的经验,修改 QRSDetectorOnline.py
。原项目的在线版本需要通过数据线连接指定的 ECG 设备,这里我将相关代码移除,并通过读取由我们公司设备采集得来的 data.txt
来模拟数据输入,完成了实时分析版本代码的测试,后续即根据这个版本的代码进行移植工作。
以上的所有修改可以参考该仓库:debuggerx01/qrs_detector
代码移植
搭建测试 DEMO
为了方便开发调试和直观地看到效果,我先用 dart 自带的 WebSocket server 实现了一个简单的、连接后自动间隔向客户端发送 ECG 数据的服务器(代码参考:dart:io library);然后用 ECharts 写了个简单的静态页面作为客户端,页面打开时连接本机已经打开的 ECG 数据模拟发送服务器,并实时将接收到的数据以图表形式绘制出来。
关键代码:
1 | /// FileName: main.dart |
1 | <!--FileName: ecg_viewer/index.html--> |
创建 qrs_detector.dart
参照 python 项目的模式,定义一个 QRSDetector 类:
1 | class QRSDetector { |
构造函数可以设置 ECG 数据的采样率,波峰识别时用于判断的 findPeaksLimit
,以及当识别到波峰时的回调函数 handleDetection
;QRSDetector 的实例对象只对外暴露一个 void addPoint(int timeStamp, double point)
方法,用于接收实时心电数据。
接着就是根据 python 代码的逻辑,实现这个 QRSDetector 的具体功能。
移植过程的关键点&难点
双端队列deque
python 算法中的第一步就是将新添加的数据加入到名为 self.most_recent_measurements
的 deque 中,然后用包括当前数据在内和其之前的 self.number_of_samples_stored
个数据进行各种计算,来判断是否存在波峰。
来自 python 标准库的 deque 可以设置一个长度,当添加的数据大于其长度时会将头部的数据 “挤出”,所以非常适合这里的需要;而且由于其实现原理,它具有很低时间和空间复杂度。
但是 dart 的标准库中就没有这么方便的结构了,所以只能用 Queue 来代替,并自行处理以上的逻辑:
1 | Queue<double> _mostRecentMeasurements = Queue(); |
使用时不要直接向
_mostRecentMeasurements
中插入数据,而是用 _addToMostRecentMeasurements
方法即可
带通滤波器 bandpass_filter 的移植
算法中数据添加进队列后,第一个操作就是对当前队列内的数据进行一次滤波处理:
1 | from scipy.signal import butter, lfilter |
来自 scipy.signal
巴特沃斯滤波器实现,先用 butter
进行滤波器生成滤波器系数,可以看到这里的类型 btype="band"
是一个带通滤波器,然后调用 lfilter
对数据序列进行 IIR
滤波处理,相关文档参考这里:docs.scipy.org: scipy.signal.lfilter
简单看了下 scipy.signal
的滤波器实现源码,复杂到没信心移植,所以尝试找 dart 中的替代库。以IIR filter
为关键词在 pub仓库 上搜索,发现有且只有这么一个库:
看介绍倒确实是实现了 IIR 的bandpass
带通滤波,于是加入项目尝试使用。但是发现一个问题:参考文档,这个库中bandpass
的函数签名为:
1 | butterworth.bandPass(order,Samplingfreq,Center freq,Width in frequ); |
参数用法和 python 项目中的完全不同,python 中的参数给的是带通滤波器的上下阈值(与采样率有关),而这里却要分别传入 Samplingfreq
、Center freq
、Width in frequ
三个参数,也就是说不能直接移植 python 的程序写法直接替换这个库的实现。
在查阅了许多资料后,也只得知 python 中 scipy.signal.butter
的设计和 Matlab 中同名函数的用法一致(Butterworth filter design),但实在找不到上面 iirjdart
库中那样定义的方法该如何使用,以及每个参数的含义……直到回过头来再读库的介绍,发现这样一段:
- This library is a porting in Dart of the famous iirj library by berndporr.
得知原来这个库本身就是对一个 java 库的移植;然后在 berndporr/iirj 这个库的 GitHub 中以 bandpass
为关键词搜索 Issue,发现这样一个讨论:#issue8:
提问者的情况和我类似,他是想把一段和我上面差不多的 Matlab 的代码翻译成 java 代码,不知道如何转换这些参数,而库的作者给出的解决方案是,不要直接用 bandpass
这个方法,而是将一对高通和低通滤波器串联,得到一样的效果。根据这个Issue,我将 dart 代码实现为:
1 | List<double> _bandPassFilter() { |
终于解决了这个问题 😖
numpy.ediff1d (连续差分) 的实现
滤波后的下一个处理是对数据序列进行差分计算,获取两个连续元素之间的一维差异数组,具体的含义可以参考这个文章:Python Numpy np.ediff1d()用法及代码示例
这个函数来自 python 著名强大的数字处理库 numpy
,dart 中可能也有类似的库,但是引用的话可能还要花功夫去学习其使用方法。这里在理解了连续差分函数 ediff1d 的作用之后,我直接用 dart 便准库中 List.generate 这个数组的工厂构造方法来实现类似的功能:
1 | List<double> differentiatedEcgMeasurements = List.generate( |
数组平方
下一个处理的 python 代码是:
1 | # Squaring - intensifies values received in derivative. |
python 中有一个非常方便的操作符 **
,可以快速实现乘方操作,这在我知道的大部分常用语言中都是没有的,基本都要通过语言内置的数学库里的 pow
函数实现。但是经过上面的代码可以得知 differentiated_ecg_measurements
是一个数值序列,对于这样一个类似数组的结构进行平方操作的结果是什么呢?猜测是对其中的每个数值进行乘方,但是并不能确定,简单跟下源码:
https://github.com/numpy/numpy/blob/fb215c76967739268de71aa4bda55dd1b062bc2e/numpy/lib/scimath.py#L434
确定确实和猜测一样,那么根据含义进行移植就很简单了
计算卷积和移动平均数
再下一个操作是用 移动窗口积分器
对平方后的序列进行处理:
1 | # Moving-window integration. |
这里 np.convolve
(移动平均数) 和 np.ones
(计算卷积) 两个函数同样来自 numpy库,但是这两个函数就不像 numpy.ediff1d
那么容易实现了……好在用 dart convolve
作为关键词在 Google 上搜索后,发现了 scidart 这个库,用完全一样的方式实现了这两个函数,直接引用替换即可 ✌
findpeaks 的实现移植
经过如上的各种处理,终于来到了关键的峰值识别部分,这里的 python 实现是:
1 | def findpeaks(self, data, spacing=1, limit=None): |
这里的重点主要是:
11~16行:
对于输入序列,生成一个数组,其前spacing
个数据是输入序列第一个元素的值 - 1.e-6
,后spacing
个数据是输入序列最后一个元素的值 - 1.e-6
,中间则是原始输入序列的元素。21~28行:
对spacing
做 fori 循环,每次根据下标取新数组的三个部分,然后对这三部分的值逐个进行比较,实际就是找出比前后都大的值,也就是峰值,通过logical_and(逻辑与)
对数组进行操作,将得到一个数组,其中为峰值的位置的值为True
,其余为False
。30~33行:
找出所有波峰所在位置的下标,然后判断该波峰的值是否大于limit
,大于的话则加入结果数组,最终返回所有找到的峰值的下标。
基于以上的理解,实现的 dart 代码如下:
1 | List<int> _findPeaks(Array data) { |
到此为止,大部分算法逻辑的移植就差不多完成了,后面的代码相对都比较简单,无脑移植即可。识别出了每个波峰,也就得到了每次心跳发生的时间,经过简单的计算即可得到用户的实时心率了~
加入 DEMO 进行测试
QRSDetector
的算法逻辑实现时候,就可以添加到 DEMO 中测试效果了。将原本向客户端发送数据的方法改为:
1 | Future sendData(WebSocket webSocket) { |
这样就实现了一开始演示的效果:回到视频
源码地址:debuggerx01/ecg_viewer_dart_with_qrs_detector
性能优化
由于原始 python 工程的代码更多地是学习研究的目的,所以每一步代码都只做一个操作,而且生成了大量的中间临时对象和数组。而实际上某些操作是可以简化合并的,很多临时变量也可以不必创建,虽然会降低代码可读性,但可以提高执行效率:
https://github.com/debuggerx01/ecg_viewer_dart_with_qrs_detector/blob/performance/qrs_detector.dart
经过对比测试,这些修改大致可以使算法比没有优化前快约30%,而且在我机器上测试每次计算的耗时大约都在 0.5 毫秒以内,不得不说 dart 语言本身的性能还是相当不错的。