# dt.py (forked from ddbourgin/numpy-ml)
import numpy as np


class Node:
    def __init__(self, left, right, rule):
        self.left = left
        self.right = right
        self.feature = rule[0]
        self.threshold = rule[1]


class Leaf:
    def __init__(self, value):
        """
        `value` is an array of class probabilities if classifier is True, else
        the mean of the region
        """
        self.value = value


class DecisionTree:
    def __init__(
        self,
        classifier=True,
        max_depth=None,
        n_feats=None,
        criterion="entropy",
        seed=None,
    ):
        """
        A decision tree model for regression and classification problems.

        Parameters
        ----------
        classifier : bool
            Whether to treat target values as categorical (classifier =
            True) or continuous (classifier = False). Default is True.
        max_depth : int or None
            The depth at which to stop growing the tree. If None, grow the
            tree until all leaves are pure. Default is None.
        n_feats : int or None
            Specifies the number of features to sample on each split. If None,
            use all features on each split. Default is None.
        criterion : {'mse', 'entropy', 'gini'}
            The error criterion to use when calculating splits. When
            `classifier` is False, valid entries are {'mse'}. When `classifier`
            is True, valid entries are {'entropy', 'gini'}. Default is
            'entropy'.
        seed : int or None
            Seed for the random number generator. Default is None.
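
        Examples
        --------
        A minimal usage sketch on a tiny, purely illustrative dataset (the
        arrays below are not part of this module):

        >>> X = np.array([[0.0], [1.0], [2.0], [3.0]])
        >>> Y = np.array([0, 0, 1, 1])
        >>> dt = DecisionTree(classifier=True, criterion="gini")
        >>> dt.fit(X, Y)
        >>> dt.predict(np.array([[0.5], [2.5]])).tolist()
        [0, 1]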
"""
        if seed:
            np.random.seed(seed)

        self.depth = 0
        self.root = None

        self.n_feats = n_feats
        self.criterion = criterion
        self.classifier = classifier
        self.max_depth = max_depth if max_depth else np.inf

        if not classifier and criterion in ["gini", "entropy"]:
            raise ValueError(
                "{} is a valid criterion only when classifier = True.".format(criterion)
            )
        if classifier and criterion == "mse":
            raise ValueError("`mse` is a valid criterion only when classifier = False.")

    def fit(self, X, Y):
        """
        Fit a binary decision tree to a dataset.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            The training data of `N` examples, each with `M` features.
        Y : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            An array of integer class labels for each example in `X` if
            self.classifier = True, otherwise the set of target values for
            each example in `X`.
        """
        self.n_classes = max(Y) + 1 if self.classifier else None
        self.n_feats = X.shape[1] if not self.n_feats else min(self.n_feats, X.shape[1])
        self.root = self._grow(X, Y)

    def predict(self, X):
        """
        Use the trained decision tree to classify or predict the examples in `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            The data to generate predictions for: `N` examples, each with `M`
            features.

        Returns
        -------
        preds : :py:class:`ndarray <numpy.ndarray>` of shape `(N,)`
            The integer class labels predicted for each example in `X` if
            self.classifier = True, otherwise the predicted target values.
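
        Examples
        --------
        An illustrative regression sketch on toy data (not part of this
        module):

        >>> X = np.array([[0.0], [1.0], [2.0], [3.0]])
        >>> Y = np.array([0.0, 0.0, 1.0, 1.0])
        >>> dt = DecisionTree(classifier=False, criterion="mse")
        >>> dt.fit(X, Y)
        >>> dt.predict(np.array([[0.5], [2.5]])).tolist()
        [0.0, 1.0]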
"""
        return np.array([self._traverse(x, self.root) for x in X])

    def predict_class_probs(self, X):
        """
        Use the trained decision tree to return the class probabilities for the
        examples in `X`.

        Parameters
        ----------
        X : :py:class:`ndarray <numpy.ndarray>` of shape `(N, M)`
            The data to generate predictions for: `N` examples, each with `M`
            features.

        Returns
        -------
        preds : :py:class:`ndarray <numpy.ndarray>` of shape `(N, n_classes)`
            The class probabilities predicted for each example in `X`.
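
        Examples
        --------
        A toy classification sketch (illustrative data only):

        >>> X = np.array([[0.0], [1.0], [2.0], [3.0]])
        >>> Y = np.array([0, 0, 1, 1])
        >>> dt = DecisionTree()
        >>> dt.fit(X, Y)
        >>> dt.predict_class_probs(np.array([[0.5], [2.5]])).tolist()
        [[1.0, 0.0], [0.0, 1.0]]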
"""
        assert self.classifier, "`predict_class_probs` undefined for classifier = False"
        return np.array([self._traverse(x, self.root, prob=True) for x in X])

    def _grow(self, X, Y, cur_depth=0):
        # if all labels are the same, return a leaf
        if len(set(Y)) == 1:
            if self.classifier:
                prob = np.zeros(self.n_classes)
                prob[Y[0]] = 1.0
            return Leaf(prob) if self.classifier else Leaf(Y[0])

        # if we have reached max_depth, return a leaf
        if cur_depth >= self.max_depth:
            v = np.mean(Y, axis=0)
            if self.classifier:
                v = np.bincount(Y, minlength=self.n_classes) / len(Y)
            return Leaf(v)

        cur_depth += 1
        self.depth = max(self.depth, cur_depth)

        N, M = X.shape
        feat_idxs = np.random.choice(M, self.n_feats, replace=False)

        # greedily select the best split according to `criterion`
        feat, thresh = self._segment(X, Y, feat_idxs)

        l = np.argwhere(X[:, feat] <= thresh).flatten()
        r = np.argwhere(X[:, feat] > thresh).flatten()

        # grow the children that result from the split
        left = self._grow(X[l, :], Y[l], cur_depth)
        right = self._grow(X[r, :], Y[r], cur_depth)
        return Node(left, right, (feat, thresh))

    def _segment(self, X, Y, feat_idxs):
        """
        Find the optimal split rule (feature index and split threshold) for the
        data according to `self.criterion`.
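
        Candidate thresholds for a feature are the midpoints between its
        adjacent unique values; e.g., unique feature values `[1, 3, 7]` give
        candidate thresholds `[2.0, 5.0]`.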
"""
        best_gain = -np.inf
        split_idx, split_thresh = None, None
        for i in feat_idxs:
            vals = X[:, i]
            levels = np.unique(vals)
            thresholds = (levels[:-1] + levels[1:]) / 2 if len(levels) > 1 else levels
            gains = np.array([self._impurity_gain(Y, t, vals) for t in thresholds])

            if gains.max() > best_gain:
                split_idx = i
                best_gain = gains.max()
                split_thresh = thresholds[gains.argmax()]

        return split_idx, split_thresh

    def _impurity_gain(self, Y, split_thresh, feat_values):
        """
        Compute the impurity gain associated with a given split.

            IG(split) = loss(parent) - weighted_avg[loss(left_child), loss(right_child)]
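
        For example, with entropy as the loss, splitting Y = [0, 0, 1, 1]
        into [0, 0] and [1, 1] gives
        IG = 1.0 - (0.5 * 0.0 + 0.5 * 0.0) = 1.0.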
"""
        if self.criterion == "entropy":
            loss = entropy
        elif self.criterion == "gini":
            loss = gini
        elif self.criterion == "mse":
            loss = mse

        parent_loss = loss(Y)

        # generate split
        left = np.argwhere(feat_values <= split_thresh).flatten()
        right = np.argwhere(feat_values > split_thresh).flatten()

        if len(left) == 0 or len(right) == 0:
            return 0

        # compute the weighted avg. of the loss for the children
        n = len(Y)
        n_l, n_r = len(left), len(right)
        e_l, e_r = loss(Y[left]), loss(Y[right])
        child_loss = (n_l / n) * e_l + (n_r / n) * e_r

        # impurity gain is difference in loss before vs. after split
        ig = parent_loss - child_loss
        return ig

    def _traverse(self, X, node, prob=False):
        if isinstance(node, Leaf):
            if self.classifier:
                return node.value if prob else node.value.argmax()
            return node.value

        if X[node.feature] <= node.threshold:
            return self._traverse(X, node.left, prob)
        return self._traverse(X, node.right, prob)


def mse(y):
    """
    Mean squared error for decision tree (i.e., mean) predictions
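
    Examples
    --------
    A quick illustrative check against the definition (both residuals from
    the mean 2.0 are 1.0):

    >>> float(mse(np.array([1.0, 3.0])))
    1.0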
"""
    return np.mean((y - np.mean(y)) ** 2)


def entropy(y):
    """
    Entropy of a label sequence
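
    Examples
    --------
    Two equally likely classes carry one bit of entropy:

    >>> float(entropy(np.array([0, 0, 1, 1])))
    1.0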
"""
    hist = np.bincount(y)
    ps = hist / np.sum(hist)
    return -np.sum([p * np.log2(p) for p in ps if p > 0])


def gini(y):
    """
    Gini impurity (local entropy) of a label sequence
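
    Examples
    --------
    Two equally likely classes give an impurity of 1 - (0.5 ** 2 + 0.5 ** 2):

    >>> float(gini(np.array([0, 0, 1, 1])))
    0.5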
"""
    hist = np.bincount(y)
    N = np.sum(hist)
    return 1 - sum([(i / N) ** 2 for i in hist])