financial_dataloader.py
import numpy as np
import torch
from torch.autograd import Variable


def normal_std(x):
    # Population standard deviation: torch's std() uses the (n - 1)
    # denominator, so rescale by sqrt((n - 1) / n).
    return x.std() * np.sqrt((len(x) - 1.) / (len(x)))
class DataLoaderH(object):
    # train and valid are the fractions of the data used for the training
    # and validation sets; the test set gets the remaining 1 - train - valid.
    def __init__(self, file_name, train, valid, horizon, window, normalize=2):
        self.P = window
        self.h = horizon
        with open(file_name) as fin:
            self.rawdat = np.loadtxt(fin, delimiter=',')
        self.dat = np.zeros(self.rawdat.shape)
        self.n, self.m = self.dat.shape
        self.normalize = normalize
        self.scale = np.ones(self.m)
        self.bias = np.zeros(self.m)
        self._normalized(normalize)
        self._split(int(train * self.n), int((train + valid) * self.n), self.n)

        self.scale = torch.from_numpy(self.scale).float()
        self.bias = torch.from_numpy(self.bias).float()
        # Rescale the test targets back to the original units; note that the
        # bias is not added back here.
        tmp = self.test[1] * self.scale.expand(self.test[1].size(0), self.h, self.m)
        if torch.cuda.is_available():
            self.scale = self.scale.cuda()
            self.bias = self.bias.cuda()
        self.scale = Variable(self.scale)
        self.bias = Variable(self.bias)
        # Normalizers for RSE / RAE, computed from the last-step test targets.
        tmp = tmp[:, -1, :].squeeze()
        self.rse = normal_std(tmp)
        self.rae = torch.mean(torch.abs(tmp - torch.mean(tmp)))
    def _normalized(self, normalize):
        if normalize == 0:
            # No normalization.
            self.dat = self.rawdat
        elif normalize == 1:
            # Normalize by the maximum value of the entire matrix.
            self.dat = self.rawdat / np.max(self.rawdat)
        elif normalize == 2:
            # Normalize by the maximum absolute value of each column (sensor).
            for i in range(self.m):
                self.scale[i] = np.max(np.abs(self.rawdat[:, i]))
                self.dat[:, i] = self.rawdat[:, i] / self.scale[i]
        elif normalize == 3:
            # Standardize each column by its mean and std.
            for i in range(self.m):
                self.scale[i] = np.std(self.rawdat[:, i])
                self.bias[i] = np.mean(self.rawdat[:, i])
                self.dat[:, i] = (self.rawdat[:, i] - self.bias[i]) / self.scale[i]
        elif normalize == 4:
            # Standardize each column by the mean and std of the first 70%
            # of the rows only, avoiding test-set statistics.
            for i in range(self.m):
                self.scale[i] = np.std(self.rawdat[:int(self.dat.shape[0] * 0.7), i])
                self.bias[i] = np.mean(self.rawdat[:int(self.dat.shape[0] * 0.7), i])
                self.dat[:, i] = (self.rawdat[:, i] - self.bias[i]) / self.scale[i]
    def _split(self, train, valid, test):
        # Each index marks the last target row of a (window, horizon) sample,
        # so the earliest usable index is P + h - 1.
        train_set = range(self.P + self.h - 1, train)
        valid_set = range(train, valid)
        test_set = range(valid, self.n)
        self.train = self._batchify(train_set, self.h)
        self.valid = self._batchify(valid_set, self.h)
        self.test = self._batchify(test_set, self.h)

    def _batchify(self, idx_set, horizon):
        n = len(idx_set)
        X = torch.zeros((n, self.P, self.m))
        Y = torch.zeros((n, self.h, self.m))
        for i in range(n):
            # X covers the P rows ending just before the horizon;
            # Y covers the h rows ending at idx_set[i].
            end = idx_set[i] - self.h + 1
            start = end - self.P
            X[i, :, :] = torch.from_numpy(self.dat[start:end, :])
            Y[i, :, :] = torch.from_numpy(self.dat[end:(idx_set[i] + 1), :])
        return [X, Y]
    def get_batches(self, inputs, targets, batch_size, shuffle=True):
        length = len(inputs)
        if shuffle:
            index = torch.randperm(length)
        else:
            index = torch.LongTensor(range(length))
        start_idx = 0
        while start_idx < length:
            end_idx = min(length, start_idx + batch_size)
            excerpt = index[start_idx:end_idx]
            X = inputs[excerpt]
            Y = targets[excerpt]
            if torch.cuda.is_available():
                X = X.cuda()
                Y = Y.cuda()
            yield Variable(X), Variable(Y)
            start_idx += batch_size
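

# Minimal usage sketch. Assumptions: a comma-separated file of shape
# (timesteps, series) at the hypothetical path "data/exchange_rate.txt";
# the 60/20/20 split, window of 168 steps, and horizon of 3 are arbitrary
# illustrative values, not defaults from this file.
if __name__ == "__main__":
    data = DataLoaderH("data/exchange_rate.txt", train=0.6, valid=0.2,
                       horizon=3, window=168, normalize=2)
    for X, Y in data.get_batches(data.train[0], data.train[1],
                                 batch_size=32, shuffle=True):
        # X: (batch, window, series) inputs; Y: (batch, horizon, series) targets.
        print(X.size(), Y.size())
        break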