# models.py
import logging

import numpy as np
from tensorflow.keras import backend as K
from tensorflow.keras.layers import (Input, Concatenate, Dot, Embedding, Dropout,
                                     Lambda, Activation, LSTM, Dense)
from tensorflow.keras.models import Model
from tensorflow.keras.utils import plot_model  # used only by the commented-out plotting calls
logger = logging.getLogger(__name__)
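
# This module defines a joint embedding network for code search: a code
# encoder (method name + API sequence + tokens) and a natural-language
# description encoder are trained so that matching (code, description) pairs
# land close together in vector space, via a margin ranking loss over
# (code, good desc, bad desc) triplets.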

class JointEmbeddingModel:
    def __init__(self, config):
        self.model_params = config.get('model_params', dict())
        self.data_params = config.get('data_params', dict())
        self.methname = Input(shape=(self.data_params['methname_len'],), dtype='int32', name='i_methname')
        self.apiseq = Input(shape=(self.data_params['apiseq_len'],), dtype='int32', name='i_apiseq')
        self.tokens = Input(shape=(self.data_params['tokens_len'],), dtype='int32', name='i_tokens')
        self.desc_good = Input(shape=(self.data_params['desc_len'],), dtype='int32', name='i_desc_good')
        self.desc_bad = Input(shape=(self.data_params['desc_len'],), dtype='int32', name='i_desc_bad')
        # Sub-models; all of these are populated by build()
        self._code_repr_model = None
        self._desc_repr_model = None
        self._sim_model = None
        self._training_model = None
        # self.prediction_model = None
    def build(self):
        '''
        1. Build the code representation model
        '''
        logger.debug('Building Code Representation Model')
        methname = Input(shape=(self.data_params['methname_len'],), dtype='int32', name='methname')
        apiseq = Input(shape=(self.data_params['apiseq_len'],), dtype='int32', name='apiseq')
        tokens = Input(shape=(self.data_params['tokens_len'],), dtype='int32', name='tokens')

        ## method name representation ##
        # 1. embedding
        weights_path = self.model_params['init_embed_weights_methname']
        init_emb_weights = np.load(weights_path) if weights_path is not None else None
        if init_emb_weights is not None:
            init_emb_weights = [init_emb_weights]
        embedding = Embedding(input_dim=self.data_params['n_words'],
                              output_dim=self.model_params.get('n_embed_dims', 100),
                              weights=init_emb_weights,
                              # mask_zero=False: 0 is an ordinary index here; if True, 0 would be
                              # treated as padding and all subsequent layers would have to support masking
                              mask_zero=False,
                              name='embedding_methname')
        methname_embedding = embedding(methname)
        dropout = Dropout(0.25, name='dropout_methname_embed')
        methname_dropout = dropout(methname_embedding)
        # 2. rnn: forward and backward LSTMs over the embedded sequence
        f_rnn = LSTM(self.model_params.get('n_lstm_dims', 128), recurrent_dropout=0.2,
                     return_sequences=True, name='lstm_methname_f')
        b_rnn = LSTM(self.model_params.get('n_lstm_dims', 128), return_sequences=True,
                     recurrent_dropout=0.2, name='lstm_methname_b', go_backwards=True)
        methname_f_rnn = f_rnn(methname_dropout)
        methname_b_rnn = b_rnn(methname_dropout)
        dropout = Dropout(0.25, name='dropout_methname_rnn')
        methname_f_dropout = dropout(methname_f_rnn)
        methname_b_dropout = dropout(methname_b_rnn)
        # 3. maxpooling over the time dimension
        maxpool = Lambda(lambda x: K.max(x, axis=1, keepdims=False),
                         output_shape=lambda x: (x[0], x[2]), name='maxpool_methname')
        methname_pool = Concatenate(name='concat_methname_lstms')(
            [maxpool(methname_f_dropout), maxpool(methname_b_dropout)])
        activation = Activation('tanh', name='active_methname')
        methname_repr = activation(methname_pool)
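        # The API-sequence encoder below mirrors the method-name encoder:
        # embedding -> dropout -> forward/backward LSTMs -> max-pooling -> tanh.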
        ## API sequence representation ##
        # 1. embedding
        embedding = Embedding(input_dim=self.data_params['n_words'],
                              output_dim=self.model_params.get('n_embed_dims', 100),
                              # weights=weights,
                              # mask_zero=False: 0 is an ordinary index here (see the note above)
                              mask_zero=False,
                              name='embedding_apiseq')
        apiseq_embedding = embedding(apiseq)
        dropout = Dropout(0.25, name='dropout_apiseq_embed')
        apiseq_dropout = dropout(apiseq_embedding)
        # 2. rnn
        f_rnn = LSTM(self.model_params.get('n_lstm_dims', 100), return_sequences=True,
                     recurrent_dropout=0.2, name='lstm_apiseq_f')
        b_rnn = LSTM(self.model_params.get('n_lstm_dims', 100), return_sequences=True,
                     recurrent_dropout=0.2, name='lstm_apiseq_b', go_backwards=True)
        apiseq_f_rnn = f_rnn(apiseq_dropout)
        apiseq_b_rnn = b_rnn(apiseq_dropout)
        dropout = Dropout(0.25, name='dropout_apiseq_rnn')
        apiseq_f_dropout = dropout(apiseq_f_rnn)
        apiseq_b_dropout = dropout(apiseq_b_rnn)
        # 3. maxpooling
        maxpool = Lambda(lambda x: K.max(x, axis=1, keepdims=False),
                         output_shape=lambda x: (x[0], x[2]), name='maxpool_apiseq')
        apiseq_pool = Concatenate(name='concat_apiseq_lstms')(
            [maxpool(apiseq_f_dropout), maxpool(apiseq_b_dropout)])
        activation = Activation('tanh', name='active_apiseq')
        apiseq_repr = activation(apiseq_pool)
        ## tokens representation ##
        # 1. embedding
        weights_path = self.model_params['init_embed_weights_tokens']
        init_emb_weights = np.load(weights_path) if weights_path is not None else None
        if init_emb_weights is not None:
            init_emb_weights = [init_emb_weights]
        embedding = Embedding(input_dim=self.data_params['n_words'],
                              output_dim=self.model_params.get('n_embed_dims', 100),
                              weights=init_emb_weights,
                              # mask_zero=True,  # disabled; 0 is treated as an ordinary index here
                              name='embedding_tokens')
        tokens_embedding = embedding(tokens)
        dropout = Dropout(0.25, name='dropout_tokens_embed')
        tokens_dropout = dropout(tokens_embedding)
        # 2. maxpooling
        maxpool = Lambda(lambda x: K.max(x, axis=1, keepdims=False),
                         output_shape=lambda x: (x[0], x[2]), name='maxpool_tokens')
        tokens_pool = maxpool(tokens_dropout)
        activation = Activation('tanh', name='active_tokens')
        tokens_repr = activation(tokens_pool)
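        # Note: unlike methname and apiseq, tokens are treated as an unordered
        # bag of identifiers, so there is no RNN step; the encoder is just
        # embedding + max-pooling.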
        ## concatenate the three code views into a single code representation ##
        merged_methname_api = Concatenate(name='merge_methname_api')([methname_repr, apiseq_repr])
        merged_code_repr = Concatenate(name='merge_coderepr')([merged_methname_api, tokens_repr])
        code_repr = Dense(self.model_params.get('n_hidden', 400), activation='tanh',
                          name='dense_coderepr')(merged_code_repr)
        self._code_repr_model = Model(inputs=[methname, apiseq, tokens], outputs=[code_repr],
                                      name='code_repr_model')
        '''
        2. Build the description representation model
        '''
        logger.debug('Building Desc Representation Model')
        desc = Input(shape=(self.data_params['desc_len'],), dtype='int32', name='desc')
        # 1. embedding
        weights_path = self.model_params['init_embed_weights_desc']
        init_emb_weights = np.load(weights_path) if weights_path is not None else None
        if init_emb_weights is not None:
            init_emb_weights = [init_emb_weights]
        embedding = Embedding(input_dim=self.data_params['n_words'],
                              output_dim=self.model_params.get('n_embed_dims', 100),
                              weights=init_emb_weights,
                              # mask_zero=True: 0 is treated as padding and masked out; all
                              # subsequent layers must support masking
                              mask_zero=True,
                              name='embedding_desc')
        desc_embedding = embedding(desc)
        dropout = Dropout(0.25, name='dropout_desc_embed')
        desc_dropout = dropout(desc_embedding)
        # 2. rnn
        f_rnn = LSTM(self.model_params.get('n_lstm_dims', 100), return_sequences=True,
                     recurrent_dropout=0.2, name='lstm_desc_f')
        b_rnn = LSTM(self.model_params.get('n_lstm_dims', 100), return_sequences=True,
                     recurrent_dropout=0.2, name='lstm_desc_b', go_backwards=True)
        desc_f_rnn = f_rnn(desc_dropout)
        desc_b_rnn = b_rnn(desc_dropout)
        dropout = Dropout(0.25, name='dropout_desc_rnn')
        desc_f_dropout = dropout(desc_f_rnn)
        desc_b_dropout = dropout(desc_b_rnn)
        # 3. maxpooling
        maxpool = Lambda(lambda x: K.max(x, axis=1, keepdims=False),
                         output_shape=lambda x: (x[0], x[2]), name='maxpool_desc')
        desc_pool = Concatenate(name='concat_desc_rnns')(
            [maxpool(desc_f_dropout), maxpool(desc_b_dropout)])
        activation = Activation('tanh', name='active_desc')
        desc_repr = activation(desc_pool)
        self._desc_repr_model = Model(inputs=[desc], outputs=[desc_repr], name='desc_repr_model')
"""
3: calculate the cosine similarity between code and desc
"""
logger.debug('Building similarity model')
code_repr=self._code_repr_model([methname,apiseq,tokens])
desc_repr=self._desc_repr_model([desc])
cos_sim=Dot(axes=1, normalize=True, name='cos_sim')([code_repr, desc_repr])
sim_model = Model(inputs=[methname,apiseq,tokens,desc], outputs=[cos_sim],name='sim_model')
self._sim_model=sim_model #for model evaluation
        '''
        4. Build the training model
        '''
        good_sim = sim_model([self.methname, self.apiseq, self.tokens, self.desc_good])  # similarity to the correct desc
        bad_sim = sim_model([self.methname, self.apiseq, self.tokens, self.desc_bad])    # similarity to a distractor desc
        loss = Lambda(lambda x: K.maximum(1e-6, self.model_params['margin'] - x[0] + x[1]),
                      output_shape=lambda x: x[0], name='loss')([good_sim, bad_sim])
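        # Margin ranking loss: loss = max(1e-6, margin - cos(code, desc_good) + cos(code, desc_bad)).
        # It pushes the correct description at least `margin` closer to the code
        # than the distractor; the 1e-6 floor stands in for zero.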
        logger.debug('Building training model')
        self._training_model = Model(inputs=[self.methname, self.apiseq, self.tokens,
                                             self.desc_good, self.desc_bad],
                                     outputs=[loss], name='training_model')
    def summary(self, export_path):
        # export_path is only used by the commented-out plot_model calls
        print('Summary of the code representation model')
        self._code_repr_model.summary()
        # plot_model(self._code_repr_model, show_shapes=True, to_file=export_path + 'code_repr_model.png')
        print('Summary of the desc representation model')
        self._desc_repr_model.summary()
        # plot_model(self._desc_repr_model, show_shapes=True, to_file=export_path + 'desc_repr_model.png')
        print('Summary of the similarity model')
        self._sim_model.summary()
        # plot_model(self._sim_model, show_shapes=True, to_file=export_path + 'sim_model.png')
        print('Summary of the training model')
        self._training_model.summary()
        # plot_model(self._training_model, show_shapes=True, to_file=export_path + 'training_model.png')
    def compile(self, optimizer, **kwargs):
        logger.info('compiling models')
        self._code_repr_model.compile(loss='cosine_similarity', optimizer=optimizer, **kwargs)
        self._desc_repr_model.compile(loss='cosine_similarity', optimizer=optimizer, **kwargs)
        # The ranking loss is already computed inside the graph, so the Keras loss
        # just passes y_pred through. The `+ y_true - y_true` term avoids an
        # unused-input warning; it could be simply `+ y_true`, since y_true is
        # always 0 during training (see fit()).
        self._training_model.compile(loss=lambda y_true, y_pred: y_pred + y_true - y_true,
                                     optimizer=optimizer, **kwargs)
        self._sim_model.compile(loss='binary_crossentropy', optimizer=optimizer, **kwargs)
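        # Only _training_model is used by fit(); _sim_model backs predict(),
        # and the two repr models back repr_code() / repr_desc() below.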
    def fit(self, x, **kwargs):
        assert self._training_model is not None, 'Must compile the model before fitting data'
        # Dummy all-zero targets: the ranking loss is computed inside the graph,
        # so y only needs to match the batch size.
        y = np.zeros(shape=x[0].shape[:1], dtype=np.float32)
        return self._training_model.fit(x, y, **kwargs)

    def repr_code(self, x, **kwargs):
        return self._code_repr_model.predict(x, **kwargs)

    def repr_desc(self, x, **kwargs):
        return self._desc_repr_model.predict(x, **kwargs)

    def predict(self, x, **kwargs):
        return self._sim_model.predict(x, **kwargs)
    def save(self, code_model_file, desc_model_file, **kwargs):
        assert self._code_repr_model is not None, 'Must build the model before saving weights'
        self._code_repr_model.save_weights(code_model_file, **kwargs)
        assert self._desc_repr_model is not None, 'Must build the model before saving weights'
        self._desc_repr_model.save_weights(desc_model_file, **kwargs)

    def load(self, code_model_file, desc_model_file, **kwargs):
        assert self._code_repr_model is not None, 'Must build the model before loading weights'
        self._code_repr_model.load_weights(code_model_file, **kwargs)
        assert self._desc_repr_model is not None, 'Must build the model before loading weights'
        self._desc_repr_model.load_weights(desc_model_file, **kwargs)
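

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original file): a minimal end-to-end run on
# synthetic data, assuming a TensorFlow version compatible with the model
# definition above. The config keys mirror the ones the class reads; the
# concrete lengths, vocabulary size, margin, and optimizer are illustrative
# assumptions, and the pretrained-embedding paths are left as None.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    config = {
        'data_params': {
            'methname_len': 6,   # assumed toy sequence lengths
            'apiseq_len': 30,
            'tokens_len': 50,
            'desc_len': 30,
            'n_words': 10000,    # assumed vocabulary size
        },
        'model_params': {
            'n_embed_dims': 100,
            # n_hidden must equal 2 * n_lstm_dims so that code_repr and
            # desc_repr have the same width for the cosine Dot layer.
            'n_lstm_dims': 200,
            'n_hidden': 400,
            'margin': 0.05,      # assumed ranking margin
            'init_embed_weights_methname': None,  # no pretrained embeddings
            'init_embed_weights_tokens': None,
            'init_embed_weights_desc': None,
        },
    }
    model = JointEmbeddingModel(config)
    model.build()
    model.compile(optimizer='adam')

    # One synthetic batch of (code, good desc, bad desc) triplets, in the
    # training model's input order: [methname, apiseq, tokens, desc_good, desc_bad].
    rng = np.random.RandomState(0)
    batch = 8
    x = [rng.randint(1, 10000, size=(batch, n)).astype('int32')
         for n in (6, 30, 50, 30, 30)]
    model.fit(x, epochs=1, verbose=2)

    # At search time, embed code and queries separately and rank candidates by
    # cosine similarity between the two vector sets.
    code_vecs = model.repr_code(x[:3])   # shape: (batch, n_hidden)
    desc_vecs = model.repr_desc([x[3]])  # shape: (batch, 2 * n_lstm_dims)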