-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathlinear_classifier.py
155 lines (113 loc) · 4.98 KB
/
linear_classifier.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#vim:fileencoding=utf-8
import numpy as np
import utils
class linear_classifier(object):
    """Single-layer linear model: predicts fact(w @ x + b) for one sample x.

    Training is plain mini-batch gradient descent where each sample's
    gradient is computed individually and accumulated over the batch
    before a single optimizer step.
    """

    def __init__(self, fact):
        # Activation function and its derivative (derivative resolved by utils).
        self.fact = fact
        self.fdact = utils.lookup_dfunc(fact)

    def train(self, train_data, valid_data,
              verbose, epoch, batch_size,
              weight_decay, floss, optimizer, lr_scheduler, weights_initializer):
        """Fit weights w of shape (dout, din) and bias b of shape (dout,).

        train_data is a sequence of (x, y) pairs; dimensions are taken
        from the first pair. Runs `epoch` full passes; when verbose,
        reports training accuracy/loss and validation accuracy per epoch.
        """
        din, dout = train_data[0][0].shape[0], train_data[0][1].shape[0]
        self.w = weights_initializer((dout, din))
        self.b = np.zeros((dout,))
        for ep in range(epoch):
            acc, loss = self.run_one_epoch(
                ep, train_data, batch_size, weight_decay, floss, optimizer, lr_scheduler)
            if verbose:
                valid_acc = self.evaluate(valid_data, batch_size)
                print('epoch=%d, acc=%.2f%%, loss=%f, valid_acc=%.2f%%' %
                      (ep + 1, acc * 100, loss, valid_acc * 100))

    def run_one_epoch(self, epoch, train_data, batch_size, weight_decay, floss, optimizer, lr_scheduler):
        """Run one shuffled pass over train_data; return (accuracy, mean loss)."""
        fdloss = utils.lookup_dfunc(floss)
        n = len(train_data)
        total_loss = 0
        total_acc = 0
        for batch in utils.break_into_batches(train_data, batch_size, shuffle=True):
            grad_w = np.zeros(self.w.shape)
            grad_b = np.zeros(self.b.shape)
            for sample, target in batch:
                # forward: affine transform then activation
                affine = np.dot(self.w, sample)
                shifted = affine + self.b
                predicted = self.fact(shifted)
                loss = floss(predicted, target)
                # backward: chain rule through loss -> activation -> affine
                d_pred = fdloss(loss, predicted, target)
                d_shifted = self.fdact(d_pred, predicted, shifted)
                d_affine = d_shifted
                d_bias = d_shifted
                d_weights = utils.ddot0(d_affine.reshape((-1, 1)), affine, self.w,
                                        sample.reshape((-1, 1)))
                # accumulate gradients over the batch
                grad_w += d_weights
                grad_b += d_bias
                # running metrics
                total_loss += loss
                total_acc += target[np.argmax(predicted)] == 1
            # one optimizer step per batch
            lr = lr_scheduler(epoch)
            optimizer([self.w, self.b], [grad_w, grad_b], n, batch_size, lr, weight_decay)
        return total_acc / n, total_loss / n

    def evaluate(self, test_data, batch_size):
        """Return the fraction of samples whose label array equals the prediction mask."""
        hits = sum(1 for x, y in test_data if (y == self.predicate(x)).all())
        return hits / len(test_data)

    def predicate(self, x):
        """Boolean mask marking the maximal activation(s) of fact(w @ x + b)."""
        scores = self.fact(np.dot(self.w, x) + self.b)
        return scores == scores.max()
class linear_classifier2(object):
    """Vectorized single-layer linear model: predicts fact(X @ w + b).

    Same contract as linear_classifier, but each mini-batch is stacked
    into matrices so the forward/backward passes run as single matrix
    operations rather than a per-sample loop.
    """

    def __init__(self, fact):
        # Activation function and its derivative (derivative resolved by utils).
        self.fact = fact
        self.fdact = utils.lookup_dfunc(fact)

    def train(self, train_data, valid_data,
              verbose, epoch, batch_size,
              weight_decay, floss, optimizer, lr_scheduler, weights_initializer):
        """Fit weights w of shape (din, dout) and bias b of shape (dout,).

        Dimensions are taken from the first (x, y) pair of train_data.
        Runs `epoch` full passes; when verbose, prints per-epoch metrics.
        """
        din, dout = train_data[0][0].shape[0], train_data[0][1].shape[0]
        self.w = weights_initializer((din, dout))
        self.b = np.zeros((dout,))
        for ep in range(epoch):
            acc, loss = self.run_one_epoch(
                ep, train_data, batch_size, weight_decay, floss, optimizer, lr_scheduler)
            if verbose:
                valid_acc = self.evaluate(valid_data, batch_size)
                print('epoch=%d, acc=%.2f%%, loss=%f, valid_acc=%.2f%%' %
                      (ep + 1, acc * 100, loss, valid_acc * 100))

    def run_one_epoch(self, epoch, train_data, batch_size, weight_decay, floss, optimizer, lr_scheduler):
        """Run one shuffled pass over train_data; return (accuracy, mean loss)."""
        fdloss = utils.lookup_dfunc(floss)
        n = len(train_data)
        total_loss = 0
        total_succ = 0
        for batch in utils.break_into_batches(train_data, batch_size, shuffle=True):
            inputs = np.array([x for x, _ in batch])
            targets = np.array([y for _, y in batch])
            # forward: whole batch at once
            lin = np.dot(inputs, self.w)
            shifted = lin + self.b
            outputs = self.fact(shifted)
            losses = floss(outputs, targets)
            # backward: chain rule through loss -> activation -> affine
            d_outputs = fdloss(losses, outputs, targets)
            d_shifted = self.fdact(d_outputs, outputs, shifted)
            d_lin = d_shifted
            d_bias = np.sum(d_shifted, axis=0)
            d_weights = utils.ddot1(d_lin, lin, inputs, self.w)
            # running metrics: count expected-label hits at the argmax column
            total_loss += np.sum(losses)
            total_succ += targets[np.arange(targets.shape[0]),
                                  outputs.argmax(axis=-1)].sum()
            # one optimizer step per batch
            lr = lr_scheduler(epoch)
            optimizer([self.w, self.b], [d_weights, d_bias], n, batch_size, lr, weight_decay)
        return total_succ / n, total_loss / n

    def evaluate(self, test_data, batch_size):
        """Return accuracy over test_data, scored batch-wise by argmax."""
        total_succ = 0
        for batch in utils.break_into_batches(test_data, batch_size):
            inputs = np.array([x for x, _ in batch])
            targets = np.array([y for _, y in batch])
            outputs = self.fact(np.dot(inputs, self.w) + self.b)
            total_succ += targets[np.arange(targets.shape[0]),
                                  outputs.argmax(axis=-1)].sum()
        return total_succ / len(test_data)

    def predicate(self, x):
        """Boolean mask marking the maximal activation(s) of fact(x @ w + b)."""
        scores = self.fact(np.dot(x, self.w) + self.b)
        return scores == scores.max()