-
Notifications
You must be signed in to change notification settings - Fork 145
/
Copy pathregression_tree.py
208 lines (166 loc) · 5.87 KB
/
regression_tree.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python
# -*- coding: utf-8 -*-
''' 回归树实现
'''
import uuid
from collections import namedtuple

import matplotlib.pyplot as plt
import numpy as np
def load_data(filename):
    ''' Load whitespace-separated numeric data from a text file.

    Each line becomes one record: a list of floats, one per
    whitespace-delimited field.
    '''
    with open(filename, 'r') as infile:
        return [[float(field) for field in line.split()] for line in infile]
def split_dataset(dataset, feat_idx, value):
    ''' Binary-split a dataset on a feature index and threshold value.

    Rows whose feature `feat_idx` is strictly less than `value` go left;
    all other rows go right. Returns (left_rows, right_rows).
    '''
    left = [row for row in dataset if row[feat_idx] < value]
    right = [row for row in dataset if row[feat_idx] >= value]
    return left, right
def create_tree(dataset, fleaf, ferr, opt=None):
    ''' Recursively build a regression tree from `dataset`.

    dataset: rows to split
    fleaf: function producing a leaf value from a row subset
    ferr: function measuring the error of a row subset
    opt: tree parameters —
        err_tolerance: minimum error reduction required to keep splitting;
        n_tolerance: minimum number of rows allowed on each side of a split
    Returns either a leaf value or a dict node with keys
    'feat_idx', 'feat_val', 'left', 'right'.
    '''
    opt = {'err_tolerance': 1, 'n_tolerance': 4} if opt is None else opt
    # Pick the best split; a None feature index signals "stop, make a leaf".
    feat_idx, value = choose_best_feature(dataset, fleaf, ferr, opt)
    if feat_idx is None:
        return value
    # Split and recurse on both halves.
    left_rows, right_rows = split_dataset(dataset, feat_idx, value)
    return {
        'feat_idx': feat_idx,
        'feat_val': value,
        'left': create_tree(left_rows, fleaf, ferr, opt),
        'right': create_tree(right_rows, fleaf, ferr, opt),
    }
def fleaf(dataset):
    ''' Leaf value for a data subset: the mean of the target (last) column. '''
    targets = np.array(dataset)[:, -1]
    return targets.mean()
def ferr(dataset):
    ''' Total squared error of the target (last) column.

    Computed as variance * number of rows, which equals the sum of squared
    deviations of the targets from their mean.
    '''
    dataset = np.array(dataset)
    # Fix: the original unpacked `m, _ = dataset.shape` but never used `m`,
    # then re-read the row count via dataset.shape[0]; use the count directly.
    return np.var(dataset[:, -1]) * dataset.shape[0]
def choose_best_feature(dataset, fleaf, ferr, opt):
    ''' Choose the best split feature and threshold for `dataset`.

    dataset: rows to split (last column is the target)
    fleaf: function producing a leaf value from a row subset
    ferr: function measuring the error of a row subset
    opt: tree parameters —
        err_tolerance: minimum error reduction required to keep splitting;
        n_tolerance: minimum number of rows allowed on each side of a split
    Returns (feat_idx, feat_val) for the chosen split, or (None, leaf_value)
    when no worthwhile split exists.
    '''
    dataset = np.array(dataset)
    err_tolerance = opt['err_tolerance']
    n_tolerance = opt['n_tolerance']
    n_features = dataset.shape[1] - 1
    base_err = ferr(dataset)
    best_feat_idx, best_feat_val, best_err = 0, 0, float('inf')
    rows = dataset.tolist()
    # Try every observed value of every feature as a candidate threshold.
    for feat_idx in range(n_features):
        for val in dataset[:, feat_idx]:
            ldata, rdata = split_dataset(rows, feat_idx, val)
            # Skip splits that leave too few rows on either side.
            if min(len(ldata), len(rdata)) < n_tolerance:
                continue
            split_err = ferr(ldata) + ferr(rdata)
            # Strict '<' keeps the first minimum, matching original tie-breaking.
            if split_err < best_err:
                best_feat_idx, best_feat_val, best_err = feat_idx, val, split_err
    # Not enough error reduction: collapse this subset into a leaf.
    if abs(base_err - best_err) < err_tolerance:
        return None, fleaf(dataset)
    # Re-check the winning split's sizes (covers the no-valid-split case).
    ldata, rdata = split_dataset(rows, best_feat_idx, best_feat_val)
    if min(len(ldata), len(rdata)) < n_tolerance:
        return None, fleaf(dataset)
    return best_feat_idx, best_feat_val
def get_nodes_edges(tree, root_node=None):
    ''' Collect every node and edge of a regression tree for rendering.

    Each node gets a fresh uuid4 id and a display label: internal nodes show
    "feat_idx: feat_val", leaves show the value with two decimals.
    Returns (nodes, edges) as lists of namedtuples.
    '''
    Node = namedtuple('Node', ['id', 'label'])
    Edge = namedtuple('Edge', ['start', 'end'])
    nodes, edges = [], []
    # A leaf has no children to enumerate.
    if not isinstance(tree, dict):
        return nodes, edges
    # Only the outermost call creates the root node.
    if root_node is None:
        root_label = '{}: {}'.format(tree['feat_idx'], tree['feat_val'])
        root_node = Node(uuid.uuid4(), root_label)
        nodes.append(root_node)
    for child in (tree['left'], tree['right']):
        if isinstance(child, dict):
            label = '{}: {}'.format(child['feat_idx'], child['feat_val'])
        else:
            label = '{:.2f}'.format(child)
        child_node = Node(uuid.uuid4(), label)
        nodes.append(child_node)
        edges.append(Edge(root_node, child_node))
        # Recurse below the child, anchoring new edges at child_node.
        more_nodes, more_edges = get_nodes_edges(child, root_node=child_node)
        nodes.extend(more_nodes)
        edges.extend(more_edges)
    return nodes, edges
def dotify(tree):
    ''' Render the tree as Graphviz DOT source text. '''
    nodes, edges = get_nodes_edges(tree)
    node_lines = [' "{}" [label="{}"];\n'.format(node.id, node.label)
                  for node in nodes]
    edge_lines = [' "{}" -> "{}";\n'.format(edge.start.id, edge.end.id)
                  for edge in edges]
    return 'digraph decision_tree {\n' + ''.join(node_lines) + ''.join(edge_lines) + '}'
def tree_predict(data, tree):
    ''' Predict the target value for `data` by walking the regression tree.

    Internal nodes send the sample left when its feature is strictly below
    the node threshold, otherwise right; a non-dict node is a leaf value.
    '''
    while isinstance(tree, dict):
        branch = 'left' if data[tree['feat_idx']] < tree['feat_val'] else 'right'
        tree = tree[branch]
    return tree
if '__main__' == __name__:
    # Demo driver: fit a tree on a sample file, dump it as Graphviz DOT,
    # and plot the data with the fitted step-wise regression curve.
    datafile = 'ex0.txt'
    dataset = load_data(datafile)
    # Build the regression tree with min 4 samples per split and
    # minimum error reduction of 1.
    tree = create_tree(dataset, fleaf, ferr, opt={'n_tolerance': 4,
                                                  'err_tolerance': 1})
    # Write the tree structure to "<basename>.dot" for Graphviz.
    dotfile = '{}.dot'.format(datafile.split('.')[0])
    with open(dotfile, 'w') as f:
        content = dotify(tree)
        f.write(content)
    dataset = np.array(dataset)
    # Scatter plot of first feature vs. target column.
    plt.scatter(dataset[:, 0], dataset[:, 1])
    # Regression curve sampled on [0, 1] — assumes the feature values in
    # ex0.txt lie in that range; TODO confirm against the data file.
    x = np.linspace(0, 1, 50)
    y = [tree_predict([i], tree) for i in x]
    plt.plot(x, y, c='r')
    plt.show()