Word2Vec.py
import pickle
import re
from collections import Counter

import nltk
import numpy as np
from nltk import skipgrams
from nltk.corpus import stopwords
from numpy import random


def normalize_text(fn):
    """Loads a text file and normalizes it, returning a list of sentences.

    Args:
        fn: full path to the text file to process
    """
    sentences = []
    with open(fn, "r", encoding="cp1252") as file:
        lines = file.readlines()
    for line in lines:
        line = line.strip()
        if line == "":
            continue
        # strip quotes and basic punctuation, then lowercase
        line = re.sub(r'["“”.!?,]+', "", line)
        line = line.lower()
        sentences.append(line)
    return sentences
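
# Minimal usage sketch ("corpus.txt" below is a hypothetical file path):
#     sentences = normalize_text("corpus.txt")
#     print(sentences[:3])  # first few normalized, lowercased sentences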


def sigmoid(x):
    return 1.0 / (1 + np.exp(-x))


def load_model(fn):
    """Loads a pickled model and returns it.

    Args:
        fn: the full path to the model to load.
    """
    with open(fn, "rb") as file:
        sg_model = pickle.load(file)
    return sg_model
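
# Minimal usage sketch (assumes a model was previously trained and pickled,
# e.g. by SkipGram.learn_embeddings(save_as="vocab.pickle")):
#     sg_model = load_model("vocab.pickle")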


class SkipGram:
    def __init__(
        self, sentences, d=100, neg_samples=4, context=4, word_count_threshold=5
    ):
        self.sentences = sentences
        self.d = d  # embedding dimension
        self.neg_samples = neg_samples  # number of negative samples per positive sample
        self.context = context  # size of the context window (not counting the target word)
        self.word_count_threshold = word_count_threshold  # ignore words appearing fewer times than this
        self.T = []  # target-word embedding matrix
        self.C = []  # context-word embedding matrix

        # word:count dictionary
        counts = Counter()
        for line in sentences:
            counts.update(line.split())

        # ignore low-frequency words and stopwords
        nltk.download("stopwords", quiet=True)
        stop_words = set(stopwords.words("english"))
        counts = Counter(
            {
                k: v
                for k, v in counts.items()
                if k not in stop_words and v >= word_count_threshold and "’" not in k
            }
        )
        self.word_count = dict(counts)

        # number of unique words in the vocabulary
        self.vocab_size = len(counts)

        # word-index map
        self.word_index = {word: index for index, word in enumerate(counts.keys())}

    def compute_similarity(self, w1, w2):
        """Returns the cosine similarity (in [-1, 1]) between the specified words.

        Args:
            w1: a word
            w2: a word
        Returns: a float; defaults to 0.0 if one of the specified words is OOV.
        """
        if w1 not in self.word_index or w2 not in self.word_index:
            return 0.0  # default for out-of-vocabulary words
        nx = self.T[:, self.word_index[w1]]
        ny = self.T[:, self.word_index[w2]]
        sim = np.dot(nx, ny) / (np.linalg.norm(nx) * np.linalg.norm(ny))
        return sim

    def get_closest_words(self, w, n=5):
        """Returns a list containing the n words that are the closest to the specified word.

        Args:
            w: the word to find close words to.
            n: the number of words to return. Defaults to 5.
        """
        if w not in self.word_index:
            return []  # default for out-of-vocabulary words
        y = self.feed_forward(w)
        n = min(n, self.vocab_size)
        candidates = [(word, y[index]) for word, index in self.word_index.items()]
        candidates.sort(key=lambda x: x[1], reverse=True)
        return [word for word, score in candidates[:n]]

    def feed_forward(self, w):
        """Returns a normalized output layer for a word.

        Args:
            w: word to get output for
        """
        # Input layer x T = hidden layer (the word's column of T)
        input_layer_id = self.word_index[w]
        hidden = self.T[:, input_layer_id][:, None]
        # Hidden layer x C = output layer
        output_layer = np.dot(self.C, hidden)
        y = sigmoid(output_layer)
        return y

    def learn_embeddings(self, step_size=0.001, epochs=50, save_as="vocab.pickle"):
        """Trains the embedding matrices, saves the model under the specified name
        and returns the matrices. The model from the most recent epoch is also
        backed up to temp.pickle.

        Args:
            step_size: step size for the gradient descent. Defaults to 0.001
            epochs: number of training epochs. Defaults to 50
            save_as: file name of the trained model
        """
        print("start preprocessing")
        vocab_size = self.vocab_size
        # in skip-gram we predict the context words from the target word
        T = np.random.rand(self.d, vocab_size)  # embedding matrix of target words
        C = np.random.rand(vocab_size, self.d)  # embedding matrix of context words

        # create learning vectors
        learning_vector = []
        for sentence in self.sentences:
            dic = {}
            # create positive and negative lists
            pos_lst = list(skipgrams(sentence.split(), int(self.context / 2), 1))
            pos_lst += [(tup[1], tup[0]) for tup in pos_lst]
            neg_lst = []
            for _ in range(self.neg_samples):
                neg_lst += [
                    (word, random.choice(list(self.word_count.keys())))
                    for word in sentence.split()
                ]
            # merge to key:value
            pos = {}
            for x, y in pos_lst:
                if x not in self.word_count or y not in self.word_count:
                    continue
                pos.setdefault(x, []).append(y)
            neg = {}
            for x, y in neg_lst:
                if x not in self.word_count or y not in self.word_count:
                    continue
                neg.setdefault(x, []).append(y)
            # create the learning context vector
            for key, val in pos.items():
                dic[key] = np.zeros(self.vocab_size, dtype=int)
                for v in val:
                    dic[key][self.word_index[v]] += 1
                for v in neg.get(key, []):  # a word may have no negative samples
                    dic[key][self.word_index[v]] -= 1
            learning_vector += dic.items()
        print("done preprocessing")

        print("start training")
        for i in range(epochs):
            print(f"epoch {i + 1}")
            # learning:
            for key, val in learning_vector:
                # Input layer x T = hidden layer
                input_layer_id = self.word_index[key]
                input_layer = np.zeros(self.vocab_size, dtype=int)
                input_layer[input_layer_id] = 1
                input_layer = input_layer.reshape(-1, 1)
                hidden = T[:, input_layer_id][:, None]
                # Hidden layer x C = output layer
                output_layer = np.dot(C, hidden)
                y = sigmoid(output_layer)
                # calculate gradients
                e = y - val.reshape(self.vocab_size, 1)
                outer_grad = np.dot(hidden, e.T).T
                inner_grad = np.dot(input_layer, np.dot(C.T, e).T).T
                C -= step_size * outer_grad
                T -= step_size * inner_grad
            # back up the last trained model (from the last epoch)
            self.T = T
            self.C = C
            with open("temp.pickle", "wb") as f:
                pickle.dump(self, f)
            step_size *= 1 / (1 + step_size * i)
        print("done training")

        self.T = T
        self.C = C
        with open(save_as, "wb") as f:
            pickle.dump(self, f)
        print(f"saved as '{save_as}'")
        return T, C
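

# Minimal training sketch (the corpus path is hypothetical; small d and few
# epochs are used here only to keep the run short):
#     sentences = normalize_text("corpus.txt")
#     sg = SkipGram(sentences, d=50, neg_samples=4, context=4)
#     T, C = sg.learn_embeddings(step_size=0.001, epochs=10, save_as="vocab.pickle")
#     print(sg.get_closest_words("king", n=5))
#     print(sg.compute_similarity("king", "queen"))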


class SemantleSolver:
    def __init__(self, sg_model, top_ranks=1000, give_up=500):
        """Initializes the Semantle solver.

        Args:
            sg_model: a SkipGram object
            top_ranks: the proximity rank within which check_word returns an actual rank
            give_up: the max number of words to guess before quitting in disgrace.
        """
        self.sg_model = sg_model
        self.top_ranks = top_ranks
        self.top_ranks_dict = None  # top-rank dict for the target word - word:(sim, rank)
        self.give_up = give_up
        self.target_word = None  # the word to guess
        self.target_word_index = None  # index of the word to guess
        self.set_target_word()

    def set_target_word(self, target=None):
        """Sets a target word for the solver to guess.
        Samples from the model vocabulary if no word is specified,
        or if the specified word is not part of the model vocabulary.
        """
        word_index = self.sg_model.word_index
        if target is None or target not in word_index:
            target = random.choice(list(word_index.keys()))
        self.target_word = target
        self.target_word_index = word_index[target]
        top_ranks_list = self.sg_model.get_closest_words(target, self.top_ranks)
        self.top_ranks_dict = {}
        for i, word in enumerate(top_ranks_list, start=1):
            self.top_ranks_dict[word] = (
                self.sg_model.compute_similarity(target, word),
                i,
            )

    def check_word(self, w):
        """Returns a tuple (sim, rank) - (float, int) - with the similarity between the
        specified word and the target word, and the proximity rank if the specified
        word is within the specified rank.
        The returned rank is -1 if the word is not among the self.top_ranks closest words.

        Args:
            w: the word to guess (matched against the target word)
        """
        sim = 0.0
        rank = -1
        if w in self.top_ranks_dict:
            sim, rank = self.top_ranks_dict[w]
        return sim, rank

    def semantle_game(self):
        """Returns won, shots - won is True if the player guessed the correct word,
        shots is a list of triplets (w, sim, rank), where sim and rank are returned
        by check_word(w).
        len(shots) cannot exceed self.give_up;
        shots[0] holds the first guess and shots[-1] the last guess (hopefully the match).
        """
        shots = []
        won = False
        print("Guess the word")
        print('If you want to give up, please write "give up" as your guess')
        for _ in range(self.give_up):
            guess = input("Your next guess is: ")
            if guess == "give up":
                break
            sim, rank = self.check_word(guess)
            shots.append((guess, sim, rank))
            if guess == self.target_word:
                won = True
                break
            if rank == -1:
                print(f"Your guess {guess} is not close to the target word")
            else:
                print(f"Your guess {guess} ranked as top {rank} with similarity of {sim}")
        print(f"The correct word was {self.target_word}")
        if won:
            print("You guessed correctly")
        else:
            print("You lost, better luck next time")
        self.set_target_word()
        return won, shots
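

# A minimal end-to-end sketch of how these pieces could fit together
# (the corpus path and model file name below are hypothetical):
if __name__ == "__main__":
    sentences = normalize_text("corpus.txt")
    sg_model = SkipGram(sentences)
    sg_model.learn_embeddings(save_as="vocab.pickle")
    # ...or reuse a previously trained model instead:
    # sg_model = load_model("vocab.pickle")
    solver = SemantleSolver(sg_model, top_ranks=1000, give_up=500)
    won, shots = solver.semantle_game()
    print(f"won={won}, guesses={len(shots)}")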