xDeepFM.py
import pandas as pd
import numpy as np
from tqdm import tqdm
import warnings, random, math, os
from collections import namedtuple, OrderedDict
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
import tensorflow.keras.backend as K
from tensorflow.python.keras.initializers import (Zeros, glorot_normal,
glorot_uniform)
from tensorflow.python.keras.regularizers import l2
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler, StandardScaler, LabelEncoder
from utils import DenseFeat, SparseFeat, VarLenSparseFeat
# Simple feature processing: fill missing values, transform numeric features, label-encode categorical features
def data_process(data_df, dense_features, sparse_features):
data_df[dense_features] = data_df[dense_features].fillna(0.0)
for f in dense_features:
data_df[f] = data_df[f].apply(lambda x: np.log(x+1) if x > -1 else -1)
data_df[sparse_features] = data_df[sparse_features].fillna("-1")
for f in sparse_features:
lbe = LabelEncoder()
data_df[f] = lbe.fit_transform(data_df[f])
return data_df[dense_features + sparse_features]
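# A quick worked example of the dense transform above (illustrative numbers only):
#   x = 0   -> log(0 + 1)  = 0.0
#   x = 99  -> log(99 + 1) ~ 4.605
#   x = -5  -> not > -1, so it is mapped to -1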
# Build the input layers
# The input data is fed as a dict; by giving each Input layer the same name as the corresponding key
# in that dict, Keras matches the fed data to the right Input layer.
def build_input_layers(feature_columns):
"""构建Input层字典,并以dense和sparse两类字典的形式返回"""
dense_input_dict, sparse_input_dict = {}, {}
for fc in feature_columns:
if isinstance(fc, SparseFeat):
sparse_input_dict[fc.name] = Input(shape=(1, ), name=fc.name, dtype=fc.dtype)
elif isinstance(fc, DenseFeat):
dense_input_dict[fc.name] = Input(shape=(fc.dimension, ), name=fc.name, dtype=fc.dtype)
return dense_input_dict, sparse_input_dict
# Build the embedding layers
def build_embedding_layers(feature_columns, input_layer_dict, is_linear):
    # Dict mapping each sparse feature name to its Embedding layer
embedding_layers_dict = dict()
    # Pick out the sparse features
sparse_features_columns = list(filter(lambda x: isinstance(x, SparseFeat), feature_columns)) if feature_columns else []
    # For the linear part the embedding dimension is 1; otherwise use the feature's own embedding_dim
if is_linear:
for fc in sparse_features_columns:
embedding_layers_dict[fc.name] = Embedding(fc.vocabulary_size, 1, name='1d_emb_'+fc.name)
else:
for fc in sparse_features_columns:
embedding_layers_dict[fc.name] = Embedding(fc.vocabulary_size, fc.embedding_dim, name='kd_emb_'+fc.name)
return embedding_layers_dict
# Concatenate the embeddings of all sparse features
def concat_embedding_list(feature_columns, input_layer_dict, embedding_layer_dict, flatten=False):
    # Pick out the sparse features
sparse_feature_columns = list(filter(lambda x: isinstance(x, SparseFeat), feature_columns))
embedding_list = []
for fc in sparse_feature_columns:
        _input = input_layer_dict[fc.name]   # the Input layer of this feature
        _embed = embedding_layer_dict[fc.name]   # the corresponding Embedding layer
        embed = _embed(_input)   # B x 1 x dim: feed the Input into the Embedding layer
        # Flatten (-> B x dim) only when the embeddings feed Dense layers directly; keep the 3-D shape otherwise (e.g. for the CIN)
if flatten:
embed = Flatten()(embed)
embedding_list.append(embed)
return embedding_list
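# Shape note (for reference): with flatten=True each list entry is [B, embedding_dim] and
# Concatenate(axis=1) yields [B, num_sparse * embedding_dim] for the DNN; with flatten=False each
# entry is [B, 1, embedding_dim] and Concatenate(axis=1) yields [B, num_sparse, embedding_dim] for the CIN.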
def get_dnn_output(dnn_input, hidden_units=[1024, 512, 256], dnn_dropout=0.3, activation='relu'):
    # Build the stack of Dense layers
dnn_network = [Dense(units=unit, activation=activation) for unit in hidden_units]
dropout = Dropout(dnn_dropout)
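    # Note: this single Dropout instance is reused after every Dense layer below; Dropout has no
    # trainable weights, so this simply applies the same dropout rate after each hidden layer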
    # Forward pass
x = dnn_input
for dnn in dnn_network:
x = dropout(dnn(x))
return x
# Compute the linear part, i.e. the forward pass of the linear (wide) component
def get_linear_logits(dense_input_dict, sparse_input_dict, linear_feature_columns):
"""
线性部分的计算,所有特征的Input层,然后经过一个全连接层线性计算结果logits
即FM线性部分的那块计算w1x1+w2x2+...wnxn + b,只不过,连续特征和离散特征这里的线性计算还不太一样
连续特征由于是数值,可以直接过全连接,得到线性这边的输出。
离散特征需要先embedding得到1维embedding,然后直接把这个1维的embedding相加就得到离散这边的线性输出。
:param dense_input_dict: A dict. 连续特征构建的输入层字典 形式{'dense_name': Input(shape, name, dtype)}
:param sparse_input_dict: A dict. 离散特征构建的输入层字典 形式{'sparse_name': Input(shape, name, dtype)}
:param linear_feature_columns: A list. 里面的每个元素是namedtuple(元组的一种扩展类型,同时支持序号和属性名访问组件)类型,表示的是linear数据的特征封装版
"""
    # Concatenate all dense features and pass them through a single-unit Dense layer: w1*x1 + w2*x2 + ... + wn*xn
concat_dense_inputs = Concatenate(axis=1)(list(dense_input_dict.values()))
dense_logits_output = Dense(1)(concat_dense_inputs)
    # Get the 1-d embedding layers of the sparse features for the linear part. Why embeddings here:
    # one-hot encoding the sparse features and pushing them through a Dense layer is slow when the
    # dimensionality is huge; an embedding lookup fetches the weights of the non-zero entries directly
    # and sums them, which is far cheaper.
linear_embedding_layers = build_embedding_layers(linear_feature_columns, sparse_input_dict, is_linear=True)
    # Collect the 1-d embeddings; a Flatten layer is needed so the shapes line up
sparse_1d_embed = []
for fc in linear_feature_columns:
        # Sparse features go through the 1-d embedding
if isinstance(fc, SparseFeat):
            # Look up the Input layer and attach the embedding layer to it
feat_input = sparse_input_dict[fc.name]
embed = Flatten()(linear_embedding_layers[fc.name](feat_input))
sparse_1d_embed.append(embed)
    # The weight looked up in the embedding is exactly the weight at the corresponding position of the
    # one-hot vector, so no extra Dense layer is needed: the 1-d embedding itself plays the role of the
    # fully connected layer. Because the inputs are 0/1, summing the weights of the non-zero entries is
    # equivalent to the full matrix multiplication (the non-zero entries are multiplied by 1).
sparse_logits_output = Add()(sparse_1d_embed)
    # Add the dense and sparse logits to get the final linear logits
linear_part = Add()([dense_logits_output, sparse_logits_output])
return linear_part
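# A tiny worked example of the equivalence used above (illustrative vocabulary and weights):
#   suppose one categorical field has vocabulary {0, 1, 2} with 1-d embedding weights w = [0.3, -0.1, 0.5];
#   one-hot encoding category 2 gives [0, 0, 1], and [0, 0, 1] . w = 0.5, which is exactly the lookup w[2],
#   so summing the 1-d embeddings of all sparse fields reproduces the one-hot linear term without a big matmul.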
class CIN(Layer):
def __init__(self, cin_size, l2_reg=1e-4):
"""
:param: cin_size: A list. [H_1, H_2, ....H_T], a list of number of layers
"""
super(CIN, self).__init__()
self.cin_size = cin_size
self.l2_reg = l2_reg
def build(self, input_shape):
# input_shape [None, field_nums, embedding_dim]
self.field_nums = input_shape[1]
        # Sizes of all CIN layers; prepend layer 0, i.e. the input layer H_0
self.field_nums = [self.field_nums] + self.cin_size
        # Convolution filters, one weight tensor per CIN layer
self.cin_W = {
'CIN_W_' + str(i): self.add_weight(
name='CIN_W_' + str(i),
                shape=(1, self.field_nums[0] * self.field_nums[i], self.field_nums[i+1]),  # (kernel width 1, in_channels = H_0 * H_i, out_channels = H_{i+1})
initializer='random_uniform',
regularizer=l2(self.l2_reg),
trainable=True
)
for i in range(len(self.field_nums)-1)
}
super(CIN, self).build(input_shape)
def call(self, inputs):
# inputs [None, field_num, embed_dim]
embed_dim = inputs.shape[-1]
hidden_layers_results = [inputs]
        # Split the tensor along the embedding dimension so the channel-wise convolution below is easy to compute.
        # The result is a list of length embed_dim whose elements have shape [None, field_nums[0], 1],
        # where field_nums[0] is the number of input fields;
        # i.e. the input [None, field_num, embed_dim] is split into embed_dim tensors of shape [None, field_nums[0], 1].
split_X_0 = tf.split(hidden_layers_results[0], embed_dim, 2)
for idx, size in enumerate(self.cin_size):
            # Same trick as above, applied to the previous layer's output X_{k-1}
            split_X_K = tf.split(hidden_layers_results[-1], embed_dim, 2)   # embed_dim tensors of [None, field_nums[idx], 1]; field_nums[idx] is the size of the previous CIN layer
            # Outer product
            out_product_res_m = tf.matmul(split_X_0, split_X_K, transpose_b=True)   # [embed_dim, None, field_nums[0], field_nums[idx]]
            out_product_res_o = tf.reshape(out_product_res_m, shape=[embed_dim, -1, self.field_nums[0]*self.field_nums[idx]])   # merge the last two dimensions
            out_product_res = tf.transpose(out_product_res_o, perm=[1, 0, 2])   # [None, dim, field_nums[0]*field_nums[idx]]
            # 1-D convolution.
            # Intuition: treat each sample as a single-channel image with width dim and height field_nums[0]*field_nums[idx];
            # each kernel covers the whole field_nums[0]*field_nums[idx] axis, so sliding it along dim yields one number per
            # position, i.e. a [None, dim, 1] tensor -- the output of one neuron of the current layer.
            # The layer has field_nums[idx+1] neurons (kernels), so the full output is [None, dim, field_nums[idx+1]].
cur_layer_out = tf.nn.conv1d(input=out_product_res, filters=self.cin_W['CIN_W_'+str(idx)], stride=1, padding='VALID')
            cur_layer_out = tf.transpose(cur_layer_out, perm=[0, 2, 1])   # [None, field_nums[idx+1], dim]
hidden_layers_results.append(cur_layer_out)
        # The CIN output uses every intermediate layer's result; layer 0 (the raw input) is dropped
        final_result = hidden_layers_results[1:]   # T tensors of shape [None, field_nums[i], dim], where T is the number of CIN layers
        # Concatenate along the field dimension
        result = tf.concat(final_result, axis=1)   # [None, H1+H2+...+HT, dim]
        # Sum-pool over the embedding dimension and drop that axis
        result = tf.reduce_sum(result, axis=-1, keepdims=False)   # [None, H1+H2+...+HT]
return result
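# Shape walkthrough of one CIN step (illustrative sizes: field_num = 26, embed_dim = 4, cin_size = [128, 128]):
#   X_0: [None, 26, 4]; splitting along the embedding axis gives 4 tensors of [None, 26, 1]
#   (and 4 tensors of [None, H_k, 1] for the previous layer X_k).
#   Outer product: [4, None, 26, H_k] -> reshape -> [4, None, 26*H_k] -> transpose -> [None, 4, 26*H_k].
#   conv1d with 128 filters of shape (1, 26*H_k, 128): [None, 4, 128] -> transpose -> [None, 128, 4].
#   Final output: the two layer results are concatenated to [None, 256, 4] and sum-pooled to [None, 256].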
def xDeepFM(linear_feature_columns, dnn_feature_columns, cin_size=[128, 128]):
    # Build the Input() layers for all features, returned as dicts to make the later model construction easier
dense_input_dict, sparse_input_dict = build_input_layers(linear_feature_columns+dnn_feature_columns)
    # The model's inputs cannot be a dict, so turn the dicts into a list.
    # Note: at fit/predict time the fed data dict is matched to these Input layers by key name,
    # i.e. each dict key is paired with the Input layer of the same name.
input_layers = list(dense_input_dict.values()) + list(sparse_input_dict.values())
    # Linear part
linear_logits = get_linear_logits(dense_input_dict, sparse_input_dict, linear_feature_columns)
    # Build the k-dimensional embedding layers, returned as a dict for convenience;
    # these embeddings are shared by the DNN part and the CIN part below
embedding_layer_dict = build_embedding_layers(linear_feature_columns+dnn_feature_columns, sparse_input_dict, is_linear=False)
    # DNN part -- Deep
    # Pick out the dense features from dnn_feature_columns and concatenate their Input layers
dnn_dense_feature_columns = list(filter(lambda x: isinstance(x, DenseFeat), dnn_feature_columns)) if dnn_feature_columns else []
dnn_dense_feature_columns = [fc.name for fc in dnn_dense_feature_columns]
dnn_concat_dense_inputs = Concatenate(axis=1)([dense_input_dict[col] for col in dnn_dense_feature_columns])
    # Pick out the sparse features from dnn_feature_columns and concatenate their embeddings (flattened, since they feed Dense layers)
dnn_sparse_kd_embed = concat_embedding_list(dnn_feature_columns, sparse_input_dict, embedding_layer_dict, flatten=True)
dnn_concat_sparse_kd_embed = Concatenate(axis=1)(dnn_sparse_kd_embed)
    # DNN input and output
dnn_input = Concatenate(axis=1)([dnn_concat_dense_inputs, dnn_concat_sparse_kd_embed])
dnn_out = get_dnn_output(dnn_input)
dnn_logits = Dense(1)(dnn_out)
    # CIN part: uses the sparse features of dnn_feature_columns; do not flatten, the CIN expects [None, field_num, embed_dim]
exFM_sparse_kd_embed = concat_embedding_list(dnn_feature_columns, sparse_input_dict, embedding_layer_dict, flatten=False)
exFM_input = Concatenate(axis=1)(exFM_sparse_kd_embed)
exFM_out = CIN(cin_size=cin_size)(exFM_input)
exFM_logits = Dense(1)(exFM_out)
    # Sum the logits of the three branches (linear, DNN, CIN)
stack_output = Add()([linear_logits, dnn_logits, exFM_logits])
    # Output layer
output_layer = Dense(1, activation='sigmoid')(stack_output)
model = Model(input_layers, output_layer)
return model
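# Minimal construction sketch (hypothetical toy feature columns, independent of the training script below):
#   feats = [SparseFeat('C1', vocabulary_size=100, embedding_dim=4),
#            SparseFeat('C2', vocabulary_size=50,  embedding_dim=4),
#            DenseFeat('I1', 1), DenseFeat('I2', 1)]
#   toy_model = xDeepFM(feats, feats, cin_size=[64, 64])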
if __name__ == "__main__":
    # Load the data
data = pd.read_csv('../data/criteo_sample.txt')
    # Split the columns into dense and sparse features
columns = data.columns.values
dense_features = [feat for feat in columns if 'I' in feat]
sparse_features = [feat for feat in columns if 'C' in feat]
    # Simple preprocessing
train_data = data_process(data, dense_features, sparse_features)
train_data['label'] = data['label']
    # Group the features into a linear part and a dnn part (chosen according to the actual scenario) and tag them with DenseFeat / SparseFeat
    linear_feature_columns = [SparseFeat(feat, vocabulary_size=data[feat].nunique(), embedding_dim=4)
                              for feat in sparse_features] + [DenseFeat(feat, 1)
                              for feat in dense_features]
    dnn_feature_columns = [SparseFeat(feat, vocabulary_size=data[feat].nunique(), embedding_dim=4)
                           for feat in sparse_features] + [DenseFeat(feat, 1)
                           for feat in dense_features]
    # Build the xDeepFM model
model = xDeepFM(linear_feature_columns, dnn_feature_columns)
model.summary()
model.compile(optimizer="adam",
loss="binary_crossentropy",
metrics=["binary_crossentropy", tf.keras.metrics.AUC(name='auc')])
    # Feed the input data as a dict keyed by feature name
train_model_input = {name: data[name] for name in dense_features + sparse_features}
    # Train the model
model.fit(train_model_input, train_data['label'].values,
batch_size=64, epochs=5, validation_split=0.2, )
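    # A minimal prediction sketch after training (illustrative; reuses the in-memory inputs built above)
    preds = model.predict(train_model_input, batch_size=64)
    print(preds[:5])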