Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions movement/kinematics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
compute_forward_vector,
compute_forward_vector_angle,
compute_head_direction_vector,
compute_directional_change,
)
from movement.kinematics.kinetic_energy import compute_kinetic_energy

Expand All @@ -31,5 +32,6 @@
"compute_forward_vector",
"compute_head_direction_vector",
"compute_forward_vector_angle",
"compute_directional_change",
"compute_kinetic_energy",
]
71 changes: 71 additions & 0 deletions movement/kinematics/orientation.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from movement.utils.logging import logger
from movement.utils.vector import (
compute_norm,
compute_signed_angle_2d,
convert_to_unit,
)
Expand Down Expand Up @@ -279,3 +280,73 @@ def _validate_type_data_array(data: xr.DataArray) -> None:
f"but got {type(data)}."
)
)


def compute_directional_change(data: xr.DataArray) -> xr.DataArray:
"""Compute per-time-step directional change (angular change per unit time).

Directional change at time i is defined as the absolute turning angle
between consecutive displacement vectors divided by the time interval
between the neighbouring samples::

DC_i = |theta_i| / (time[i+1] - time[i-1])

Parameters
----------
data
Position data with dims ``(time, space, individuals)`` or
``(time, space, keypoints, individuals)``. The ``space`` dimension
must contain the coordinates ``"x"`` and ``"y"``. The coordinate
``time`` must be numeric.

Returns
-------
xarray.DataArray
Directional change with dims ``(time, individuals)`` or
``(time, keypoints, individuals)``. The ``space`` dimension is
removed. Name is set to ``"directional_change"``.

Notes
-----
- First and last timepoints are set to NaN because turning angle is
undefined there.
- Zero-length displacement vectors produce NaN in the output.

"""
_validate_type_data_array(data)
validate_dims_coords(data, {"time": [], "space": ["x", "y"]})

if data.sizes["space"] != 2:
raise ValueError("Input data must have exactly 2 spatial dimensions.")

# Displacement vectors:
# v_i = p_{i+1} - p_i
v = data.shift(time=-1) - data

# v_{i-1} = p_i - p_{i-1}
v_prev = data - data.shift(time=1)

# Mask zero-length displacement vectors so their angles become NaN
v = v.where(compute_norm(v) != 0)
v_prev = v_prev.where(compute_norm(v_prev) != 0)

# Signed turning angle between consecutive displacement vectors
theta: xr.DataArray = compute_signed_angle_2d(v_prev, v)

# Absolute turning angle
abs_theta: xr.DataArray = xr.apply_ufunc(np.abs, theta)

# Time interval: dt[i] = time[i+1] - time[i-1]
time_coord = data.coords["time"]
dt: xr.DataArray = (
time_coord.shift(time=-1) - time_coord.shift(time=1)
).astype("float64")

# Compute directional change; dt broadcasts over non-time dims
result: xr.DataArray = abs_theta / dt

# Avoid inf if dt contains zeros
result = result.where(dt != 0)

result.name = "directional_change"
return result
78 changes: 78 additions & 0 deletions tests/test_unit/test_kinematics/test_orientation.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,3 +431,81 @@ def test_casts_from_tuple(

xr.testing.assert_allclose(pass_numpy, pass_tuple)
xr.testing.assert_allclose(pass_numpy, pass_list)


def make_pos(positions, times=None, individuals=None):
times = np.arange(len(positions)) if times is None else np.array(times)
if individuals is None:
individuals = ["ind1"]
positions = np.asarray(positions)
data = positions.reshape(len(times), 2, 1)
data = np.tile(data, (1, 1, len(individuals)))
return xr.DataArray(
data,
dims=("time", "space", "individuals"),
coords={
"time": times,
"space": ["x", "y"],
"individuals": individuals,
},
)


def test_straight_line_zero_dc():
pos = [[0, 0], [1, 0], [2, 0], [3, 0]]
da = make_pos(pos)
dc = kinematics.compute_directional_change(da)
assert np.allclose(dc.sel(time=1).values, 0.0, equal_nan=True)
assert np.allclose(dc.sel(time=2).values, 0.0, equal_nan=True)
assert np.isnan(dc.sel(time=0).values).all()
assert np.isnan(dc.sel(time=3).values).all()


def test_ninety_degree_turn():
pos = [[0, 0], [1, 0], [1, 1]]
times = [0.0, 1.0, 2.0]
da = make_pos(pos, times=times)
dc = kinematics.compute_directional_change(da)
expected = (np.pi / 2) / (times[2] - times[0])
assert np.allclose(dc.sel(time=1.0).values, expected)
assert np.isnan(dc.sel(time=0.0).values).all()
assert np.isnan(dc.sel(time=2.0).values).all()


def test_irregular_time_intervals():
pos = [[0, 0], [1, 0], [1, 1]]
times = [0.0, 0.5, 1.5]
da = make_pos(pos, times=times)
dc = kinematics.compute_directional_change(da)
expected = (np.pi / 2) / (times[2] - times[0])
assert np.allclose(dc.sel(time=0.5).values, expected)


def test_nan_and_zero_length_vectors():
pos = [[0, 0], [np.nan, np.nan], [1, 1]]
times = [0.0, 1.0, 2.0]
da = make_pos(pos, times=times)
dc = kinematics.compute_directional_change(da)
assert np.isnan(dc.sel(time=1.0).values).all()

pos2 = [[0, 0], [0, 0], [1, 0]]
da2 = make_pos(pos2, times=[0.0, 1.0, 2.0])
dc2 = kinematics.compute_directional_change(da2)
assert np.isnan(dc2.sel(time=1.0).values).all()


def test_multi_individual():
times = [0.0, 1.0, 2.0]
pos = np.array([[0, 0], [1, 0], [1, 1]])
inds = ["a", "b"]
data = pos.reshape(3, 2, 1)
data = np.tile(data, (1, 1, 2))
da = xr.DataArray(
data,
dims=("time", "space", "individuals"),
coords={"time": times, "space": ["x", "y"], "individuals": inds},
)
dc = kinematics.compute_directional_change(da)
assert np.allclose(
dc.sel(time=1.0, individuals="a"), dc.sel(time=1.0, individuals="b")
)