-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathem.py
91 lines (70 loc) · 2.1 KB
/
em.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# Expectation-Maximization and mixture density algorithms
#
# [email protected], 2023
import numpy as np
import numba
@numba.njit
def gausspdf(x,mu,sigma):
""" Gaussian pdf """
return 1/np.sqrt(2*np.pi*sigma**2) * np.exp(-0.5 * ((mu-x)/sigma)**2)
@numba.njit
def mixture_nll(pdf, frac):
""" Mixture model negative Log-Likelihood
Args:
pdf : (n x K) density (pdf) values for n measurements, K classes
frac : (K) class fractions with sum 1
Returns:
nll : Negative Log-Likelihood
"""
LL = 0
# Likelihood over all measurements
for i in range(pdf.shape[0]):
f = np.sum(pdf[i,:]*frac)
LL += np.log(f)
return (-1)*LL
#@numba.njit
def EM_frac(pdf, iters=30, EPS=1E-12, verbose=True):
""" EM-algorithm for unknown integrated class fractions
Args:
pdf : (n x K) density (pdf) values for n measurements, K classes
iter : Number of iterations
Returns:
frac : Integrated class fractions
"""
n = pdf.shape[0]
K = pdf.shape[1]
P = np.zeros((n,K))
frac = np.ones(K) / K
for k in range(iters):
# Loop over observations
for i in range(n):
# E-step, obtain normalized probabilities
P[i,:] = pdf[i,:] * frac[:]
P[i,:] /= (np.sum(P[i,:]) + EPS)
# M-step, update fractions by averaging over observations
frac = np.sum(P,axis=0) / n
if verbose:
print(f'EM_frac: iter {k:4}, NLL = {mixture_nll(pdf,frac):.3f}, frac = {frac}')
return frac
def test_EM():
""" Test function to test EM iteration """
# Toy Gaussian problem
mu = np.array([0.0, 3.0])
sigma = np.array([1.0, 2.0])
N = np.array([1000, 2000]) # Number of samples
# Generate random samples for each class
S = []
for i in range(len(N)):
S.append( np.array( np.random.randn(N[i])*sigma[i] + mu[i] ))
# Concatenate into one full sample
SS = np.zeros(0)
for i in range(len(S)):
SS = np.concatenate((SS, np.array(S[i])))
# Evaluate pdf values for each class
pdf = np.zeros((len(SS), len(mu)))
for i in range(len(SS)):
for j in range(len(mu)):
pdf[i,j] = gausspdf(SS[i], mu[j], sigma[j])
# Solve the unknown fractions
frac = EM_frac(pdf)
print(f'True fractions = {N / np.sum(N)}')