-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
ReversibleHyperEmbeddings.py
127 lines (112 loc) · 4.63 KB
/
ReversibleHyperEmbeddings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import tensorflow as tf
from Utils.utils import CFakeObject
class CReversibleHyperEmbeddings(tf.keras.layers.Layer):
    """Embedding table that can also map vectors back to table indices.

    Forward direction (`call` / `encode`): integer indices are looked up in a
    trainable `(input_dim, output_dim)` table.

    Reverse direction (`decode`): a vector is scored against every row of the
    table with a softmax over the negative squared Euclidean distance, and
    the best-scoring row index is returned.

    Args:
        input_dim: number of rows (distinct indices) in the table.
        output_dim: size of each embedding vector.
        name: layer name; also used as the prefix of the variable names.
        debug_interval: `loss` prints diagnostics via `debug` every this many
            calls. `0` or `None` disables the diagnostics. Defaults to 1000.
        **kwargs: forwarded to `tf.keras.layers.Layer`.

    Raises:
        ValueError: if `debug_interval` is negative.
    """

    def __init__(self, input_dim, output_dim, name, *, debug_interval=1000, **kwargs):
        super().__init__(name=name, **kwargs)
        if debug_interval is not None and debug_interval < 0:
            raise ValueError(
                'debug_interval must be >= 0 or None, got %r' % (debug_interval,)
            )
        self._N = input_dim
        self.output_dim = output_dim
        # 0 means "never print"; None is normalised to 0
        self._debug_interval = int(debug_interval) if debug_interval else 0
        # number of `loss` calls so far, kept as a non-trainable float variable
        self._ittr = tf.Variable(0., trainable=False, name='%s/ittr' % self.name)
        self._embeddings = tf.Variable(
            initial_value=tf.random.normal((input_dim, output_dim), dtype=tf.float32, stddev=0.1),
            trainable=True,
            name='%s/embeddings' % name
        )

    @property
    def embeddings(self):
        """The `(input_dim, output_dim)` embedding table variable."""
        return self._embeddings

    def call(self, inputs):
        """Look up embeddings for a 1-D batch of indices.

        Args:
            inputs: integer tensor of shape `(B,)`.

        Returns:
            Tensor of shape `(B, output_dim)`.
        """
        B = tf.shape(inputs)[0]
        tf.assert_equal(tf.shape(inputs), (B, ))
        res = tf.gather(self.embeddings, inputs)
        tf.assert_equal(tf.shape(res), (B, self.output_dim))
        return res

    def _score(self, x):
        """Score each row of `x` against every embedding.

        The score is `softmax(-d)` over the table axis, where `d` is the
        squared Euclidean distance between the input row and an embedding.

        Args:
            x: tensor of shape `(B, output_dim)`.

        Returns:
            Tensor of shape `(B, input_dim)` whose rows sum to 1.
        """
        B = tf.shape(x)[0]
        tf.assert_equal(tf.shape(x), (B, self.output_dim))
        embeddings = self.embeddings

        # squared distance via ||e||^2 + ||x||^2 - 2 * <x, e>
        dot_product = tf.matmul(x, embeddings, transpose_b=True)  # [B, N]
        tf.assert_equal(tf.shape(dot_product), (B, self._N))

        embLen = tf.reduce_sum(embeddings ** 2, axis=-1, keepdims=True)
        embLen = tf.transpose(embLen)  # [1, N]
        tf.assert_equal(tf.shape(embLen), (1, self._N))

        xLen = tf.reduce_sum(x ** 2, axis=-1, keepdims=True)  # [B, 1]
        tf.assert_equal(tf.shape(xLen), (B, 1))

        distance = embLen + xLen - 2 * dot_product
        # the expanded form can go slightly negative through rounding
        distance = tf.maximum(distance, 0.0)
        tf.assert_equal(tf.shape(distance), (B, self._N))

        res = tf.nn.softmax(-distance, axis=-1)
        tf.assert_equal(tf.shape(res), (B, self._N))
        return res

    def separability(self):
        """Regularisation term over the embedding table.

        Sum of two scalars:
          * the mean cross-entropy of classifying every embedding as its own
            index under `_score` (lower when embeddings are further apart);
          * the mean Euclidean norm of the embeddings (keeps them near the
            origin).

        Returns:
            Scalar tensor.
        """
        scores = self._score(self.embeddings)
        tf.assert_equal(tf.shape(scores), (self._N, self._N))
        # each embedding should be recognised as itself
        idx = tf.range(self._N)[..., None]
        separability = tf.reduce_mean(
            tf.losses.sparse_categorical_crossentropy(idx, scores)
        )
        # mean distance of the embeddings from the origin
        distance = tf.reduce_sum(self.embeddings ** 2, axis=-1)
        distance = tf.sqrt(distance)
        distance = tf.reduce_mean(distance)
        return separability + distance

    @tf.function
    def loss(self, x, target):
        """Cross-entropy between `_score(x)` and the target indices.

        Also increments the internal call counter and, when the debug
        interval is non-zero, prints diagnostics every `debug_interval` calls.

        Args:
            x: tensor of shape `(B, output_dim)`.
            target: integer tensor of shape `(B, 1)` with the expected indices.

        Returns:
            Result of `tf.losses.sparse_categorical_crossentropy` (one value
            per sample).
        """
        B = tf.shape(x)[0]
        tf.assert_equal(tf.shape(x), (B, self.output_dim))
        tf.assert_equal(tf.shape(target), (B, 1))
        res = tf.losses.sparse_categorical_crossentropy(target, self._score(x))

        self._ittr.assign_add(1.0)
        # The outer check is a plain Python value resolved at trace time, so a
        # disabled interval adds nothing to the graph.
        if self._debug_interval:
            if tf.cast(self._ittr, tf.int32) % self._debug_interval == 0:
                self.debug(x, target)
        return res

    def encode(self, color):
        """Quantise scalar values into indices and look up their embeddings.

        Values are clipped to [-1, 1], rescaled to [0, 1] and multiplied by
        `input_dim`; the integer part (clipped to `input_dim - 1`) is the index.

        Args:
            color: float tensor of any shape; it is flattened, one scalar per
                element.

        Returns:
            `CFakeObject` built with `indices` (int32, shape `(count, 1)`) and
            `embeddings` (shape `(count, output_dim)`), where `count` is the
            number of elements in `color`.
        """
        count = tf.size(color)
        x = tf.reshape(color, (count, 1))
        x = tf.clip_by_value(x, clip_value_min=-1.0, clip_value_max=1.0)
        x = (x + 1.0) / 2.0  # [0, 1]
        idx = tf.cast(x * self._N, tf.int32)
        # x == 1.0 would produce index N, so fold it into the last bin
        idx = tf.clip_by_value(idx, clip_value_min=0, clip_value_max=self._N - 1)
        return CFakeObject(indices=idx, embeddings=self(idx[:, 0]))

    def decode(self, x):
        """Map vectors back to the best-scoring index and its scalar value.

        The value is `index / input_dim` rescaled to [-1, 1), i.e. the lower
        edge of the bin used by `encode`, so `decode(encode(v)).values` is
        less than or equal to `v` for `v` in [-1, 1].

        Args:
            x: tensor of shape `(B, output_dim)`.

        Returns:
            `CFakeObject` built with `values` (float32, shape `(B, 1)`) and
            `indices` (int32, shape `(B, 1)`).
        """
        B = tf.shape(x)[0]
        tf.assert_equal(tf.shape(x), (B, self.output_dim))
        scores = self._score(x)
        tf.assert_equal(tf.shape(scores), (B, self._N))

        idx = tf.argmax(scores, axis=-1)[..., None]
        idx = tf.cast(idx, tf.int32)
        values = tf.cast(idx, tf.float32) / self._N
        values = values * 2.0 - 1.0
        return CFakeObject(values=values, indices=idx)

    def debug(self, x, target):
        """Print diagnostics for a batch with `tf.print`.

        Printed values: top-1 accuracy, average rank of the target index
        (0 is the best score), average Euclidean distance between `x` and the
        target's embedding, and the average embedding norm.

        Args:
            x: tensor of shape `(B, output_dim)`.
            target: integer tensor of shape `(B, 1)`.
        """
        tf.print('-' * 80)
        scores = self._score(x)
        # top-1 accuracy
        predIdx = tf.cast(tf.argmax(scores, axis=-1), tf.int32)[..., None]
        acc = tf.reduce_mean(tf.cast(predIdx == target, tf.float32))
        tf.print('[%s] Accuracy TOP-1:' % self.name, acc)

        # average position of the correct answer in the descending score order;
        # argsort of a permutation yields its inverse, i.e. the rank per index
        sortedInd = tf.argsort(-scores, axis=-1)
        ranks = tf.argsort(sortedInd, axis=-1)
        K = tf.gather_nd(ranks, target, batch_dims=1)[:, None]  # ranks of targets
        K = tf.cast(K, tf.float32)
        tf.assert_equal(tf.shape(K)[-1], 1)
        tf.print('[%s] Avg. rank (K):' % self.name, tf.reduce_mean(K))

        # average Euclidean distance between the input and the target embedding
        correct = tf.stop_gradient(self(target[..., 0]))
        dist = tf.reduce_sum((x - correct) ** 2, axis=-1)
        dist = tf.sqrt(dist)
        tf.print('[%s] Avg. distance:' % self.name, tf.reduce_mean(dist))

        # average "radius" (norm) of the embeddings
        radius = tf.reduce_sum(self.embeddings ** 2, axis=-1)
        radius = tf.sqrt(radius)
        tf.print('[%s] Avg. radius:' % self.name, tf.reduce_mean(radius))