diff --git a/aeon/transformations/collection/convolution_based/rocketGPU/__init__.py b/aeon/transformations/collection/convolution_based/rocketGPU/__init__.py new file mode 100644 index 0000000000..5b4917a47a --- /dev/null +++ b/aeon/transformations/collection/convolution_based/rocketGPU/__init__.py @@ -0,0 +1,7 @@ +"""Rocket transformers for GPU.""" + +__all__ = ["ROCKETGPU"] + +from aeon.transformations.collection.convolution_based.rocketGPU._rocket_gpu import ( + ROCKETGPU, +) diff --git a/aeon/transformations/collection/convolution_based/rocketGPU/_rocket_gpu.py b/aeon/transformations/collection/convolution_based/rocketGPU/_rocket_gpu.py new file mode 100644 index 0000000000..49310bc1ac --- /dev/null +++ b/aeon/transformations/collection/convolution_based/rocketGPU/_rocket_gpu.py @@ -0,0 +1,250 @@ +"""Rocket transformer for GPU.""" + +__author__ = ["hadifawaz1999"] +__all__ = ["ROCKETGPU"] + +from aeon.transformations.collection.convolution_based.rocketGPU.base import ( + BaseROCKETGPU, +) + + +class ROCKETGPU(BaseROCKETGPU): + """RandOm Convolutional KErnel Transform (ROCKET) for GPU. + + A kernel (or convolution) is a subseries used to create features that can be used + in machine learning tasks. ROCKET [1]_ generates a large number of random + convolutional kernels in the fit method. The length and dilation of each kernel + are also randomly generated. The kernels are used in the transform stage to + generate a new set of features. A kernel is used to create an activation map for + each series by running it across a time series, including random length and + dilation. It transforms the time series with two features per kernel. The first + feature is global max pooling and the second is proportion of positive values + (or PPV). + + + Parameters + ---------- + n_filters : int, default=1000 + Number of random convolutional filters. + kernel_size : list, default = None + The list of possible kernel sizes, default is [7, 9, 11]. 
+ padding : list, default = None + The list of possible tensorflow padding, default is ["SAME", "VALID"]. + use_dilation : bool, default = True + Whether or not to use dilation in convolution operations. + bias_range : Tuple, default = None + The min and max value of bias values, default is (-1.0, 1.0). + batch_size : int, default = 64 + The batch size to parallelize over GPU. + random_state : None or int, optional, default = None + Seed for random number generation. + + References + ---------- + .. [1] Tan, Chang Wei and Dempster, Angus and Bergmeir, Christoph + and Webb, Geoffrey I, + "ROCKET: Exceptionally fast and accurate time series + classification using random convolutional kernels", 2020, + https://link.springer.com/article/10.1007/s10618-020-00701-z, + https://arxiv.org/abs/1910.13051 + """ + + def __init__( + self, + n_filters=1000, + kernel_size=None, + padding=None, + use_dilation=True, + bias_range=None, + batch_size=64, + random_state=None, + ): + super().__init__(n_filters) + + self.n_filters = n_filters + self.kernel_size = kernel_size + self.padding = padding + self.use_dilation = use_dilation + self.bias_range = bias_range + self.batch_size = batch_size + self.random_state = random_state + + def _define_parameters(self): + """Define the parameters of ROCKET.""" + import numpy as np + + rng = np.random.default_rng(self.random_state) + + self._list_of_kernels = [] + self._list_of_dilations = [] + self._list_of_paddings = [] + self._list_of_biases = [] + + for _ in range(self.n_filters): + _kernel_size = rng.choice(self._kernel_size, size=1)[0] + _convolution_kernel = rng.normal(size=(_kernel_size, self.n_channels, 1)) + _convolution_kernel = _convolution_kernel - _convolution_kernel.mean( + axis=0, keepdims=True + ) + + if self.use_dilation: + _dilation_rate = 2 ** rng.uniform( + 0, np.log2((self.input_length - 1) / (_kernel_size - 1)) + ) + else: + _dilation_rate = 1 + + _padding = rng.choice(self._padding, size=1)[0] + assert _padding in ["SAME", 
"VALID"] + + _bias = rng.uniform(self._bias_range[0], self._bias_range[1]) + + self._list_of_kernels.append(_convolution_kernel) + self._list_of_dilations.append(_dilation_rate) + self._list_of_paddings.append(_padding) + self._list_of_biases.append(_bias) + + def _fit(self, X, y=None): + """Generate random kernels adjusted to time series shape. + + Infers time series length and number of channels from input numpy array, + and generates random kernels. + + Parameters + ---------- + X : 3D np.ndarray of shape = (n_instances, n_channels, n_timepoints) + collection of time series to transform. + y : ignored argument for interface compatibility. + + Returns + ------- + self + """ + self.input_length = X.shape[2] + self.n_channels = X.shape[1] + + self._kernel_size = [7, 9, 11] if self.kernel_size is None else self.kernel_size + self._padding = ["VALID", "SAME"] if self.padding is None else self.padding + self._bias_range = (-1.0, 1.0) if self.bias_range is None else self.bias_range + + assert self._bias_range[0] <= self._bias_range[1] + + self._define_parameters() + + def _generate_batch_indices(self, n): + """Generate the list of batches. + + Parameters + ---------- + n : int + The number of samples in the dataset. + + Returns + ------- + batch_indices_list : list + A list of multiple np.ndarray containing indices of batches. + """ + import numpy as np + + all_indices = np.arange(n) + + if self.batch_size >= n: + return [all_indices] + + remainder_batch_size = n % self.batch_size + number_batches = n // self.batch_size + + batch_indices_list = np.array_split( + ary=all_indices[: n - remainder_batch_size], + indices_or_sections=number_batches, + ) + + if remainder_batch_size > 0: + batch_indices_list.append(all_indices[n - remainder_batch_size :]) + + return batch_indices_list + + def _transform(self, X, y=None): + """Transform input time series using random convolutional kernels. 
+ + Parameters + ---------- + X : 3D np.ndarray of shape = [n_instances, n_channels, n_timepoints] + collection of time series to transform. + y : ignored argument for interface compatibility. + + Returns + ------- + output_rocket : np.ndarray [n_instances, n_filters * 2] + transformed features. + """ + import numpy as np + import tensorflow as tf + + tf.random.set_seed(self.random_state) + + X = X.transpose(0, 2, 1) + + batch_indices_list = self._generate_batch_indices(n=len(X)) + + output_features = [] + + for f in range(self.n_filters): + output_features_filter = [] + + for batch_indices in batch_indices_list: + _output_convolution = tf.nn.conv1d( + input=X[batch_indices], + stride=1, + filters=self._list_of_kernels[f], + dilations=self._list_of_dilations[f], + padding=self._list_of_paddings[f], + ) + + _output_convolution = np.squeeze(_output_convolution.numpy(), axis=-1) + _output_convolution += self._list_of_biases[f] + + _ppv = self._get_ppv(x=_output_convolution) + _max = self._get_max(x=_output_convolution) + + output_features_filter.append( + np.concatenate( + (np.expand_dims(_ppv, axis=-1), np.expand_dims(_max, axis=-1)), + axis=1, + ) + ) + + output_features.append( + np.expand_dims(np.concatenate(output_features_filter, axis=0), axis=0) + ) + + output_rocket = np.concatenate(output_features, axis=0).swapaxes(0, 1) + output_rocket = output_rocket.reshape( + (output_rocket.shape[0], output_rocket.shape[1] * output_rocket.shape[2]) + ) + + return output_rocket + + @classmethod + def get_test_params(cls, parameter_set="default"): + """Return testing parameter settings for the transformer. + + Parameters + ---------- + parameter_set : str, default="default" + Name of the set of test parameters to return, for use in tests. If no + special parameters are defined for a value, will return `"default"` set. 
+ + Returns + ------- + params : dict or list of dict, default = {} + Parameters to create testing instances of the class + Each dict are parameters to construct an "interesting" test instance, i.e., + `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance. + `create_test_instance` uses the first (or only) dictionary in `params` + """ + params = { + "n_filters": 5, + } + return params diff --git a/aeon/transformations/collection/convolution_based/rocketGPU/base.py b/aeon/transformations/collection/convolution_based/rocketGPU/base.py new file mode 100644 index 0000000000..5dd118431a --- /dev/null +++ b/aeon/transformations/collection/convolution_based/rocketGPU/base.py @@ -0,0 +1,44 @@ +"""Base of Rocket based transformer for GPU.""" + +__author__ = ["hadifawaz1999"] +__all__ = ["BaseROCKETGPU"] + +from aeon.transformations.collection import BaseCollectionTransformer + + +class BaseROCKETGPU(BaseCollectionTransformer): + """Base class for ROCKET GPU based transformers. + + Parameters + ---------- + n_filters : int, default = 10000 + Number of random convolutional kernels. 
+ """ + + _tags = { + "X_inner_type": "numpy3D", + "output_data_type": "Tabular", + "capability:multivariate": True, + "algorithm_type": "convolution", + "capability:unequal_length": False, + "cant-pickle": True, + "python_dependencies": "tensorflow", + } + + def __init__( + self, + n_filters=10000, + ): + super().__init__() + self.n_filters = n_filters + + def _get_ppv(self, x): + import tensorflow as tf + + x_pos = tf.math.count_nonzero(tf.nn.relu(x), axis=1) + return tf.math.divide(x_pos, x.shape[1]) + + def _get_max(self, x): + import tensorflow as tf + + return tf.math.reduce_max(x, axis=1) diff --git a/aeon/transformations/collection/convolution_based/rocketGPU/tests/__init__.py b/aeon/transformations/collection/convolution_based/rocketGPU/tests/__init__.py new file mode 100644 index 0000000000..a3ec599f4e --- /dev/null +++ b/aeon/transformations/collection/convolution_based/rocketGPU/tests/__init__.py @@ -0,0 +1 @@ +"""Rocket GPU unit tests.""" diff --git a/aeon/transformations/collection/convolution_based/rocketGPU/tests/test_base_rocketGPU.py b/aeon/transformations/collection/convolution_based/rocketGPU/tests/test_base_rocketGPU.py new file mode 100644 index 0000000000..68c8bc9ef5 --- /dev/null +++ b/aeon/transformations/collection/convolution_based/rocketGPU/tests/test_base_rocketGPU.py @@ -0,0 +1,119 @@ +"""Unit tests for rocket GPU base functionality.""" + +import pytest + +from aeon.testing.utils.data_gen import make_example_2d_numpy, make_example_3d_numpy +from aeon.transformations.collection.convolution_based.rocketGPU.base import ( + BaseROCKETGPU, +) +from aeon.utils.validation._dependencies import _check_soft_dependencies + +__author__ = ["hadifawaz1999"] +__all__ = ["test_base_rocketGPU_univariate", "test_base_rocketGPU_multivariate"] + + +class DummyROCKETGPU(BaseROCKETGPU): + + def __init__(self, n_filters=1): + super().__init__(n_filters) + + def _fit(self, X, y=None): + """Generate random kernels adjusted to time series shape. 
+ + Infers time series length and number of channels from input numpy array, + and generates random kernels. + + Parameters + ---------- + X : 3D np.ndarray of shape = (n_instances, n_channels, n_timepoints) + collection of time series to transform. + y : ignored argument for interface compatibility. + + Returns + ------- + self + """ + self.kernel_size = 2 + + def _transform(self, X, y=None): + """Transform input time series using random convolutional kernels. + + Parameters + ---------- + X : 3D np.ndarray of shape = [n_instances, n_channels, n_timepoints] + collection of time series to transform. + y : ignored argument for interface compatibility. + + Returns + ------- + output_rocket : np.ndarray [n_instances, n_filters * 2] + transformed features. + """ + import numpy as np + import tensorflow as tf + + X = X.transpose(0, 2, 1) + + rng = np.random.default_rng() + + _output_convolution = tf.nn.conv1d( + input=X, + filters=rng.normal(size=(self.kernel_size, X.shape[-1], self.n_filters)), + stride=1, + padding="VALID", + dilations=1, + ) + + _output_convolution = np.squeeze(_output_convolution.numpy(), axis=-1) + + _ppv = self._get_ppv(x=_output_convolution) + _max = self._get_max(x=_output_convolution) + + _output_features = np.concatenate( + (np.expand_dims(_ppv, axis=-1), np.expand_dims(_max, axis=-1)), + axis=1, + ) + + return _output_features + + +@pytest.mark.skipif( + not _check_soft_dependencies("tensorflow", severity="none"), + reason="skip test if required soft dependency not available", +) +def test_base_rocketGPU_univariate(): + """Test base rocket GPU functionality univariate.""" + X, _ = make_example_2d_numpy() + + dummy_transform = DummyROCKETGPU(n_filters=1) + dummy_transform.fit(X) + + X_transform = dummy_transform.transform(X) + + assert X_transform.shape[0] == len(X) + assert len(X_transform.shape) == 2 + assert X_transform.shape[1] == 2 + + # check all ppv values are >= 0 + assert (X_transform[:, 0] >= 0).sum() == len(X) + + 
+@pytest.mark.skipif( + not _check_soft_dependencies("tensorflow", severity="none"), + reason="skip test if required soft dependency not available", +) +def test_base_rocketGPU_multivariate(): + """Test base rocket GPU functionality multivariate.""" + X, _ = make_example_3d_numpy(n_channels=3) + + dummy_transform = DummyROCKETGPU(n_filters=1) + dummy_transform.fit(X) + + X_transform = dummy_transform.transform(X) + + assert X_transform.shape[0] == len(X) + assert len(X_transform.shape) == 2 + assert X_transform.shape[1] == 2 + + # check all ppv values are >= 0 + assert (X_transform[:, 0] >= 0).sum() == len(X)