Estimation.py
import numpy as np
from Optimization import coordinate_descent_lasso, coordinate_descent_adaptive_lasso, lasso_cv, adalasso_cv

def MuSP(X, y, n_itr=6, num_iters=100):
    """Return the coefficient vector and residual vector estimated by MuSP.

    Pass the original design matrix without an intercept or bias column;
    the intercept column is added internally. num_iters is redeclared here
    because default argument values are not inherited from the functions
    imported from Optimization.
    """
    n, m = X.shape
    y = y.reshape(-1, 1)  # Promote the 1-D target to 2-D to avoid improper broadcasting.
    X_augmented = np.column_stack((np.ones((n, 1)), X))  # Prepend the intercept column.
    active_index_theta = np.arange(X_augmented.shape[1])
    theta_MuSP = np.zeros(m + 1)  # Float zeros; np.full(m + 1, 0) would silently create an int array.
    theta_active = np.zeros(m + 1)  # Bind the name before the loop.
    resid = y
    for i in range(n_itr):
        if i == 0:
            # First stage: plain lasso with a cross-validated penalty.
            best_lamda = lasso_cv(X=X_augmented[:, active_index_theta], y=y, num_iters=num_iters,
                                  lambdas=np.logspace(0, 10, 100) / 10, k_folds=5, intercept=True)
            theta_active = coordinate_descent_lasso(X=X_augmented[:, active_index_theta], y=y,
                                                    lamda=best_lamda)
        else:
            # Later stages: adaptive lasso reweighted by the previous estimate.
            best_lamda = adalasso_cv(theta=theta_active, X=X_augmented[:, active_index_theta], y=y,
                                     num_iters=num_iters, lambdas=np.logspace(0, 10, 100) / 10,
                                     k_folds=5, intercept=True)
            theta_active = coordinate_descent_adaptive_lasso(theta=theta_active,
                                                             X=X_augmented[:, active_index_theta],
                                                             y=y, lamda=best_lamda)
        resid = y - X_augmented[:, active_index_theta] @ theta_active.reshape(-1, 1)
        # Keep only the surviving (non-zero) coefficients and their original column indices.
        active_index_theta = active_index_theta[theta_active != 0]
        theta_active = theta_active[theta_active != 0]
    theta_MuSP[active_index_theta] = theta_active
    return theta_MuSP, resid
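
# Usage sketch (synthetic data, illustrative only; assumes the solvers imported
# from Optimization are available with the signatures used above):
#   rng = np.random.default_rng(0)
#   X_demo = rng.standard_normal((200, 10))
#   beta_true = np.concatenate(([1.5, -2.0], np.zeros(8)))
#   y_demo = X_demo @ beta_true + 0.1 * rng.standard_normal(200)
#   theta_hat, resid_hat = MuSP(X_demo, y_demo)  # theta_hat[0] is the intercept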

def embed(data, lag):
    """
    Create a lagged embedding of the input series.

    Parameters:
        data (array-like): Input time series (1-D, or 2-D with one column per series).
        lag (int): Number of lags; the current values appear first, followed
            by one block of columns per lag.

    Returns:
        numpy.ndarray: Matrix with the current and lagged values as columns.
    """
    n = len(data)
    # Positional row slicing works for both numpy arrays and DataFrames.
    embedded_matrix = np.column_stack([data[lag - i: n - i] for i in range(lag + 1)])
    return embedded_matrix
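
# Example: embed(np.arange(5), 2) stacks the series at lags 0, 1 and 2:
#   array([[2, 1, 0],
#          [3, 2, 1],
#          [4, 3, 2]])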

def MuSP_ts(X, n_lags):
    """
    Fit a VAR(n_lags) model equation by equation with MuSP.

    There is no need to pass a lagged version of the design matrix; the
    lagging is done internally via embed().

    :param X: design matrix with one column per series
    :param n_lags: number of lags
    :return: Phi (k x k*n_lags coefficient matrix) and Sigma (residual covariance matrix)
    """
    k = X.shape[1]  # number of covariates
    X_ = embed(X, n_lags)
    y = X_[:, 0:k]  # current values
    X = X_[:, k:]   # lagged values
    t = X.shape[0]
    Phi = np.full((k, k * n_lags), np.nan)
    resid_holder = np.full((t, k), np.nan)
    for i in range(k):
        theta, resid = MuSP(X, y[:, i], n_itr=6)
        Phi[i, :] = theta[1:]  # drop the intercept
        resid_holder[:, i] = resid.flatten()
    Sigma = resid_holder.T @ resid_holder / t
    return Phi, Sigma
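
# Usage sketch for a bivariate series with two lags (synthetic data, illustrative only):
#   rng = np.random.default_rng(1)
#   series = rng.standard_normal((300, 2))
#   Phi_hat, Sigma_hat = MuSP_ts(series, n_lags=2)  # Phi_hat is 2 x 4, Sigma_hat is 2 x 2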

def lasso_fit(X, y, num_iters):
    n = X.shape[0]
    y = y.reshape(-1, 1)  # Promote the 1-D target to 2-D to avoid improper broadcasting.
    X = np.column_stack((np.ones((n, 1)), X))  # Prepend the intercept column.
    best_lamda = lasso_cv(X=X, y=y, num_iters=num_iters, lambdas=np.logspace(-2, 2, 100),
                          k_folds=5, intercept=True)
    theta = coordinate_descent_lasso(X=X, y=y, lamda=best_lamda)
    return theta

def adaptive_lasso_fit(X, y, num_iters):
    n = X.shape[0]
    y = y.reshape(-1, 1)  # Promote the 1-D target to 2-D to avoid improper broadcasting.
    X = np.column_stack((np.ones((n, 1)), X))  # Prepend the intercept column.
    beta = np.linalg.solve(X.T @ X, X.T @ y)  # OLS estimate used to weight the adaptive lasso.
    best_lamda = adalasso_cv(beta, X, y, num_iters, lambdas=np.logspace(0, 10, 100) / 10,
                             k_folds=5, intercept=True)
    theta = coordinate_descent_adaptive_lasso(beta, X, y, lamda=best_lamda, num_iters=100,
                                              intercept=True)
    return theta
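
if __name__ == "__main__":
    # Minimal smoke test on synthetic sparse data (illustrative only; assumes the
    # Optimization solvers imported above are available). Not part of the original
    # estimation code.
    rng = np.random.default_rng(0)
    n_obs, m_feat = 120, 8
    X_demo = rng.standard_normal((n_obs, m_feat))
    beta_true = np.zeros(m_feat)
    beta_true[:2] = [2.0, -1.5]  # sparse ground truth
    y_demo = X_demo @ beta_true + 0.1 * rng.standard_normal(n_obs)
    print("lasso:   ", np.round(lasso_fit(X_demo, y_demo, num_iters=100), 3))
    print("adalasso:", np.round(adaptive_lasso_fit(X_demo, y_demo, num_iters=100), 3))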