-
Notifications
You must be signed in to change notification settings - Fork 320
Expand file tree
/
Copy pathbasic.py
More file actions
256 lines (204 loc) · 5.74 KB
/
basic.py
File metadata and controls
256 lines (204 loc) · 5.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import numpy as np
import pandas as pd
from scipy import signal
from wfdb.io.annotation import Annotation
def resample_ann(ann_sample, fs, fs_target):
    """
    Compute the new annotation indices.

    Each annotation location is scaled by ``fs_target / fs`` and then
    truncated toward zero when cast to an integer index.

    Parameters
    ----------
    ann_sample : ndarray or array_like
        Array of annotation locations. Plain sequences (e.g. lists) are
        accepted and converted to an array.
    fs : int
        The starting sampling frequency.
    fs_target : int
        The desired sampling frequency.

    Returns
    -------
    ndarray
        Array of resampled annotation locations (dtype int64).

    """
    ratio = fs_target / fs
    # Convert first: `float * list` is not defined, so a plain Python
    # sequence would otherwise raise a TypeError.
    locations = np.asarray(ann_sample)
    return (ratio * locations).astype(np.int64)
def resample_sig(x, fs, fs_target):
    """
    Resample a signal to a different frequency.

    When the two frequencies are equal the input is returned unchanged
    (NaN values included). Otherwise NaN values are filled by linear
    interpolation and the signal is resampled with `scipy.signal.resample`.

    Parameters
    ----------
    x : ndarray
        Array containing the signal.
    fs : int, float
        The original sampling frequency.
    fs_target : int, float
        The target frequency.

    Returns
    -------
    resampled_x : ndarray
        Array of the resampled signal values.
    resampled_t : ndarray
        Array of the resampled signal locations, expressed in units of
        the original sample index.

    """
    t = np.arange(x.shape[0]).astype("float64")

    if fs == fs_target:
        return x, t

    new_length = int(x.shape[0] * fs_target / fs)

    # Fill NaN values before resampling: a single NaN would otherwise
    # propagate to every output sample. `limit_direction="both"` is needed
    # so that NaNs at the very start of the signal are filled as well (the
    # default direction only fills forward and leaves leading NaNs in place).
    if np.isnan(x).any():
        x = (
            pd.Series(x.reshape((-1,)))
            .interpolate(limit_direction="both")
            .values
        )

    resampled_x, resampled_t = signal.resample(x, num=new_length, t=t)

    # Sanity checks on the resampler output: matching shapes, the requested
    # length, and strictly increasing sample locations.
    assert (
        resampled_x.shape == resampled_t.shape
        and resampled_x.shape[0] == new_length
    )
    assert np.all(np.diff(resampled_t) > 0)

    return resampled_x, resampled_t
def resample_singlechan(x, ann, fs, fs_target):
    """
    Resample a single-channel signal with its annotations.

    The signal is resampled with `resample_sig` and the annotation
    locations are rescaled with `resample_ann`. All other annotation
    fields are copied over unchanged into a new Annotation whose `fs` is
    the target frequency.

    Parameters
    ----------
    x: ndarray
        The signal array.
    ann : WFDB Annotation
        The WFDB annotation object.
    fs : int, float
        The original frequency.
    fs_target : int, float
        The target frequency.

    Returns
    -------
    resampled_x : ndarray
        Array of the resampled signal values.
    resampled_ann : WFDB Annotation
        Annotation containing resampled annotation locations.

    """
    # Only the resampled values are needed; the time vector is discarded.
    signal_out = resample_sig(x, fs, fs_target)[0]
    rescaled_locations = resample_ann(ann.sample, fs, fs_target)

    ann_fields = {
        "record_name": ann.record_name,
        "extension": ann.extension,
        "sample": rescaled_locations,
        "symbol": ann.symbol,
        "subtype": ann.subtype,
        "chan": ann.chan,
        "num": ann.num,
        "aux_note": ann.aux_note,
        "fs": fs_target,
    }
    return signal_out, Annotation(**ann_fields)
def resample_multichan(xs, ann, fs, fs_target, resamp_ann_chan=0):
    """
    Resample multiple channels with their annotations.

    Every column of `xs` is resampled independently with `resample_sig`
    and the results are stacked back into columns. The annotation
    locations are rescaled with `resample_ann`; all other annotation
    fields are copied over unchanged into a new Annotation whose `fs` is
    the target frequency.

    Parameters
    ----------
    xs: ndarray
        The signal array, indexed as (sample, channel).
    ann : WFDB Annotation
        The WFDB annotation object.
    fs : int, float
        The original frequency.
    fs_target : int, float
        The target frequency.
    resamp_ann_chan : int, optional
        Index of a signal channel. It must be smaller than the number of
        channels in `xs`. It is only checked here: the new annotation
        indices depend solely on the ratio of the two frequencies.

    Returns
    -------
    ndarray
        Array of the resampled signal values, one column per channel.
    resampled_ann : WFDB Annotation
        Annotation containing resampled annotation locations.

    """
    # Kept as an assert so the raised exception type stays the same.
    assert resamp_ann_chan < xs.shape[1]

    # Only the resampled values are needed; the time vectors are discarded.
    resampled_chans = [
        resample_sig(xs[:, chan], fs, fs_target)[0]
        for chan in range(xs.shape[1])
    ]

    new_sample = resample_ann(ann.sample, fs, fs_target)

    resampled_ann = Annotation(
        record_name=ann.record_name,
        extension=ann.extension,
        sample=new_sample,
        symbol=ann.symbol,
        subtype=ann.subtype,
        chan=ann.chan,
        num=ann.num,
        aux_note=ann.aux_note,
        fs=fs_target,
    )

    return np.column_stack(resampled_chans), resampled_ann
def normalize_bound(sig, lb=0, ub=1):
    """
    Normalize a signal between the lower and upper bound.

    The signal is scaled and shifted linearly so that its minimum maps to
    `lb` and its maximum maps to `ub`. A constant signal has no range to
    stretch, so every sample is mapped to the midpoint of the bounds.

    Parameters
    ----------
    sig : ndarray
        Original signal to be normalized.
    lb : int, float, optional
        Lower bound.
    ub : int, float, optional
        Upper bound.

    Returns
    -------
    ndarray
        Normalized signal.

    """
    mid = ub - (ub - lb) / 2
    min_v = np.min(sig)
    max_v = np.max(sig)
    mid_v = max_v - (max_v - min_v) / 2

    span = max_v - min_v
    if span == 0:
        # Constant signal: dividing by the zero range would yield NaN for
        # every sample. A zero scale factor maps all samples onto `mid`.
        coef = np.float64(0.0)
    else:
        coef = (ub - lb) / span

    return sig * coef - (mid_v * coef) + mid
def smooth(sig, window_size):
    """
    Apply a uniform moving average filter to a signal.

    The signal is convolved with a box of `window_size` equal weights that
    sum to one. The output is centered on the input and always has the
    same number of samples as the input; samples near the edges are
    averaged against implicit zeros beyond the signal.

    Parameters
    ----------
    sig : ndarray
        The signal to smooth.
    window_size : int
        The width of the moving average filter. Must be at least 1.

    Returns
    -------
    ndarray
        The convolved input signal with the desired box waveform.

    Raises
    ------
    ValueError
        If `window_size` is smaller than 1.

    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    box = np.ones(window_size) / window_size

    # `mode="same"` returns max(len(sig), window_size) samples, so a window
    # wider than the signal would lengthen the output. Take the centered
    # part of the full convolution instead; for windows no wider than the
    # signal this is exactly the `mode="same"` result.
    full = np.convolve(sig, box, mode="full")
    n_samples = full.shape[0] - window_size + 1
    start = (window_size - 1) // 2
    return full[start : start + n_samples]
def get_filter_gain(b, a, f_gain, fs):
    """
    Given filter coefficients, return the gain at a particular
    frequency.

    The frequency response is evaluated on the default `scipy.signal.freqz`
    grid and the gain is read at the first grid point at or above the
    requested frequency, so the result is accurate to one grid step.

    Parameters
    ----------
    b : list
        List of linear filter b coefficients.
    a : list
        List of linear filter a coefficients.
    f_gain : int, float
        The frequency at which to calculate the gain.
    fs : int, float
        The sampling frequency of the system.

    Returns
    -------
    gain : int, float
        The gain at the desired frequency.

    Raises
    ------
    ValueError
        If `f_gain` is above the Nyquist frequency (`fs / 2`).

    """
    if f_gain > fs / 2:
        raise ValueError(
            "f_gain must not exceed the Nyquist frequency (fs / 2)"
        )

    w, h = signal.freqz(b, a)
    # Requested frequency expressed in radians/sample, the unit of `w`.
    w_gain = f_gain * 2 * np.pi / fs

    at_or_above = np.where(w >= w_gain)[0]
    # The grid stops just short of Nyquist, so a request at (or within one
    # step below) Nyquist matches no grid point; use the last one instead
    # of failing with an IndexError.
    ind = at_or_above[0] if at_or_above.size else len(w) - 1

    return abs(h[ind])
def normalize(X):
    """
    Scale input vector to unit norm (vector length).

    A vector whose norm is zero cannot be scaled to unit length; it is
    returned unscaled (all zeros) instead of being turned into NaN.

    Parameters
    ----------
    X : ndarray
        The vector to normalize.

    Returns
    -------
    ndarray
        The normalized vector.

    """
    norm = np.linalg.norm(X)
    if norm == 0:
        # Avoid 0 / 0 -> NaN: dividing by one leaves the zeros untouched.
        norm = 1.0
    return X / norm