Skip to content

Commit 527d6db

Browse files
authored
Merge pull request #1 from jolin-io/feature/rqatrend_matrix
feat: add rqatrend matrix variant and dask variant
2 parents e05c76a + fbd8e49 commit 527d6db

File tree

3 files changed

+134
-1
lines changed

3 files changed

+134
-1
lines changed

pyproject.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ authors = [
1414
readme = "README.md"
1515
requires-python = ">=3.8"
1616
dependencies = [
17-
"numpy"
17+
"numpy",
18+
"dask"
1819
]
1920
classifiers = [
2021
"Programming Language :: Python :: 3",
@@ -35,3 +36,8 @@ include-package-data = true
3536

3637
[tool.setuptools_scm]
3738
version_scheme = "post-release"
39+
40+
[dependency-groups]
41+
dev = [
42+
"pytest>=8.3.5",
43+
]

rqadeforestation/__init__.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import ctypes as ct
33
import numpy as np
4+
import dask.array as da
45

56
class MallocVector(ct.Structure):
67
_fields_ = [("pointer", ct.c_void_p),
@@ -43,6 +44,66 @@ def rqatrend(y: np.ndarray, threshold: float, border: int = 10, theiler: int = 1
4344
result_single = lib.rqatrend(py, threshold, border, theiler)
4445
return result_single
4546

47+
48+
def rqatrend_matrix(matrix: np.ndarray, threshold: float, border: int = 10, theiler: int = 1) -> np.ndarray:
49+
"""
50+
Calculate the RQA trend for a matrix of time series.
51+
52+
:param matrix: Input time series data as a numpy array of shape (n_timeseries, series_length).
53+
:param threshold: Threshold value for the RQA calculation.
54+
:param border: Border size for the RQA calculation.
55+
:param theiler: Theiler window size for the RQA calculation.
56+
:return: Numpy array of all RQA trend values of size n_timeseries.
57+
"""
58+
59+
n = matrix.shape[0]
60+
result_several = np.ones(n)
61+
p_result_several = mvptr(result_several)
62+
p_matrix = mmptr(matrix)
63+
64+
# arguments: result_vector, data, threshhold, border, theiler
65+
lib.rqatrend_inplace.argtypes = (ct.POINTER(MallocVector), ct.POINTER(MallocMatrix), ct.c_double, ct.c_int64, ct.c_int64)
66+
return_value = lib.rqatrend_inplace(p_result_several, p_matrix, threshold, border, theiler)
67+
return result_several
68+
69+
70+
def rqatrend_dask(x: da.Array, timeseries_axis: int, threshold: float, border: int = 10, theiler: int = 1, out_dtype: type = np.float64) -> da.Array:
71+
"""
72+
Apply rqatrend to a given dask array.
73+
74+
Consider comparing this function's performance with a simple `dask.array.apply_along_axis`
75+
```py
76+
import dask.array as da
77+
da.apply_along_axis(lambda ts: rqatrend(ts.ravel(), threshold, border, theiler), time_axis, darr)
78+
```
79+
80+
:param x: dask Array on which rqatrend should be computed.
81+
:param timeseries_axis: dask Array axis on which the rqatrend function should be applied
82+
:param threshold: Threshold value for the RQA calculation.
83+
:param border: Border size for the RQA calculation.
84+
:param theiler: Theiler window size for the RQA calculation.
85+
:param out_dtype: dtype of the output dask array, in case a smaller float representation is wanted, or similar.
86+
:return: Dask array of all RQA trend values without the timeseries_axis dimension (it got aggregated by rqatrend).
87+
"""
88+
# Rechunk so full timeseries axis is in one block
89+
# do this first so we can use the optimized chunks also for moveaxis
90+
x_rechunked = x.rechunk({timeseries_axis: -1})
91+
92+
# Move timeseries axis to the end
93+
x_moved = da.moveaxis(x_rechunked, timeseries_axis, -1)
94+
95+
def _block_wrapper(block):
96+
# block shape: (..., series_length)
97+
mat = block.reshape(-1, block.shape[-1]) # (n_timeseries, series_length)
98+
result = rqatrend_matrix(mat, threshold, border, theiler) # (n_timeseries,)
99+
return result.reshape(block.shape[:-1]) # reduce last axis
100+
101+
return x_moved.map_blocks(
102+
_block_wrapper,
103+
dtype=out_dtype,
104+
drop_axis=-1 # <---- tell Dask we removed the last axis
105+
)
106+
46107
def main() -> None:
47108
pass
48109

tests/test_rqadeforestation.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import numpy as np
2+
from rqadeforestation import rqatrend, rqatrend_matrix, rqatrend_dask
3+
import time
4+
import dask.array as da
5+
6+
def test_rqatrend_versus_rqatrendmatrix() -> None:
7+
x = np.arange(1, 30, step=0.01)
8+
y = np.sin(x) + 0.1 * x
9+
assert np.all(rqatrend(y, 0.5, 10, 1) == rqatrend_matrix(np.tile(y, (5, 1)), 0.5, 10, 1))
10+
11+
12+
def _vector_method(ts: np.ndarray, threshold: float) -> float:
13+
return rqatrend(ts.ravel(), threshold, 10, 1) # scalar
14+
15+
16+
def vector_method(darr, time_axis, threshold):
17+
return da.apply_along_axis(_vector_method, time_axis, darr, threshold=threshold)
18+
19+
20+
def matrix_method(darr, time_axis, threshold):
21+
return rqatrend_dask(darr, timeseries_axis=time_axis, threshold=threshold)
22+
23+
24+
def test_dask_rqatrend_versus_rqatrendmatrix() -> None:
25+
# ----- Synthetic multidimensional data -----
26+
# Shape: (batch, channels, time)
27+
batch = 5000
28+
channels = 4
29+
30+
# Base timeseries
31+
x = np.arange(1, 30, step=0.01)
32+
y = np.sin(x) + 0.1 * x # shape (2900,)
33+
34+
# Create data, broadcast on first dimension
35+
time_axis = 0
36+
big_data = np.tile(y[:, None, None], (1, batch, channels))
37+
38+
# Create data, broadcast on last dimension
39+
# time_axis = -1
40+
# big_data = np.tile(y[None, None, :], (batch, channels, 1))
41+
42+
# Wrap in Dask
43+
# darr = da.from_array(big_data, chunks=(200, channels, -1))
44+
darr = da.from_array(big_data, chunks=400)
45+
46+
print("Original shape:", darr.shape)
47+
print("Original chunks:", darr.chunks)
48+
49+
y1 = vector_method(darr, time_axis, threshold=0.5)
50+
y2 = matrix_method(darr, time_axis, threshold=0.5)
51+
52+
print("Computing vector method...")
53+
t0 = time.time()
54+
res1 = y1.compute()
55+
t1 = time.time()
56+
print("Slow method time: %.2f s" % (t1 - t0,))
57+
58+
print("Computing matrix method...")
59+
t0 = time.time()
60+
res2 = y2.compute()
61+
t1 = time.time()
62+
print("Fast method time: %.2f s" % (t1 - t0,))
63+
64+
# Sanity check
65+
print("Output shapes:", res1.shape, res2.shape)
66+
assert np.allclose(res1, res2)

0 commit comments

Comments
 (0)