Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions tsml_eval/segmentation/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
"BasePLA",
"SlidingWindow",
"TopDown",
"BottomUp"
"BottomUp",
"SWAB",
]
from base import BasePLA
from _sw import SlidingWindow
from _td import TopDown
from _bu import BottomUp
from _bu import BottomUp
from _swab import SWAB
57 changes: 54 additions & 3 deletions tsml_eval/segmentation/_bu.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,40 @@
__all__ = ["BottomUp"]

class BottomUp(BasePLA):
"""
Bottom-Up Segmentation.

Uses a bottom-up algorithm to traverse the dataset in an online manner.

Parameters
----------
max_error: float
The maximum error value for the function to find before segmenting the dataset.

References
----------
.. [1] Keogh, E., Chu, S., Hart, D. and Pazzani, M., 2001, November.
An online algorithm for segmenting time series. (pp. 289-296).
"""

def __init__(self, max_error):
super().__init__(max_error)

#clean the code
def bottomUp(self, time_series):
def segment(self, time_series):
"""Segment a time series

Parameters
----------
time_series : np.array
1D time series to be segmented.

Returns
-------
list
List of segmentations
"""

seg_ts = []
merge_cost = []
for i in range(0, len(time_series), 2):
Expand All @@ -20,7 +48,7 @@ def bottomUp(self, time_series):

merge_cost = np.array(merge_cost)

while len(merge_cost != 0) and min(merge_cost) < self.max_error:
while len(merge_cost) != 0 and min(merge_cost) < self.max_error:
if(len(merge_cost) == len(seg_ts)):
print("error")
pos = np.argmin(merge_cost)
Expand All @@ -38,4 +66,27 @@ def bottomUp(self, time_series):
merge_cost[pos] = self.calculate_error(np.concatenate((seg_ts[pos], seg_ts[pos + 1])))


return seg_ts
return seg_ts


def dense(self, time_series):
    """Return the dense values of a segmented time series.

    Segments ``time_series`` with ``self.segment`` and returns the running
    total of the segment lengths for every segment except the last one.

    Parameters
    ----------
    time_series : np.array
        1D time series to be segmented.

    Returns
    -------
    np.ndarray
        Float array with one entry per segment except the last; entry ``i``
        is the summed length of segments ``0..i``. Empty when the
        segmentation yields fewer than two segments.
    """
    results = self.segment(time_series)
    if len(results) < 2:
        # With no segments the previous ``np.zeros(len(results) - 1)`` call
        # asked for a negative dimension and raised ValueError; with one
        # segment it already produced an empty array.
        return np.zeros(0)
    # The last segment is excluded: only the boundaries between segments
    # are reported.
    segment_lengths = [len(segment) for segment in results[:-1]]
    # dtype=float keeps the float64 result the np.zeros-based loop produced.
    return np.cumsum(segment_lengths, dtype=float)
68 changes: 51 additions & 17 deletions tsml_eval/segmentation/_sw.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,39 @@
__all__ = ["SlidingWindow"]

class SlidingWindow(BasePLA):
"""Sliding Window Segmentation.

Uses a sliding window algorithm to traverse the dataset in an online manner.

Parameters
----------
max_error: float
The maximum error value for the function to find before segmenting the dataset.

References
----------
.. [1] Keogh, E., Chu, S., Hart, D. and Pazzani, M., 2001, November.
An online algorithm for segmenting time series. (pp. 289-296).
"""

def __init__(self, max_error):
super().__init__(max_error)

"""work in progress
def sliding_window(self, time_series):
seg_ts = []
anchor = 0
for i in range(1, len(time_series)):
if self.calculate_error(time_series[anchor:i]) > self.max_error:
seg_ts.append(self.create_segment(time_series[anchor: i - 1]))
anchor = i - 1
if(anchor < i):
seg_ts.append(self.create_segment(time_series[anchor: i - 1]))
return np.concatenate(seg_ts) """

#! clean this up, the while loops are not done in a good manner. This is from the pseudocode
def sliding_window(self, time_series):
def segment(self, time_series):
"""Segment a time series

Parameters
----------
time_series : np.array
1D time series to be segmented.

Returns
-------
list
List of segmentations
"""

seg_ts = []
anchor = 0
while anchor < len(time_series):
Expand All @@ -33,8 +48,27 @@ def sliding_window(self, time_series):
anchor = anchor + i - 1
return seg_ts

def segment(time_series):
return None

def pla(time_series):
return None
def dense(self, time_series):
    """Return the dense values of a segmented time series.

    Segments ``time_series`` with ``self.segment`` and returns the running
    total of the segment lengths for every segment except the last one.

    Parameters
    ----------
    time_series : np.array
        1D time series to be segmented.

    Returns
    -------
    np.ndarray
        Float array with one entry per segment except the last; entry ``i``
        is the summed length of segments ``0..i``. Empty when the
        segmentation yields fewer than two segments.
    """
    results = self.segment(time_series)
    if len(results) < 2:
        # With no segments the previous ``np.zeros(len(results) - 1)`` call
        # asked for a negative dimension and raised ValueError; with one
        # segment it already produced an empty array.
        return np.zeros(0)
    # The last segment is excluded: only the boundaries between segments
    # are reported.
    segment_lengths = [len(segment) for segment in results[:-1]]
    # dtype=float keeps the float64 result the np.zeros-based loop produced.
    return np.cumsum(segment_lengths, dtype=float)


99 changes: 83 additions & 16 deletions tsml_eval/segmentation/_swab.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,105 @@
from base import BasePLA
import numpy as np
import sys
import BottomUp
from _bu import BottomUp

__maintainer__ = []
__all__ = ["SWAB"]

class SWAB(BasePLA):
"""
SWAB (Sliding Window And Bottom-Up) Segmentation.

Uses SWAB algorithm as described in [1] to traverse the dataset in an online manner.

Parameters
----------
max_error: float
The maximum error value for the function to find before segmenting the dataset.

References
----------
.. [1] Keogh, E., Chu, S., Hart, D. and Pazzani, M., 2001, November.
An online algorithm for segmenting time series. (pp. 289-296).
"""

def __init__(self, max_error, seg_num = 6):
self.seg_num = seg_num
def __init__(self, max_error):
self.bottomup = BottomUp(max_error)
super().__init__(max_error)


def swab(self, time_series):
def segment(self, time_series):
"""Segment a time series

Parameters
----------
time_series : np.array
1D time series to be segmented.

Returns
-------
list
List of segmentations
"""

seg_ts = []
buffer = np.empty(self.seg_num, dtype=object)
sw_lower_bound = len(buffer) / 2
sw_upper_bound = len(buffer) * 2
while len(buffer) < 3:
t = self.bottomup(time_series)
seg = self.best_line(time_series, 0)
current_data_point = len(seg)
buffer = np.array(seg)
while len(buffer) > 0:
t = self.bottomup.bottomUp(time_series)
seg_ts.append(t[0])
buffer = buffer[len(t) - 1:]
return None
buffer = buffer[len(t[0]):]
if(current_data_point != len(time_series)):
seg = self.best_line(time_series, current_data_point)
current_data_point = current_data_point + len(seg)
buffer = np.append(buffer, seg)
return seg_ts


#finds the next potential segment
def best_line(self, time_series, current_data_point, sw_lower_bound, sw_upper_bound):
seg_ts = []
def best_line(self, time_series, current_data_point):
"""Uses sliding window to find the next best segmentation candidate

Parameters
----------
time_series : np.array
1D time series to be segmented.
current_data_point : int
the current_data_point we are observing

Returns
-------
np.array
new found segmentation candidate
"""

seg_ts = np.array([])
error = 0
while error < self.max_error:
seg_ts.append = time_series[current_data_point]
while current_data_point < len(time_series) and error < self.max_error:
seg_ts = np.append(seg_ts, time_series[current_data_point])
error = self.calculate_error(seg_ts)
current_data_point = current_data_point + 1
return seg_ts


def dense(self, time_series):
    """Return the dense values of a segmented time series.

    Segments ``time_series`` with ``self.segment`` and returns the running
    total of the segment lengths for every segment except the last one.

    Parameters
    ----------
    time_series : np.array
        1D time series to be segmented.

    Returns
    -------
    np.ndarray
        Float array with one entry per segment except the last; entry ``i``
        is the summed length of segments ``0..i``. Empty when the
        segmentation yields fewer than two segments.
    """
    results = self.segment(time_series)
    if len(results) < 2:
        # With no segments the previous ``np.zeros(len(results) - 1)`` call
        # asked for a negative dimension and raised ValueError; with one
        # segment it already produced an empty array.
        return np.zeros(0)
    # The last segment is excluded: only the boundaries between segments
    # are reported.
    segment_lengths = [len(segment) for segment in results[:-1]]
    # dtype=float keeps the float64 result the np.zeros-based loop produced.
    return np.cumsum(segment_lengths, dtype=float)
Loading