-
Notifications
You must be signed in to change notification settings - Fork 10
Expand file tree
/
Copy pathutils.py
More file actions
160 lines (125 loc) · 6.04 KB
/
utils.py
File metadata and controls
160 lines (125 loc) · 6.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
from enum import Enum
from typing import Union
import pandas as pd
from torch.utils.data import Dataset
import numpy as np
from pytorch_lightning import Callback
import torch
import os
import logging
from typing import Union
def beauty_string(message:str,type:str,verbose:bool):
    """Log ``message`` with the root logger, decorated according to ``type``.

    Args:
        message (str): text to log
        type (str): layout of the output. ``'block'`` logs a framed banner,
            ``'section'`` logs the centered text between two full-width rules,
            ``'info'`` logs the centered text alone; any other value logs
            ``message`` unchanged
        verbose (bool): nothing is logged unless this is exactly ``True``
            (an identity check, so truthy values such as ``1`` stay silent)
    """
    # identity check kept on purpose: only the literal True enables output
    if verbose is not True:
        return

    width = 150
    rule = '#' * width

    if type == 'block':
        # the side bars shrink as the message grows and vanish for long messages
        side = '#' * max((100 - len(message)) // 2 - 5, 0)
        frame = side + ' ' * (width - 2 * len(side)) + side
        lines = ['\n', rule, frame, format(message, f'^{width}'), frame, rule]
    elif type == 'section':
        lines = ['\n', rule, format(message, f'^{width}'), rule]
    elif type == 'info':
        lines = [format(message, f'^{width}')]
    else:
        lines = [message]

    # one logging call per line, in the same order as the layout above
    for line in lines:
        logging.info(line)
def _time_axis(start, end, freq):
    """Return the regular time grid from ``start`` to ``end`` (both inclusive) with step ``freq``."""
    if isinstance(freq, (int, np.integer)):
        # `end + 1` makes the stop inclusive, mirroring pd.date_range which
        # keeps the end point whenever it falls on the grid
        return np.arange(start, end + 1, freq)
    return pd.date_range(start, end, freq=freq)


def extend_time_df(x:pd.DataFrame,freq:Union[str,int],group:Union[str,None]=None,global_minmax:bool=False)-> pd.DataFrame:
    """Utility for generating the full, gap-free time axis onto which the real data can be merged

    Args:
        x (pd.DataFrame): dataframe containing the column ``time``
        freq (str or int): step of the resulting axis. An integer (Python or NumPy)
            produces an integer axis; anything else is passed to ``pd.date_range``
            as a frequency in pandas notation
        group (string or None): if not None the min/max are computed by the group column, default None
        global_minmax (bool): if True every group spans the global min/max of ``x.time``
            instead of its own. Usually used for stacked model

    Returns:
        pd.DataFrame: a dataframe with the column ``time`` ranging from the minimum of x
        to the maximum (included when it lies on the grid) with frequency ``freq``.
        When ``group`` is given the frame also contains the group column and the
        groups are stacked in order of first appearance in ``x``.

    Raises:
        ValueError: from ``pd.concat`` when ``group`` is given and ``x`` has no rows
    """
    if group is None:
        return pd.DataFrame({'time': _time_axis(x.time.min(), x.time.max(), freq)})

    labels = x[group].unique()
    if global_minmax:
        # every group shares the same global span
        shared = (x.time.min(), x.time.max())
        bounds = {label: shared for label in labels}
    else:
        # a single groupby pass instead of one boolean mask per group
        span = x.groupby(group).time.agg(['min', 'max'])
        bounds = dict(zip(span.index, zip(span['min'], span['max'])))

    frames = []
    for label in labels:
        start, end = bounds[label]
        frames.append(pd.DataFrame({group: label, 'time': _time_axis(start, end, freq)}))
    return pd.concat(frames, ignore_index=True)
class MetricsCallback(Callback):
    """PyTorch Lightning callback that accumulates logged metrics and dumps them to csv.

    :meta private:
    """

    def __init__(self,dirpath):
        """Store the output folder and start with empty ``val_loss`` / ``train_loss`` histories."""
        super().__init__()
        self.dirpath = dirpath
        self.metrics = {'val_loss':[],'train_loss':[]}

    def on_validation_end(self, trainer, pl_module):
        """Append every value in ``trainer.callback_metrics`` and rewrite ``loss.csv``.

        A metric name that is not already a key of ``self.metrics`` raises ``KeyError``.
        """
        for name, value in trainer.callback_metrics.items():
            self.metrics[name].append(value.item())

        # write the csv from a shallow copy so the stored history keeps its first two entries
        snapshot = dict(self.metrics)
        snapshot['val_loss'] = snapshot['val_loss'][2:]
        target = os.path.join(self.dirpath,'loss.csv')
        pd.DataFrame(snapshot).to_csv(target,index=False)

    def on_train_end(self, trainer, pl_module):
        """Drop the first two ``val_loss`` entries in place and save the history to a randomly named csv."""
        history = self.metrics
        # the first two validation entries are recorded before training starts
        # (the original author did not know why; presumably a sanity check - TODO confirm).
        # NOTE: this rebinds the list stored in self.metrics, so the truncation persists
        history['val_loss'] = history['val_loss'][2:]
        table = pd.DataFrame(history)
        # workaround for multi-gpu runs: a random prefix in the file name
        target = os.path.join(self.dirpath,f'{np.random.randint(10000)}__losses__.csv')
        table.to_csv(target,index=False)
        print("Saving losses on file because multigpu not working")
class MyDataset(Dataset):
    """Dictionary-backed torch Dataset: each item is a dict holding one slice per data key."""

    def __init__(self, data:dict,t:np.ndarray,groups:np.ndarray,idx_target:Union[np.ndarray,None],idx_target_future:Union[np.ndarray,None])->None:
        """
            Extension of Dataset class. While training the returned item is a batch containing the standard keys

        Args:
            data (dict): a dictionary. Each key is a np.array containing the data, indexed by sample
                along its first axis. The key ``y`` is required (it defines the length); the keys are:
                    y : the target variable(s)
                    x_num_past: the numerical past variables
                    x_num_future: the numerical future variables
                    x_cat_past: the categorical past variables
                    x_cat_future: the categorical future variables
            t (np.ndarray): the time array related to the target variables (stored, not used by the methods of this class)
            groups (np.ndarray): group information related to the samples (stored, not used by the methods
                of this class; presumably one entry per sample - TODO confirm)
            idx_target (Union[np.ndarray,None]): you can specify the index in the past data that represent the input features (for differential analysis or detrending strategies)
            idx_target_future (Union[np.ndarray,None]): you can specify the index in the future data that represent the input features (for differential analysis or detrending strategies)
        """
        self.data = data
        self.t = t
        self.groups = groups
        # normalise the optional index lists to arrays, keeping None as "not provided"
        self.idx_target = None if idx_target is None else np.array(idx_target)
        self.idx_target_future = None if idx_target_future is None else np.array(idx_target_future)

    def __len__(self):
        """Number of samples, taken from the ``y`` entry of ``data``."""
        return len(self.data['y'])

    def __getitem__(self, idxs):
        """Return a dict with ``data[k][idxs]`` for every key ``k``.

        ``idx_target`` / ``idx_target_future`` are added unchanged (not sliced) when they were provided.
        """
        sample = {key: values[idxs] for key, values in self.data.items()}
        if self.idx_target is not None:
            sample['idx_target'] = self.idx_target
        if self.idx_target_future is not None:
            sample['idx_target_future'] = self.idx_target_future
        return sample
class ActionEnum(Enum):
    """Action of a categorical variable.

    :meta private:
    """
    # Members are deliberately not annotated: an annotation such as `: str`
    # would be wrong (each member is an ActionEnum, its `.value` is the str)
    # and is reported as an error by type checkers.
    multiplicative = 'multiplicative'
    additive = 'additive'