公司的纯 Flutter 应用近期要添加一个根据 BLE 心电设备采集到信息实时显示使用者心率的功能,简单研究后发现需要实现一个 QRS波检测算法,记录下研究和实现的过程。

DEMO效果演示

前期调查

  1. 由于 App 已经实现了实时 ECG 数据的采集和图形绘制:
    ecg.webp
    所以最早的想法是每隔一段时间,将之前采集到的一段时间内的数据进行一次波峰识别,从而得到某几次心跳的总间隔,再计算得出平均的心率(BPM)。
    于是关于波峰识别,主要查看了下面两篇文章:

    看完后感觉这两种算法并不能很好地识别 ECG 心电波形

  2. 反编译了设备供应商提供的安卓 SDK 后,发现其用于实时心率计算的功能是利用一个名为 QRSDetUtil 的工具类调用 jni 实现的,并有如下参数定义:
    sdk.webp
    猜测 QRSDet 的全称应该是 qrs detect,通过搜索得知 QRS波识别 是心电信号处理的通用方法,并找到前辈的总结:
    QRS波检测算法集锦(含源代码)
    但是这里总结的算法给出的源码大多数都是 MATLAB 实现的,不是很能看得懂,而且手里也没有现成的环境,不想花费过多精力在这上面 😂

  3. 再以 qrs detector 为关键词在 GitHub 上进行搜索,找到了如下项目:
    Python Online and Offline ECG QRS Detector based on the Pan-Tomkins algorithm
    该项目是基于最经典的Pan-Tomkins算法实现的,分为离线分析和实时分析两个部分,用的语言也是我还算比较熟悉的 python,如果判断可用的话用 dart 语言移植到 Flutter 上相对来说更有把握,所以下面开始尝试基于这个项目和我们 App 实际采集到的 ECG 数据进行测试。

测试 qrs_detector 项目

运行项目自带例子

克隆项目源码,安装依赖

项目中 requirements.txt 配置的某些依赖版本过旧可能已经无法安装,需要手动处理一下,最终实际安装的项目依赖版本如下:
requirements.webp

运行

依赖安装完成后运行应该是会报错的:
error.webp
报错提醒来自 scipy 包下的 “滤波器设计” 函数,提示参数 Wn 的值应该大于 0 小于 1,猜测是 scipy 包版本升级后新增的限制。跟踪调用栈,得知应修改如下位置的参数定义:
error_fix_1.webp
将上图中 self.filter_lowcut 的值从 0.0 改为 0.001,再次运行代码即可得到结果,图中黑色的标记点即为算法识别出的波峰:
demo1.webp

P.S. 该问题后来在ECG-QRS检测参考代码这篇博客中也见到了一样的 fix

测试公司数据

准备数据

参考项目中 ecg_data/ecg_data_1.csv 的数据格式,截取公司产品采集到的 ECG 数据创建 ecg_data_3.csv,运行后得到如下结果:
demo2.webp
可以看到程序运行正常,但没能识别出任何一个波峰 😨

修复问题

猜测这个问题可能和程序中预设的某个或某些参数有关。经过一段时间的分析和尝试,对比上面两张图的倒数第二个波形示意,发现项目自带数据形成的处理后的波峰的峰值都在 0.6 以上,而我们数据经过相同的处理,最大峰值不超过 0.07,再配合源码分析和 debugger 跟踪,发现 “寻找波峰” 的方法中有如下判断:
findpeaks_limit.webp
只有峰值大于给定限制的波峰才会被识别出来,而源码中预设的阈值 findpeaks_limit0.35,大约为多数波峰峰值的一半,于是这里根据我们数据的情况,将这个限制值改为 0.04,再次运行程序即可得到预期的结果:
demo3.webp

至此,足以判断该项目的方法确实可以用于我们产品的心率识别需求

修改在线版本

根据离线分析版本的经验,修改 QRSDetectorOnline.py。原项目的在线版本需要通过数据线连接指定的 ECG 设备,这里我将相关代码移除,并通过读取由我们公司设备采集得来的 data.txt来模拟数据输入,完成了实时分析版本代码的测试,后续即根据这个版本的代码进行移植工作。

以上的所有修改可以参考该仓库:debuggerx01/qrs_detector

代码移植

搭建测试 DEMO

为了方便开发调试和直观地看到效果,我先用 dart 自带的 WebSocket server 实现了一个简单的、连接后自动间隔向客户端发送 ECG 数据的服务器(代码参考:dart:io library);然后用 ECharts 写了个简单的静态页面作为客户端,页面打开时连接本机已经打开的 ECG 数据模拟发送服务器,并实时将接收到的数据以图表形式绘制出来。
关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/// FileName: main.dart

import 'dart:async';
import 'dart:io';

/// 设置 ECG 数据的采样率
const SAMPLING_RATE = 250;

Future sendData(WebSocket webSocket) {
var file = File('data.txt');
var lineIndex = 0;
return file.readAsLines().then((value) {
/// 根据采样率算出ECG数据发送的时间间隔,定时向客户端发送数据
Timer.periodic(Duration(milliseconds: 1000 ~/ SAMPLING_RATE), (timer) {
if (webSocket.closeCode != null || lineIndex >= value.length)
return timer.cancel();
webSocket.add(value[lineIndex]);
lineIndex++;
});
});
}

main() async {
/// 监听本机9988端口,创建WebSocket服务器
HttpServer server = await HttpServer.bind('127.0.0.1', 9988);
server
.transform(new WebSocketTransformer())
.listen((WebSocket webSocket) async {
print('WebSocket opened.');
webSocket.listen(print, onDone: () {
print('WebSocket closed.');
});
/// 向客户端发送模拟ECG数据
sendData(webSocket);
});
print('Listening..');
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
<!--FileName: ecg_viewer/index.html-->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>ECG Viewer</title>
<script type="text/javascript" src="echarts.min.js"></script>
<style>
.container {
margin: auto;
width: 80vw;
height: 60vw;
}
</style>
</head>
<body>
<div id="container" class="container"></div>
</body>
<script>
// 设置图表显示心电图的最大数据量
const showLength = 1000;
// 初始化 echarts
const container = document.getElementById('container');
const myChart = echarts.init(container);
const _data = [];
// 初始化 echarts 的数据源
for (let i = 0; i < showLength; i++) {
_data[i] = [i, null];
}
let option = {
title: {
text: 'ECG Viewer'
},
xAxis: {
type: "value",
},
yAxis: {
type: "value",
max: -3.5,
min: -7,
},
series: [{
type: 'line',
data: _data,
smooth: false,
showSymbol: false,
itemStyle: {
normal: {
color: "black",
label:
{
show: true,
},
},
},
}]
};
myChart.setOption(option);
let _index = 0;

let ws = new WebSocket('ws://127.0.0.1:9988');
ws.onmessage = ev => {

_data[_index] = [_index, ev.data];
let __index;
// 将当前接受到点的后面20个数据设置为null,模拟刷新显示的效果
for (let i = 1; i < 20; i++) {
__index = (_index + i) % showLength;
_data[__index] = [__index, null];
}
_index++;
// 当显示满时重置数据源下标,实现重复从头绘制的效果
if (_index >= showLength - 1)
_index = 0;
// 每接收到三次数据进行一次重绘,降低资源占用,提高显示流畅度
if (_index % 3 === 0) {
option.series.data = _data;
myChart.setOption(option)
}
}

window.onresize = myChart.resize;

</script>
</html>

创建 qrs_detector.dart

参照 python 项目的模式,定义一个 QRSDetector 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class QRSDetector {
final int signalFrequency;
final double findPeaksLimit;
final Function handleDetection;

QRSDetector({
this.signalFrequency = 250,
this.findPeaksLimit = 0.04,
this.handleDetection,
});

void addPoint(int timeStamp, double point) {
...
}
}

构造函数可以设置 ECG 数据的采样率,波峰识别时用于判断的 findPeaksLimit,以及当识别到波峰时的回调函数 handleDetection;QRSDetector 的实例对象只对外暴露一个 void addPoint(int timeStamp, double point) 方法,用于接收实时心电数据。

接着就是根据 python 代码的逻辑,实现这个 QRSDetector 的具体功能。

移植过程的关键点&难点

双端队列deque

python 算法中的第一步就是将新添加的数据加入到名为 self.most_recent_measurementsdeque 中,然后用包括当前数据在内和其之前的 self.number_of_samples_stored 个数据进行各种计算,来判断是否存在波峰。
来自 python 标准库的 deque 可以设置一个长度,当添加的数据大于其长度时会将头部的数据 “挤出”,所以非常适合这里的需要;而且由于其实现原理,它具有很低时间和空间复杂度。
但是 dart 的标准库中就没有这么方便的结构了,所以只能用 Queue 来代替,并自行处理以上的逻辑:

1
2
3
4
5
6
7
Queue<double> _mostRecentMeasurements = Queue();

void _addToMostRecentMeasurements(double val) {
_mostRecentMeasurements.add(val);
if (_mostRecentMeasurements.length > NUMBER_OF_SAMPLES_STORED)
_mostRecentMeasurements.removeFirst();
}

使用时不要直接向 _mostRecentMeasurements 中插入数据,而是用 _addToMostRecentMeasurements 方法即可

带通滤波器 bandpass_filter 的移植

算法中数据添加进队列后,第一个操作就是对当前队列内的数据进行一次滤波处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from scipy.signal import butter, lfilter
……
def bandpass_filter(self, data, lowcut, highcut, signal_freq, filter_order):
"""
Method responsible for creating and applying Butterworth filter.
:param deque data: raw data
:param float lowcut: filter lowcut frequency value
:param float highcut: filter highcut frequency value
:param int signal_freq: signal frequency in samples per second (Hz)
:param int filter_order: filter order
:return array: filtered data
"""
"""Constructs signal filter and uses it to given data set."""
nyquist_freq = 0.5 * signal_freq
low = lowcut / nyquist_freq
high = highcut / nyquist_freq
b, a = butter(filter_order, [low, high], btype="band")
y = lfilter(b, a, data)
return y

来自 scipy.signal 巴特沃斯滤波器实现,先用 butter 进行滤波器生成滤波器系数,可以看到这里的类型 btype="band" 是一个带通滤波器,然后调用 lfilter 对数据序列进行 IIR滤波处理,相关文档参考这里:docs.scipy.org: scipy.signal.lfilter

简单看了下 scipy.signal 的滤波器实现源码,复杂到没信心移植,所以尝试找 dart 中的替代库。以IIR filter为关键词在 pub仓库 上搜索,发现有且只有这么一个库:
pub1.webp
看介绍倒确实是实现了 IIR 的bandpass带通滤波,于是加入项目尝试使用。但是发现一个问题:参考文档,这个库中bandpass 的函数签名为:

1
butterworth.bandPass(order,Samplingfreq,Center freq,Width in frequ);

参数用法和 python 项目中的完全不同,python 中的参数给的是带通滤波器的上下阈值(与采样率有关),而这里却要分别传入 SamplingfreqCenter freqWidth in frequ 三个参数,也就是说不能直接移植 python 的程序写法直接替换这个库的实现。

在查阅了许多资料后,也只得知 python 中 scipy.signal.butter 的设计和 Matlab 中同名函数的用法一致(Butterworth filter design),但实在找不到上面 iirjdart 库中那样定义的方法该如何使用,以及每个参数的含义……直到回过头来再读库的介绍,发现这样一段:

得知原来这个库本身就是对一个 java 库的移植;然后在 berndporr/iirj 这个库的 GitHub 中以 bandpass 为关键词搜索 Issue,发现这样一个讨论:#issue8
issue1.webp

提问者的情况和我类似,他是想把一段和我上面差不多的 Matlab 的代码翻译成 java 代码,不知道如何转换这些参数,而库的作者给出的解决方案是,不要直接用 bandpass 这个方法,而是将一对高通和低通滤波器串联,得到一样的效果。根据这个Issue,我将 dart 代码实现为:

1
2
3
4
5
6
7
8
List<double> _bandPassFilter() {
double nyquistFreq = signalFrequency / 2;
Butterworth butter = Butterworth();
butter.lowPass(1, 2, FILTER_LOW_CUT / nyquistFreq);
butter.highPass(1, 2, FILTER_HIGH_CUT / nyquistFreq);
return List.generate(_mostRecentMeasurements.length,
(index) => butter.filter(_mostRecentMeasurements.elementAt(index)));
}

终于解决了这个问题 😖

numpy.ediff1d (连续差分) 的实现

滤波后的下一个处理是对数据序列进行差分计算,获取两个连续元素之间的一维差异数组,具体的含义可以参考这个文章:Python Numpy np.ediff1d()用法及代码示例

这个函数来自 python 著名强大的数字处理库 numpy,dart 中可能也有类似的库,但是引用的话可能还要花功夫去学习其使用方法。这里在理解了连续差分函数 ediff1d 的作用之后,我直接用 dart 便准库中 List.generate 这个数组的工厂构造方法来实现类似的功能:

1
2
3
4
5
6
List<double> differentiatedEcgMeasurements = List.generate(
/// 由于是计算每两个数字的差值,最终生成的数组的长度就将会是原始数组的长度 -1
filteredEcgMeasurements.length - 1,
(index) =>
filteredEcgMeasurements[index + 1] -
filteredEcgMeasurements[index]);

数组平方

下一个处理的 python 代码是:

1
2
# Squaring - intensifies values received in derivative.
squared_ecg_measurements = differentiated_ecg_measurements ** 2

python 中有一个非常方便的操作符 ** ,可以快速实现乘方操作,这在我知道的大部分常用语言中都是没有的,基本都要通过语言内置的数学库里的 pow 函数实现。但是经过上面的代码可以得知 differentiated_ecg_measurements 是一个数值序列,对于这样一个类似数组的结构进行平方操作的结果是什么呢?猜测是对其中的每个数值进行乘方,但是并不能确定,简单跟下源码:
https://github.com/numpy/numpy/blob/fb215c76967739268de71aa4bda55dd1b062bc2e/numpy/lib/scimath.py#L434
确定确实和猜测一样,那么根据含义进行移植就很简单了

计算卷积和移动平均数

再下一个操作是用 移动窗口积分器 对平方后的序列进行处理:

1
2
# Moving-window integration.
integrated_ecg_measurements = np.convolve(squared_ecg_measurements, np.ones(self.integration_window))

这里 np.convolve(移动平均数) 和 np.ones(计算卷积) 两个函数同样来自 numpy库,但是这两个函数就不像 numpy.ediff1d 那么容易实现了……好在用 dart convolve 作为关键词在 Google 上搜索后,发现了 scidart 这个库,用完全一样的方式实现了这两个函数,直接引用替换即可

findpeaks 的实现移植

经过如上的各种处理,终于来到了关键的峰值识别部分,这里的 python 实现是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def findpeaks(self, data, spacing=1, limit=None):
"""
Janko Slavic peak detection algorithm and implementation.
https://github.com/jankoslavic/py-tools/tree/master/findpeaks
Finds peaks in `data` which are of `spacing` width and >=`limit`.
:param ndarray data: data
:param float spacing: minimum spacing to the next peak (should be 1 or more)
:param float limit: peaks should have value greater or equal
:return array: detected peaks indexes array
"""
len = data.size
x = np.zeros(len + 2 * spacing)
x[:spacing] = data[0] - 1.e-6
x[-spacing:] = data[-1] - 1.e-6
x[spacing:spacing + len] = data
peak_candidate = np.zeros(len)
peak_candidate[:] = True
for s in range(spacing):
start = spacing - s - 1
h_b = x[start: start + len] # before
start = spacing
h_c = x[start: start + len] # central
start = spacing + s + 1
h_a = x[start: start + len] # after
peak_candidate = np.logical_and(peak_candidate, np.logical_and(h_c > h_b, h_c > h_a))

ind = np.argwhere(peak_candidate)
ind = ind.reshape(ind.size)
if limit is not None:
ind = ind[data[ind] > limit]
return ind

这里的重点主要是:

  • 11~16行:
    对于输入序列,生成一个数组,其前 spacing 个数据是 输入序列第一个元素的值 - 1.e-6,后 spacing 个数据是 输入序列最后一个元素的值 - 1.e-6,中间则是原始输入序列的元素。

  • 21~28行:
    spacing 做 fori 循环,每次根据下标取新数组的三个部分,然后对这三部分的值逐个进行比较,实际就是找出比前后都大的值,也就是峰值,通过 logical_and(逻辑与) 对数组进行操作,将得到一个数组,其中为峰值的位置的值为 True,其余为 False

  • 30~33行:
    找出所有波峰所在位置的下标,然后判断该波峰的值是否大于 limit,大于的话则加入结果数组,最终返回所有找到的峰值的下标。

基于以上的理解,实现的 dart 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
List<int> _findPeaks(Array data) {
int len = data.length;
/// 这里还是利用 List.generate,在构造方法中利用下标判断返回不同的值,从而巧妙实现上面 11~16行的逻辑
List<double> x = List.generate(len + 2 * FIND_PEAKS_SPACING, (index) {
if (index < FIND_PEAKS_SPACING) return data.first - 1e-6;
if (index >= len + FIND_PEAKS_SPACING) return data.last - 1e-6;
return data[index - FIND_PEAKS_SPACING];
});
List<bool> peakCandidate = List.filled(len, true);

List<int> ind = [];
for (var s = 0; s < FIND_PEAKS_SPACING; s++) {
var start = FIND_PEAKS_SPACING - s - 1;
var hb = x.getRange(start, start + len);
start = FIND_PEAKS_SPACING;
var hc = x.getRange(start, start + len);
start = FIND_PEAKS_SPACING + s + 1;
var ha = x.getRange(start, start + len);

/// 由于缺少 pythons 那样直接逐个比较两个数组所有值的大小返回List<bool>的方法,这里只能用循环自己比较
for (var i = 0; i < len; i++) {
peakCandidate[i] = peakCandidate[i] &&
(hc.elementAt(i) > hb.elementAt(i)) &&
(hc.elementAt(i) > ha.elementAt(i));
if (s == FIND_PEAKS_SPACING - 1 &&
peakCandidate[i] &&
data[i] > findPeaksLimit) {
ind.add(i);
}
}
}
return ind;
}

到此为止,大部分算法逻辑的移植就差不多完成了,后面的代码相对都比较简单,无脑移植即可。识别出了每个波峰,也就得到了每次心跳发生的时间,经过简单的计算即可得到用户的实时心率了~

加入 DEMO 进行测试

QRSDetector 的算法逻辑实现时候,就可以添加到 DEMO 中测试效果了。将原本向客户端发送数据的方法改为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Future sendData(WebSocket webSocket) {
/// 使用指定参数创建 detector,当算法识别到波峰时向客户端发送 "beat" 信号
QRSDetector detector = QRSDetector(
signalFrequency: SAMPLING_RATE,
handleDetection: () {
webSocket.add("beat");
});

var file = File('data.txt');
var timeStamp = 0;
return file.readAsLines().then((value) {
return value.forEach((ele) {
webSocket.add(ele);
timeStamp += 4000;
/// 将数据通过 addPoint 方法加入识别器 detector
detector.addPoint(timeStamp, double.tryParse(ele));
sleep(Duration(milliseconds: 1000 ~/ SAMPLING_RATE));
});
});
}

这样就实现了一开始演示的效果:回到视频

源码地址:debuggerx01/ecg_viewer_dart_with_qrs_detector

性能优化

由于原始 python 工程的代码更多地是学习研究的目的,所以每一步代码都只做一个操作,而且生成了大量的中间临时对象和数组。而实际上某些操作是可以简化合并的,很多临时变量也可以不必创建,虽然会降低代码可读性,但可以提高执行效率:
https://github.com/debuggerx01/ecg_viewer_dart_with_qrs_detector/blob/performance/qrs_detector.dart
经过对比测试,这些修改大致可以使算法比没有优化前快约30%,而且在我机器上测试每次计算的耗时大约都在 0.5 毫秒以内,不得不说 dart 语言本身的性能还是相当不错的。