-
Notifications
You must be signed in to change notification settings - Fork 6.4k
/
Copy pathrbm_tf.py
139 lines (114 loc) · 4.8 KB
/
rbm_tf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
# https://deeplearningcourses.com/c/unsupervised-deep-learning-in-python
# https://www.udemy.com/unsupervised-deep-learning-in-python
from __future__ import print_function, division
from builtins import range
# Note: you may need to update your version of future
# sudo pip install -U future
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.utils import shuffle
from util import getKaggleMNIST
from autoencoder_tf import DNN
class RBM(object):
    """Restricted Boltzmann Machine built as a TensorFlow (compat.v1) graph.

    The model has ``D`` visible units and ``M`` hidden units with parameters
    ``W`` of shape (D, M), hidden bias ``c`` of shape (M,) and visible bias
    ``b`` of shape (D,).  Training minimizes the difference between the mean
    free energy of the input batch and the mean free energy of a sample
    obtained from one v -> h -> v' Gibbs step (a contrastive-divergence-style
    objective).

    A session must be supplied through ``set_session`` before ``fit`` or
    ``transform`` is called, since both run ops through ``self.session``.
    """

    def __init__(self, D, M, an_id):
        """Store the layer sizes and identifier, then build the graph.

        Args:
            D: number of visible units (input dimensionality).
            M: number of hidden units.
            an_id: identifier used only in the training log message.
        """
        self.D, self.M, self.id = D, M, an_id
        self.build(D, M)

    def set_session(self, session):
        """Attach the session that ``fit`` and ``transform`` will run ops in."""
        self.session = session

    @staticmethod
    def _sample_bernoulli(probs):
        """Draw a 0/1 float32 sample where each entry is 1 with probability ``probs``."""
        # Thresholding uniform noise is used here instead of a Bernoulli
        # distribution object from tf.contrib.
        noise = tf.random.uniform(shape=tf.shape(input=probs))
        return tf.cast(noise < probs, dtype=tf.float32)

    def build(self, D, M):
        """Create the variables, input placeholder, train op and monitoring cost.

        Args:
            D: number of visible units.
            M: number of hidden units.
        """
        # Parameters, created in the order W, c, b.
        # Original author's note: without limiting the variance of the
        # initial weights, you get numerical stability issues.
        initial_weights = tf.random.normal(shape=(D, M)) * np.sqrt(2.0 / M)
        self.W = tf.Variable(initial_weights)
        self.c = tf.Variable(np.zeros(M, dtype=np.float32))
        self.b = tf.Variable(np.zeros(D, dtype=np.float32))

        # Input batch: any number of rows, D columns.
        self.X_in = tf.compat.v1.placeholder(tf.float32, shape=(None, D))

        # One Gibbs step: sample h from p(h | v), then v' from p(v | h).
        # p(h | v) is kept on the instance because transform() evaluates it.
        self.p_h_given_v = self.forward_hidden(self.X_in)
        hidden_sample = self._sample_bernoulli(self.p_h_given_v)

        p_v_given_h = tf.nn.sigmoid(
            tf.matmul(hidden_sample, tf.transpose(a=self.W)) + self.b
        )
        visible_sample = self._sample_bernoulli(p_v_given_h)

        # Training objective: mean free energy of the data minus mean free
        # energy of the sampled reconstruction.
        data_energy = tf.reduce_mean(input_tensor=self.free_energy(self.X_in))
        sample_energy = tf.reduce_mean(
            input_tensor=self.free_energy(visible_sample)
        )
        objective = data_energy - sample_energy
        optimizer = tf.compat.v1.train.AdamOptimizer(1e-2)
        self.train_op = optimizer.minimize(objective)

        # Reconstruction cross-entropy.  It is not what the optimizer
        # minimizes; it is only fetched to observe training progress.
        self.cost = tf.reduce_mean(
            input_tensor=tf.nn.sigmoid_cross_entropy_with_logits(
                labels=self.X_in,
                logits=self.forward_logits(self.X_in),
            )
        )

    def fit(self, X, epochs=1, batch_sz=100, show_fig=False):
        """Train on ``X`` with mini-batches, logging the monitoring cost.

        Args:
            X: 2-D array of training rows; it is reshuffled every epoch.
            epochs: number of passes over the data.
            batch_sz: rows per mini-batch.  Rows left over after the last
                full batch of an epoch are not used in that epoch.
            show_fig: when true, plot the per-batch cost after training.
        """
        n_samples, _ = X.shape
        n_batches = n_samples // batch_sz

        print("training rbm: %s" % self.id)
        costs = []
        for epoch in range(epochs):
            print("epoch:", epoch)
            X = shuffle(X)
            for j in range(n_batches):
                start = j * batch_sz
                batch = X[start:start + batch_sz]
                _, c = self.session.run(
                    (self.train_op, self.cost),
                    feed_dict={self.X_in: batch},
                )
                if j % 10 == 0:
                    print("j / n_batches:", j, "/", n_batches, "cost:", c)
                costs.append(c)

        if show_fig:
            plt.plot(costs)
            plt.show()

    def free_energy(self, V):
        """Return the free energy of each row of ``V`` as a 1-D tensor.

        Computes ``F(v) = -v.b - sum_j softplus((vW + c)_j)``.
        """
        bias_column = tf.reshape(self.b, (self.D, 1))
        visible_term = tf.reshape(-tf.matmul(V, bias_column), (-1,))
        # softplus(x) = log(1 + exp(x))
        hidden_term = -tf.reduce_sum(
            input_tensor=tf.nn.softplus(tf.matmul(V, self.W) + self.c),
            axis=1
        )
        return visible_term + hidden_term

    def forward_hidden(self, X):
        """Return the tensor ``sigmoid(XW + c)``, the hidden activation probabilities."""
        pre_activation = tf.matmul(X, self.W) + self.c
        return tf.nn.sigmoid(pre_activation)

    def forward_logits(self, X):
        """Return the reconstruction logits ``forward_hidden(X) W^T + b``."""
        return tf.matmul(self.forward_hidden(X), tf.transpose(a=self.W)) + self.b

    def forward_output(self, X):
        """Return the sigmoid of the reconstruction logits for ``X``."""
        logits = self.forward_logits(X)
        return tf.nn.sigmoid(logits)

    def transform(self, X):
        """Evaluate p(h | v) for ``X`` by running the graph in ``self.session``.

        Unlike ``forward_hidden`` and ``forward_output``, which build
        TensorFlow tensors, this feeds ``X`` into the input placeholder and
        returns the evaluated result.
        """
        feed = {self.X_in: X}
        return self.session.run(self.p_h_given_v, feed_dict=feed)
def main():
    """Load the MNIST data, build a DNN that uses RBM layers, and train it.

    The data comes from ``getKaggleMNIST``; the network is the ``DNN`` class
    from ``autoencoder_tf`` constructed with ``UnsupervisedModel=RBM`` and
    fitted with ``pretrain=True`` for 10 epochs.
    """
    Xtrain, Ytrain, Xtest, Ytest = getKaggleMNIST()

    # Same preprocessing as autoencoder_tf.py: cast the inputs to float32.
    Xtrain = Xtrain.astype(np.float32)
    Xtest = Xtest.astype(np.float32)

    _, n_features = Xtrain.shape
    n_classes = len(set(Ytrain))

    # presumably the hidden layer sizes of the stacked model; confirm
    # against the DNN constructor in autoencoder_tf
    hidden_layer_sizes = [1000, 750, 500]
    dnn = DNN(n_features, hidden_layer_sizes, n_classes, UnsupervisedModel=RBM)

    # NOTE(review): tf.compat.v1.placeholder and Session need graph mode.
    # Under TensorFlow 2.x eager execution must be disabled before the model
    # is built; confirm that importing autoencoder_tf takes care of this.

    # The initializer is created after the model so it covers the variables
    # that exist at this point.
    init_op = tf.compat.v1.global_variables_initializer()
    with tf.compat.v1.Session() as session:
        session.run(init_op)
        dnn.set_session(session)
        dnn.fit(Xtrain, Ytrain, Xtest, Ytest, pretrain=True, epochs=10)


if __name__ == '__main__':
    main()